mirror of https://github.com/facebook/rocksdb.git
Point-lookup returns timestamps of Delete and SingleDelete (#10056)
Summary: If caller specifies a non-null `timestamp` argument in `DB::Get()` or a non-null `timestamps` in `DB::MultiGet()`, RocksDB will return the timestamps of the point tombstones. Note: DeleteRange is still unsupported. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10056 Test Plan: make check Reviewed By: ltamasi Differential Revision: D36677956 Pulled By: riversand963 fbshipit-source-id: 2d7af02cc7237b1829cd269086ea895a49d501ae
This commit is contained in:
parent
4bdcc80192
commit
3e02c6e05a
|
@ -60,6 +60,13 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
|
|||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the timestamps for returning results so that we can distinguish
|
||||
// between tombstone or key that has never been written
|
||||
if (timestamp) {
|
||||
timestamp->clear();
|
||||
}
|
||||
|
||||
GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
|
||||
std::string* ts =
|
||||
user_comparator_->timestamp_size() > 0 ? timestamp : nullptr;
|
||||
|
@ -114,6 +121,14 @@ std::vector<Status> CompactedDBImpl::MultiGet(
|
|||
}
|
||||
}
|
||||
|
||||
// Clear the timestamps for returning results so that we can distinguish
|
||||
// between tombstone or key that has never been written
|
||||
if (timestamps) {
|
||||
for (auto& ts : *timestamps) {
|
||||
ts.clear();
|
||||
}
|
||||
}
|
||||
|
||||
GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
|
||||
autovector<TableReader*, 16> reader_list;
|
||||
for (const auto& key : keys) {
|
||||
|
|
|
@ -1749,6 +1749,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
|
|||
}
|
||||
}
|
||||
|
||||
// Clear the timestamps for returning results so that we can distinguish
|
||||
// between tombstone or key that has never been written
|
||||
if (get_impl_options.timestamp) {
|
||||
get_impl_options.timestamp->clear();
|
||||
}
|
||||
|
||||
GetWithTimestampReadCallback read_cb(0); // Will call Refresh
|
||||
|
||||
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
|
||||
|
@ -2566,6 +2572,16 @@ Status DBImpl::MultiGetImpl(
|
|||
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
|
||||
StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
|
||||
|
||||
assert(sorted_keys);
|
||||
// Clear the timestamps for returning results so that we can distinguish
|
||||
// between tombstone or key that has never been written
|
||||
for (auto* kctx : *sorted_keys) {
|
||||
assert(kctx);
|
||||
if (kctx->timestamp) {
|
||||
kctx->timestamp->clear();
|
||||
}
|
||||
}
|
||||
|
||||
// For each of the given keys, apply the entire "get" process as follows:
|
||||
// First look in the memtable, then in the immutable memtable (if any).
|
||||
// s is both in/out. When in, s could either be OK or MergeInProgress.
|
||||
|
@ -4517,6 +4533,8 @@ Status DBImpl::GetLatestSequenceForKey(
|
|||
*timestamp != std::string(ts_sz, '\xff')) ||
|
||||
(*seq == kMaxSequenceNumber && timestamp->empty()));
|
||||
|
||||
TEST_SYNC_POINT_CALLBACK("DBImpl::GetLatestSequenceForKey:mem", timestamp);
|
||||
|
||||
if (*seq != kMaxSequenceNumber) {
|
||||
// Found a sequence number, no need to check immutable memtables
|
||||
*found_record_for_key = true;
|
||||
|
@ -4581,6 +4599,7 @@ Status DBImpl::GetLatestSequenceForKey(
|
|||
(*seq != kMaxSequenceNumber &&
|
||||
*timestamp != std::string(ts_sz, '\xff')) ||
|
||||
(*seq == kMaxSequenceNumber && timestamp->empty()));
|
||||
|
||||
if (*seq != kMaxSequenceNumber) {
|
||||
// Found a sequence number, no need to check SST files
|
||||
assert(0 == ts_sz || *timestamp != std::string(ts_sz, '\xff'));
|
||||
|
|
|
@ -58,6 +58,13 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
|
|||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the timestamps for returning results so that we can distinguish
|
||||
// between tombstone or key that has never been written
|
||||
if (timestamp) {
|
||||
timestamp->clear();
|
||||
}
|
||||
|
||||
const Comparator* ucmp = column_family->GetComparator();
|
||||
assert(ucmp);
|
||||
std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
|
||||
|
|
|
@ -361,6 +361,12 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
|
|||
}
|
||||
}
|
||||
|
||||
// Clear the timestamp for returning results so that we can distinguish
|
||||
// between tombstone or key that has never been written later.
|
||||
if (timestamp) {
|
||||
timestamp->clear();
|
||||
}
|
||||
|
||||
auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
|
||||
ColumnFamilyData* cfd = cfh->cfd();
|
||||
if (tracer_) {
|
||||
|
|
|
@ -230,6 +230,10 @@ TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) {
|
|||
ASSERT_OK(s);
|
||||
ASSERT_EQ("v1", value);
|
||||
|
||||
std::string key_ts;
|
||||
ASSERT_TRUE(db_->Get(ropts, "k3", &value, &key_ts).IsNotFound());
|
||||
ASSERT_EQ(Timestamp(2, 0), key_ts);
|
||||
|
||||
Close();
|
||||
}
|
||||
|
||||
|
@ -543,23 +547,29 @@ TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) {
|
|||
options.comparator = &test_cmp;
|
||||
DestroyAndReopen(options);
|
||||
auto check_value_by_ts = [](DB* db, Slice key, std::string readTs,
|
||||
Status status, std::string checkValue) {
|
||||
Status status, std::string checkValue,
|
||||
std::string expected_ts) {
|
||||
ReadOptions ropts;
|
||||
Slice ts = readTs;
|
||||
ropts.timestamp = &ts;
|
||||
std::string value;
|
||||
Status s = db->Get(ropts, key, &value);
|
||||
std::string key_ts;
|
||||
Status s = db->Get(ropts, key, &value, &key_ts);
|
||||
ASSERT_TRUE(s == status);
|
||||
if (s.ok()) {
|
||||
ASSERT_EQ(checkValue, value);
|
||||
}
|
||||
if (s.ok() || s.IsNotFound()) {
|
||||
ASSERT_EQ(expected_ts, key_ts);
|
||||
}
|
||||
};
|
||||
// Construct data of different versions with different ts
|
||||
ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(2, 0), "v1"));
|
||||
ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(4, 0), "v2"));
|
||||
ASSERT_OK(db_->Delete(WriteOptions(), "k1", Timestamp(5, 0)));
|
||||
ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(6, 0), "v3"));
|
||||
check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3");
|
||||
check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3",
|
||||
Timestamp(6, 0));
|
||||
ASSERT_OK(Flush());
|
||||
Close();
|
||||
|
||||
|
@ -572,13 +582,15 @@ TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) {
|
|||
// Trim data whose version > Timestamp(5, 0), read(k1, ts(7)) <- NOT_FOUND.
|
||||
ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
|
||||
&handles_, &db_, Timestamp(5, 0)));
|
||||
check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "");
|
||||
check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "",
|
||||
Timestamp(5, 0));
|
||||
Close();
|
||||
|
||||
// Trim data whose timestamp > Timestamp(4, 0), read(k1, ts(7)) <- v2
|
||||
ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
|
||||
&handles_, &db_, Timestamp(4, 0)));
|
||||
check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2");
|
||||
check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2",
|
||||
Timestamp(4, 0));
|
||||
Close();
|
||||
}
|
||||
|
||||
|
@ -1210,12 +1222,27 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetWithFastLocalBloom) {
|
|||
std::vector<Slice> keys(batch_size);
|
||||
std::vector<PinnableSlice> values(batch_size);
|
||||
std::vector<Status> statuses(batch_size);
|
||||
std::vector<std::string> timestamps(batch_size);
|
||||
keys[0] = "foo";
|
||||
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
|
||||
db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
|
||||
statuses.data());
|
||||
timestamps.data(), statuses.data(), true);
|
||||
|
||||
ASSERT_OK(statuses[0]);
|
||||
ASSERT_EQ(Timestamp(1, 0), timestamps[0]);
|
||||
for (auto& elem : values) {
|
||||
elem.Reset();
|
||||
}
|
||||
|
||||
ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0)));
|
||||
ts = Timestamp(3, 0);
|
||||
read_ts = ts;
|
||||
read_opts.timestamp = &read_ts;
|
||||
db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
|
||||
timestamps.data(), statuses.data(), true);
|
||||
ASSERT_TRUE(statuses[0].IsNotFound());
|
||||
ASSERT_EQ(Timestamp(2, 0), timestamps[0]);
|
||||
|
||||
Close();
|
||||
}
|
||||
|
||||
|
@ -1251,12 +1278,31 @@ TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithPrefix) {
|
|||
std::vector<Slice> keys(batch_size);
|
||||
std::vector<PinnableSlice> values(batch_size);
|
||||
std::vector<Status> statuses(batch_size);
|
||||
std::vector<std::string> timestamps(batch_size);
|
||||
keys[0] = "foo";
|
||||
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
|
||||
db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
|
||||
statuses.data());
|
||||
timestamps.data(), statuses.data(), true);
|
||||
|
||||
ASSERT_OK(statuses[0]);
|
||||
ASSERT_EQ(Timestamp(1, 0), timestamps[0]);
|
||||
for (auto& elem : values) {
|
||||
elem.Reset();
|
||||
}
|
||||
|
||||
ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0)));
|
||||
// TODO re-enable after fixing a bug of kHashSearch
|
||||
if (GetParam() != BlockBasedTableOptions::IndexType::kHashSearch) {
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
|
||||
ts = Timestamp(3, 0);
|
||||
read_ts = ts;
|
||||
db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
|
||||
timestamps.data(), statuses.data(), true);
|
||||
ASSERT_TRUE(statuses[0].IsNotFound());
|
||||
ASSERT_EQ(Timestamp(2, 0), timestamps[0]);
|
||||
|
||||
Close();
|
||||
}
|
||||
|
||||
|
@ -1507,8 +1553,10 @@ TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) {
|
|||
ts = Timestamp(3, 0);
|
||||
read_ts = ts;
|
||||
read_opts.timestamp = &read_ts;
|
||||
s = db_->Get(read_opts, "a", &value);
|
||||
std::string key_ts;
|
||||
s = db_->Get(read_opts, "a", &value, &key_ts);
|
||||
ASSERT_TRUE(s.IsNotFound());
|
||||
ASSERT_EQ(Timestamp(3, 0), key_ts);
|
||||
|
||||
// Time-travel to the past before deletion
|
||||
ts = Timestamp(2, 0);
|
||||
|
@ -1613,9 +1661,8 @@ TEST_P(DBBasicTestWithTimestampFilterPrefixSettings, GetAndMultiGet) {
|
|||
std::unique_ptr<Iterator> it1(db_->NewIterator(read_opts));
|
||||
ASSERT_NE(nullptr, it1);
|
||||
ASSERT_OK(it1->status());
|
||||
// TODO(zjay) Fix seek with prefix
|
||||
// it1->Seek(keys[i]);
|
||||
// ASSERT_TRUE(it1->Valid());
|
||||
it1->Seek(keys[i]);
|
||||
ASSERT_TRUE(it1->Valid());
|
||||
}
|
||||
|
||||
for (int i = 2; i < 4; i++) {
|
||||
|
@ -2400,12 +2447,18 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
|
|||
ropts.timestamp = &ts;
|
||||
for (uint64_t i = 0; i != static_cast<uint64_t>(kNumKeysPerFile); ++i) {
|
||||
std::string value;
|
||||
Status s = db_->Get(ropts, Key1(i), &value);
|
||||
std::string key_ts;
|
||||
Status s = db_->Get(ropts, Key1(i), &value, &key_ts);
|
||||
if ((i % 3) == 2) {
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("new_value_2", value);
|
||||
ASSERT_EQ(Timestamp(5, 0), key_ts);
|
||||
} else if ((i % 3) == 1) {
|
||||
ASSERT_TRUE(s.IsNotFound());
|
||||
ASSERT_EQ(Timestamp(5, 0), key_ts);
|
||||
} else {
|
||||
ASSERT_TRUE(s.IsNotFound());
|
||||
ASSERT_EQ(Timestamp(3, 0), key_ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -813,6 +813,10 @@ static bool SaveValue(void* arg, const char* entry) {
|
|||
}
|
||||
} else {
|
||||
*(s->status) = Status::NotFound();
|
||||
if (ts_sz > 0 && s->timestamp != nullptr) {
|
||||
Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
|
||||
s->timestamp->assign(ts.data(), ts.size());
|
||||
}
|
||||
}
|
||||
*(s->found_final_value) = true;
|
||||
return false;
|
||||
|
|
|
@ -334,6 +334,11 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
|
|||
assert(state_ == kNotFound || state_ == kMerge);
|
||||
if (kNotFound == state_) {
|
||||
state_ = kDeleted;
|
||||
size_t ts_sz = ucmp_->timestamp_size();
|
||||
if (ts_sz > 0 && timestamp_ != nullptr) {
|
||||
Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
|
||||
timestamp_->assign(ts.data(), ts.size());
|
||||
}
|
||||
} else if (kMerge == state_) {
|
||||
state_ = kFound;
|
||||
Merge(nullptr);
|
||||
|
|
|
@ -137,6 +137,9 @@ class MultiGetContext {
|
|||
sorted_keys_[iter]->lkey->user_key(),
|
||||
read_opts.timestamp == nullptr ? 0 : read_opts.timestamp->size());
|
||||
sorted_keys_[iter]->ikey = sorted_keys_[iter]->lkey->internal_key();
|
||||
sorted_keys_[iter]->timestamp = (*sorted_keys)[begin + iter]->timestamp;
|
||||
sorted_keys_[iter]->get_context =
|
||||
(*sorted_keys)[begin + iter]->get_context;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -498,6 +498,67 @@ TEST_P(WriteCommittedTxnWithTsTest, RefineReadTimestamp) {
|
|||
txn0.reset();
|
||||
}
|
||||
|
||||
TEST_P(WriteCommittedTxnWithTsTest, CheckKeysForConflicts) {
|
||||
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
||||
ASSERT_OK(ReOpen());
|
||||
|
||||
std::unique_ptr<Transaction> txn1(
|
||||
db->BeginTransaction(WriteOptions(), TransactionOptions()));
|
||||
assert(txn1);
|
||||
|
||||
std::unique_ptr<Transaction> txn2(
|
||||
db->BeginTransaction(WriteOptions(), TransactionOptions()));
|
||||
assert(txn2);
|
||||
ASSERT_OK(txn2->Put("foo", "v0"));
|
||||
ASSERT_OK(txn2->SetCommitTimestamp(10));
|
||||
ASSERT_OK(txn2->Commit());
|
||||
txn2.reset();
|
||||
|
||||
// txn1 takes a snapshot after txn2 commits. The writes of txn2 have
|
||||
// a smaller seqno than txn1's snapshot, thus should not affect conflict
|
||||
// checking.
|
||||
txn1->SetSnapshot();
|
||||
|
||||
std::unique_ptr<Transaction> txn3(
|
||||
db->BeginTransaction(WriteOptions(), TransactionOptions()));
|
||||
assert(txn3);
|
||||
ASSERT_OK(txn3->SetReadTimestampForValidation(20));
|
||||
std::string dontcare;
|
||||
ASSERT_OK(txn3->GetForUpdate(ReadOptions(), "foo", &dontcare));
|
||||
ASSERT_OK(txn3->SingleDelete("foo"));
|
||||
ASSERT_OK(txn3->SetName("txn3"));
|
||||
ASSERT_OK(txn3->Prepare());
|
||||
ASSERT_OK(txn3->SetCommitTimestamp(30));
|
||||
// txn3 reads at ts=20 > txn2's commit timestamp, and commits at ts=30.
|
||||
// txn3 can commit successfully, leaving a tombstone with ts=30.
|
||||
ASSERT_OK(txn3->Commit());
|
||||
txn3.reset();
|
||||
|
||||
bool called = false;
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::GetLatestSequenceForKey:mem", [&](void* arg) {
|
||||
auto* const ts_ptr = reinterpret_cast<std::string*>(arg);
|
||||
assert(ts_ptr);
|
||||
Slice ts_slc = *ts_ptr;
|
||||
uint64_t last_ts = 0;
|
||||
ASSERT_TRUE(GetFixed64(&ts_slc, &last_ts));
|
||||
ASSERT_EQ(30, last_ts);
|
||||
called = true;
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// txn1's read timestamp is 25 < 30 (commit timestamp of txn3). Therefore,
|
||||
// the tombstone written by txn3 causes the conflict checking to fail.
|
||||
ASSERT_OK(txn1->SetReadTimestampForValidation(25));
|
||||
ASSERT_TRUE(txn1->GetForUpdate(ReadOptions(), "foo", &dontcare).IsBusy());
|
||||
ASSERT_TRUE(called);
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
|
Loading…
Reference in New Issue