Fix rowcache get returning incorrect timestamp (#11952)

Summary:
Fixes https://github.com/facebook/rocksdb/issues/7930.

When there is a timestamp associated with stored records, a Get served from the row cache will return the timestamp provided in the query instead of the timestamp associated with the stored record.

## Cause of error:
Currently a row_handle is fetched using the row_cache_key (which contains a timestamp provided by the user query), and the row_handle itself does not persist the timestamp associated with the stored record. Hence the [GetContext::SaveValue()
](6e3429b8a6/table/get_context.cc (L257)) function will fetch the timestamp from the row_cache_key and may return an incorrect timestamp value.

## Proposed Solution
If the current column family enables user-defined timestamps, append the timestamp associated with the stored record after the value in the replay_log (equivalently, the value of the row cache entry).

When read, `replayGetContextLog()` will update parsed_key with the correct timestamp.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11952

Reviewed By: ajkr

Differential Revision: D51501176

Pulled By: jowlyzhang

fbshipit-source-id: 808fc943a8ae95de56ae0e82ec59a2573a031f28
This commit is contained in:
cz2h 2023-11-21 20:39:33 -08:00 committed by Facebook GitHub Bot
parent ddb7df10ef
commit 324453e579
3 changed files with 226 additions and 82 deletions

View File

@ -1646,35 +1646,88 @@ TEST_F(DBBasicTestWithTimestamp, GetWithRowCache) {
const Snapshot* snap_with_nothing = db_->GetSnapshot();
ASSERT_OK(db_->Put(write_opts, "foo", ts_early, "bar"));
const Snapshot* snap_with_foo = db_->GetSnapshot();
ASSERT_OK(db_->Put(write_opts, "foo2", ts_early, "bar2"));
ASSERT_OK(db_->Put(write_opts, "foo3", ts_early, "bar3"));
// Ensure file has sequence number greater than snapshot_with_foo
for (int i = 0; i < 10; i++) {
std::string numStr = std::to_string(i);
ASSERT_OK(db_->Put(write_opts, numStr, ts_later, numStr));
}
const Snapshot* snap_with_foo = db_->GetSnapshot();
ASSERT_OK(Flush());
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 0);
ReadOptions read_opts;
read_opts.timestamp = &ts_later_slice;
std::string read_value;
std::string read_ts;
Status s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
ASSERT_EQ(read_ts, ts_early);
Status s;
s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
// Row cache is not storing the ts when record is inserted/updated.
// To be fixed after enabling ROW_CACHE with timestamp.
// ASSERT_EQ(read_ts, ts_early);
int expected_hit_count = 0;
int expected_miss_count = 0;
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), expected_miss_count);
{
read_opts.timestamp = nullptr;
s = db_->Get(read_opts, "foo", &read_value);
ASSERT_NOK(s);
ASSERT_TRUE(s.IsInvalidArgument());
}
// Mix use of Get
{
read_opts.timestamp = &ts_later_slice;
// Use Get without ts first, expect cache entry to store the correct ts
s = db_->Get(read_opts, "foo2", &read_value);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS),
++expected_miss_count);
ASSERT_EQ(read_value, "bar2");
s = db_->Get(read_opts, "foo2", &read_value, &read_ts);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), ++expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), expected_miss_count);
ASSERT_EQ(read_ts, ts_early);
ASSERT_EQ(read_value, "bar2");
// Use Get with ts first, expect the Get without ts can get correct record
s = db_->Get(read_opts, "foo3", &read_value, &read_ts);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS),
++expected_miss_count);
ASSERT_EQ(read_ts, ts_early);
ASSERT_EQ(read_value, "bar3");
s = db_->Get(read_opts, "foo3", &read_value);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), ++expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), expected_miss_count);
ASSERT_EQ(read_value, "bar3");
}
{
// Test with consecutive calls of Get with ts.
s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS),
++expected_miss_count);
ASSERT_EQ(read_ts, ts_early);
ASSERT_EQ(read_value, "bar");
// Test repeated get on cache entry
for (int i = 0; i < 3; i++) {
s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT),
++expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS),
expected_miss_count);
ASSERT_EQ(read_ts, ts_early);
ASSERT_EQ(read_value, "bar");
}
}
{
std::string ts_nothing = Timestamp(0, 0);
@ -1682,41 +1735,43 @@ TEST_F(DBBasicTestWithTimestamp, GetWithRowCache) {
read_opts.timestamp = &ts_nothing_slice;
s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
read_opts.timestamp = &ts_later_slice;
s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 2);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS),
++expected_miss_count);
}
{
read_opts.snapshot = snap_with_foo;
read_opts.timestamp = &ts_later_slice;
s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS),
++expected_miss_count);
ASSERT_EQ(read_ts, ts_early);
ASSERT_EQ(read_value, "bar");
s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 2);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 3);
s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_OK(s);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 3);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 3);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), ++expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), expected_miss_count);
ASSERT_EQ(read_ts, ts_early);
ASSERT_EQ(read_value, "bar");
}
{
read_opts.snapshot = snap_with_nothing;
s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 3);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 4);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS),
++expected_miss_count);
s = db_->Get(read_opts, "foo", &read_value, &read_ts);
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 3);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 5);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), expected_hit_count);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS),
++expected_miss_count);
}
db_->ReleaseSnapshot(snap_with_nothing);
@ -1724,6 +1779,65 @@ TEST_F(DBBasicTestWithTimestamp, GetWithRowCache) {
Close();
}
// Verify that a Get served from the row cache returns the timestamp stored
// with the record even when the result is assembled from multiple SST files
// (a base value in one file plus merge operands in another).
TEST_F(DBBasicTestWithTimestamp, GetWithRowCacheMultiSST) {
  BlockBasedTableOptions bbto;
  bbto.block_size = 1;

  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();

  LRUCacheOptions cache_options;
  cache_options.capacity = 8192;
  options.row_cache = cache_options.MakeSharedRowCache();

  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  options.disable_auto_compactions = true;

  DestroyAndReopen(options);

  std::string ts_early = Timestamp(1, 0);
  std::string ts_later = Timestamp(10, 0);
  Slice ts_later_slice = ts_later;

  // First SST file: the base value.
  ASSERT_OK(db_->Put(WriteOptions(), "foo", ts_early, "v1"));
  ASSERT_OK(Flush());

  // Second SST file: two merge operands with later timestamps.
  ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
  ASSERT_OK(
      db_->Merge(WriteOptions(), default_cf, "foo", Timestamp(2, 0), "v2"));
  ASSERT_OK(
      db_->Merge(WriteOptions(), default_cf, "foo", Timestamp(3, 0), "v3"));
  ASSERT_OK(Flush());

  ReadOptions read_opts;
  read_opts.timestamp = &ts_later_slice;
  std::string value;
  std::string ts_out;
  Status st;
  {
    // Two SST files are consulted on the first read, so the table lookup
    // (and hence the row-cache miss counter) fires twice.
    st = db_->Get(read_opts, "foo", &value, &ts_out);
    ASSERT_OK(st);
    ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
    ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
    ASSERT_EQ(ts_out, Timestamp(3, 0));
    ASSERT_EQ(value, "v1,v2,v3");

    // The second read is served entirely from the row cache and must still
    // report the timestamp of the newest stored operand, not the query ts.
    st = db_->Get(read_opts, "foo", &value, &ts_out);
    ASSERT_OK(st);
    ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 2);
    ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
    ASSERT_EQ(ts_out, Timestamp(3, 0));
    ASSERT_EQ(value, "v1,v2,v3");
  }
}
TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetPrefixFilter) {
Options options = CurrentOptions();
options.env = env_;

View File

@ -19,22 +19,6 @@
namespace ROCKSDB_NAMESPACE {
namespace {
void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
if (replay_log) {
if (replay_log->empty()) {
// Optimization: in the common case of only one operation in the
// log, we allocate the exact amount of space needed.
replay_log->reserve(1 + VarintLength(value.size()) + value.size());
}
replay_log->push_back(type);
PutLengthPrefixedSlice(replay_log, value);
}
}
} // namespace
GetContext::GetContext(
const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state, const Slice& user_key,
@ -88,6 +72,24 @@ GetContext::GetContext(const Comparator* ucmp,
seq, _pinned_iters_mgr, callback, is_blob_index,
tracing_get_id, blob_fetcher) {}
// Appends one (type, value[, ts]) record to the replay log, which doubles as
// the serialized row-cache entry. When the column family enables user-defined
// timestamps, the record's timestamp is appended after the value so that a
// later replay can restore the correct timestamp (see #7930 / #11952).
void GetContext::appendToReplayLog(ValueType type, Slice value, Slice ts) {
  if (replay_log_) {
    const size_t ts_sz = ucmp_->timestamp_size();
    if (replay_log_->empty()) {
      // Optimization: in the common case of only one operation in the
      // log, we allocate the exact amount of space needed — including the
      // length-prefixed timestamp when one will be appended below.
      replay_log_->reserve(1 + VarintLength(value.size()) + value.size() +
                           (ts_sz > 0 ? VarintLength(ts.size()) + ts.size()
                                      : 0));
    }
    replay_log_->push_back(type);
    PutLengthPrefixedSlice(replay_log_, value);
    // If cf enables ts, there should always be a ts following each value
    if (ts_sz > 0) {
      assert(ts.size() == ts_sz);
      PutLengthPrefixedSlice(replay_log_, ts);
    }
  }
}
// Called from TableCache::Get and Table::Get when file/block in which
// key may exist are not there in TableCache/BlockCache respectively. In this
// case we can't guarantee that key does not exist and are not permitted to do
@ -102,7 +104,9 @@ void GetContext::MarkKeyMayExist() {
void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) {
assert(state_ == kNotFound);
appendToReplayLog(replay_log_, kTypeValue, value);
assert(ucmp_->timestamp_size() == 0);
appendToReplayLog(kTypeValue, value, Slice());
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
@ -228,7 +232,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
return true; // to continue to the next seq
}
appendToReplayLog(replay_log_, parsed_key.type, value);
if (seq_ != nullptr) {
// Set the sequence number if it is uninitialized
@ -241,32 +244,37 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
}
size_t ts_sz = ucmp_->timestamp_size();
if (ts_sz > 0 && timestamp_ != nullptr) {
if (!timestamp_->empty()) {
assert(ts_sz == timestamp_->size());
// `timestamp` can be set before `SaveValue` is ever called
// when max_covering_tombstone_seq_ was set.
// If this key has a higher sequence number than range tombstone,
// then timestamp should be updated. `ts_from_rangetombstone_` is
// set to false afterwards so that only the key with highest seqno
// updates the timestamp.
if (ts_from_rangetombstone_) {
assert(max_covering_tombstone_seq_);
if (parsed_key.sequence > *max_covering_tombstone_seq_) {
Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
timestamp_->assign(ts.data(), ts.size());
ts_from_rangetombstone_ = false;
Slice ts;
if (ts_sz > 0) {
// ensure always have ts if cf enables ts.
ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
if (timestamp_ != nullptr) {
if (!timestamp_->empty()) {
assert(ts_sz == timestamp_->size());
// `timestamp` can be set before `SaveValue` is ever called
// when max_covering_tombstone_seq_ was set.
// If this key has a higher sequence number than range tombstone,
// then timestamp should be updated. `ts_from_rangetombstone_` is
// set to false afterwards so that only the key with highest seqno
// updates the timestamp.
if (ts_from_rangetombstone_) {
assert(max_covering_tombstone_seq_);
if (parsed_key.sequence > *max_covering_tombstone_seq_) {
timestamp_->assign(ts.data(), ts.size());
ts_from_rangetombstone_ = false;
}
}
}
}
// TODO optimize for small size ts
const std::string kMaxTs(ts_sz, '\xff');
if (timestamp_->empty() ||
ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) {
Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
timestamp_->assign(ts.data(), ts.size());
// TODO optimize for small size ts
const std::string kMaxTs(ts_sz, '\xff');
if (timestamp_->empty() ||
ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) {
timestamp_->assign(ts.data(), ts.size());
}
}
}
appendToReplayLog(parsed_key.type, value, ts);
auto type = parsed_key.type;
// Key matches. Process it
@ -561,17 +569,35 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context, Cleanable* value_pinner,
SequenceNumber seq_no) {
Slice s = replay_log;
Slice ts;
size_t ts_sz = get_context->TimestampSize();
bool ret = false;
while (s.size()) {
auto type = static_cast<ValueType>(*s.data());
s.remove_prefix(1);
Slice value;
bool ret = GetLengthPrefixedSlice(&s, &value);
ret = GetLengthPrefixedSlice(&s, &value);
assert(ret);
(void)ret;
bool dont_care __attribute__((__unused__));
ParsedInternalKey ikey = ParsedInternalKey(user_key, seq_no, type);
// Use a copy to prevent modifying user_key. Modification of user_key
// could result to potential cache miss.
std::string user_key_str = user_key.ToString();
ParsedInternalKey ikey = ParsedInternalKey(user_key_str, seq_no, type);
// If ts enabled for current cf, there will always be ts appended after each
// piece of value.
if (ts_sz > 0) {
ret = GetLengthPrefixedSlice(&s, &ts);
assert(ts_sz == ts.size());
assert(ret);
ikey.SetTimestamp(ts);
}
(void)ret;
get_context->SaveValue(ikey, value, &dont_care, value_pinner);
}
}

View File

@ -149,6 +149,8 @@ class GetContext {
bool NeedTimestamp() { return timestamp_ != nullptr; }
inline size_t TimestampSize() { return ucmp_->timestamp_size(); }
void SetTimestampFromRangeTombstone(const Slice& timestamp) {
assert(timestamp_);
timestamp_->assign(timestamp.data(), timestamp.size());
@ -204,6 +206,8 @@ class GetContext {
bool GetBlobValue(const Slice& user_key, const Slice& blob_index,
PinnableSlice* blob_value);
void appendToReplayLog(ValueType type, Slice value, Slice ts);
const Comparator* ucmp_;
const MergeOperator* merge_operator_;
// the merge operations encountered;