diff --git a/db/db_impl/compacted_db_impl.cc b/db/db_impl/compacted_db_impl.cc index ccb366c533..482a2e1f6a 100644 --- a/db/db_impl/compacted_db_impl.cc +++ b/db/db_impl/compacted_db_impl.cc @@ -60,6 +60,13 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, return s; } } + + // Clear the timestamps for returning results so that we can distinguish + // between tombstone or key that has never been written + if (timestamp) { + timestamp->clear(); + } + GetWithTimestampReadCallback read_cb(kMaxSequenceNumber); std::string* ts = user_comparator_->timestamp_size() > 0 ? timestamp : nullptr; @@ -114,6 +121,14 @@ std::vector CompactedDBImpl::MultiGet( } } + // Clear the timestamps for returning results so that we can distinguish + // between tombstone or key that has never been written + if (timestamps) { + for (auto& ts : *timestamps) { + ts.clear(); + } + } + GetWithTimestampReadCallback read_cb(kMaxSequenceNumber); autovector reader_list; for (const auto& key : keys) { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f82f20f33a..2faa22ada8 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1749,6 +1749,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, } } + // Clear the timestamps for returning results so that we can distinguish + // between tombstone or key that has never been written + if (get_impl_options.timestamp) { + get_impl_options.timestamp->clear(); + } + GetWithTimestampReadCallback read_cb(0); // Will call Refresh PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock); @@ -2566,6 +2572,16 @@ Status DBImpl::MultiGetImpl( PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock); StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET); + assert(sorted_keys); + // Clear the timestamps for returning results so that we can distinguish + // between tombstone or key that has never been written + for (auto* kctx : *sorted_keys) { + assert(kctx); + if (kctx->timestamp) { + kctx->timestamp->clear(); + } + } + // For each of the given keys, apply the entire "get" process as follows: // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. @@ -4517,6 +4533,8 @@ Status DBImpl::GetLatestSequenceForKey( *timestamp != std::string(ts_sz, '\xff')) || (*seq == kMaxSequenceNumber && timestamp->empty())); + TEST_SYNC_POINT_CALLBACK("DBImpl::GetLatestSequenceForKey:mem", timestamp); + if (*seq != kMaxSequenceNumber) { // Found a sequence number, no need to check immutable memtables *found_record_for_key = true; @@ -4581,6 +4599,7 @@ Status DBImpl::GetLatestSequenceForKey( (*seq != kMaxSequenceNumber && *timestamp != std::string(ts_sz, '\xff')) || (*seq == kMaxSequenceNumber && timestamp->empty())); + if (*seq != kMaxSequenceNumber) { // Found a sequence number, no need to check SST files assert(0 == ts_sz || *timestamp != std::string(ts_sz, '\xff')); diff --git a/db/db_impl/db_impl_readonly.cc b/db/db_impl/db_impl_readonly.cc index b014f75ce6..5be89bce24 100644 --- a/db/db_impl/db_impl_readonly.cc +++ b/db/db_impl/db_impl_readonly.cc @@ -58,6 +58,13 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options, return s; } } + + // Clear the timestamps for returning results so that we can distinguish + // between tombstone or key that has never been written + if (timestamp) { + timestamp->clear(); + } + const Comparator* ucmp = column_family->GetComparator(); assert(ucmp); std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr; diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 1c56ccd82b..bec1fe2f75 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -361,6 +361,12 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options, } } + // Clear the timestamp for returning results so that we can distinguish + // between tombstone or key that has never been written later. + if (timestamp) { + timestamp->clear(); + } + auto cfh = static_cast(column_family); ColumnFamilyData* cfd = cfh->cfd(); if (tracer_) { diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 009dbbc108..473cf0ce7b 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -230,6 +230,10 @@ TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) { ASSERT_OK(s); ASSERT_EQ("v1", value); + std::string key_ts; + ASSERT_TRUE(db_->Get(ropts, "k3", &value, &key_ts).IsNotFound()); + ASSERT_EQ(Timestamp(2, 0), key_ts); + Close(); } @@ -543,23 +547,29 @@ TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) { options.comparator = &test_cmp; DestroyAndReopen(options); auto check_value_by_ts = [](DB* db, Slice key, std::string readTs, - Status status, std::string checkValue) { + Status status, std::string checkValue, + std::string expected_ts) { ReadOptions ropts; Slice ts = readTs; ropts.timestamp = &ts; std::string value; - Status s = db->Get(ropts, key, &value); + std::string key_ts; + Status s = db->Get(ropts, key, &value, &key_ts); ASSERT_TRUE(s == status); if (s.ok()) { ASSERT_EQ(checkValue, value); } + if (s.ok() || s.IsNotFound()) { + ASSERT_EQ(expected_ts, key_ts); + } }; // Construct data of different versions with different ts ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(2, 0), "v1")); ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(4, 0), "v2")); ASSERT_OK(db_->Delete(WriteOptions(), "k1", Timestamp(5, 0))); ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(6, 0), "v3")); - check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3"); + check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3", + Timestamp(6, 0)); ASSERT_OK(Flush()); Close(); @@ -572,13 +582,15 @@ TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) { // Trim data whose version > Timestamp(5, 0), read(k1, ts(7)) <- NOT_FOUND. ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families, &handles_, &db_, Timestamp(5, 0))); - check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), ""); + check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "", + Timestamp(5, 0)); Close(); // Trim data whose timestamp > Timestamp(4, 0), read(k1, ts(7)) <- v2 ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families, &handles_, &db_, Timestamp(4, 0))); - check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2"); + check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2", + Timestamp(4, 0)); Close(); } @@ -1210,12 +1222,27 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetWithFastLocalBloom) { std::vector keys(batch_size); std::vector values(batch_size); std::vector statuses(batch_size); + std::vector timestamps(batch_size); keys[0] = "foo"; ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), - statuses.data()); + timestamps.data(), statuses.data(), true); ASSERT_OK(statuses[0]); + ASSERT_EQ(Timestamp(1, 0), timestamps[0]); + for (auto& elem : values) { + elem.Reset(); + } + + ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0))); + ts = Timestamp(3, 0); + read_ts = ts; + read_opts.timestamp = &read_ts; + db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), + timestamps.data(), statuses.data(), true); + ASSERT_TRUE(statuses[0].IsNotFound()); + ASSERT_EQ(Timestamp(2, 0), timestamps[0]); + Close(); } @@ -1251,12 +1278,31 @@ TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithPrefix) { std::vector keys(batch_size); std::vector values(batch_size); std::vector statuses(batch_size); + std::vector timestamps(batch_size); keys[0] = "foo"; ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), - statuses.data()); + timestamps.data(), statuses.data(), true); ASSERT_OK(statuses[0]); + ASSERT_EQ(Timestamp(1, 0), timestamps[0]); + for (auto& elem : values) { + elem.Reset(); + } + + ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0))); + // TODO re-enable after fixing a bug of kHashSearch + if (GetParam() != BlockBasedTableOptions::IndexType::kHashSearch) { + ASSERT_OK(Flush()); + } + + ts = Timestamp(3, 0); + read_ts = ts; + db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), + timestamps.data(), statuses.data(), true); + ASSERT_TRUE(statuses[0].IsNotFound()); + ASSERT_EQ(Timestamp(2, 0), timestamps[0]); + Close(); } @@ -1507,8 +1553,10 @@ TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) { ts = Timestamp(3, 0); read_ts = ts; read_opts.timestamp = &read_ts; - s = db_->Get(read_opts, "a", &value); + std::string key_ts; + s = db_->Get(read_opts, "a", &value, &key_ts); ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ(Timestamp(3, 0), key_ts); // Time-travel to the past before deletion ts = Timestamp(2, 0); @@ -1613,9 +1661,8 @@ TEST_P(DBBasicTestWithTimestampFilterPrefixSettings, GetAndMultiGet) { std::unique_ptr it1(db_->NewIterator(read_opts)); ASSERT_NE(nullptr, it1); ASSERT_OK(it1->status()); - // TODO(zjay) Fix seek with prefix - // it1->Seek(keys[i]); - // ASSERT_TRUE(it1->Valid()); + it1->Seek(keys[i]); + ASSERT_TRUE(it1->Valid()); } for (int i = 2; i < 4; i++) { @@ -2400,12 +2447,18 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) { ropts.timestamp = &ts; for (uint64_t i = 0; i != static_cast(kNumKeysPerFile); ++i) { std::string value; - Status s = db_->Get(ropts, Key1(i), &value); + std::string key_ts; + Status s = db_->Get(ropts, Key1(i), &value, &key_ts); if ((i % 3) == 2) { ASSERT_OK(s); ASSERT_EQ("new_value_2", value); + ASSERT_EQ(Timestamp(5, 0), key_ts); + } else if ((i % 3) == 1) { + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ(Timestamp(5, 0), key_ts); } else { ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ(Timestamp(3, 0), key_ts); } } } diff --git a/db/memtable.cc b/db/memtable.cc index 6a4d2e127c..4b609cae7e 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -813,6 +813,10 @@ static bool SaveValue(void* arg, const char* entry) { } } else { *(s->status) = Status::NotFound(); + if (ts_sz > 0 && s->timestamp != nullptr) { + Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); + s->timestamp->assign(ts.data(), ts.size()); + } } *(s->found_final_value) = true; return false; diff --git a/table/get_context.cc b/table/get_context.cc index c61f82b1aa..a8163f19d3 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -334,6 +334,11 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(state_ == kNotFound || state_ == kMerge); if (kNotFound == state_) { state_ = kDeleted; + size_t ts_sz = ucmp_->timestamp_size(); + if (ts_sz > 0 && timestamp_ != nullptr) { + Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz); + timestamp_->assign(ts.data(), ts.size()); + } } else if (kMerge == state_) { state_ = kFound; Merge(nullptr); diff --git a/table/multiget_context.h b/table/multiget_context.h index ca29816f57..188681480d 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -137,6 +137,9 @@ class MultiGetContext { sorted_keys_[iter]->lkey->user_key(), read_opts.timestamp == nullptr ? 0 : read_opts.timestamp->size()); sorted_keys_[iter]->ikey = sorted_keys_[iter]->lkey->internal_key(); + sorted_keys_[iter]->timestamp = (*sorted_keys)[begin + iter]->timestamp; + sorted_keys_[iter]->get_context = + (*sorted_keys)[begin + iter]->get_context; } } diff --git a/utilities/transactions/write_committed_transaction_ts_test.cc b/utilities/transactions/write_committed_transaction_ts_test.cc index 980289846c..0c1f659c33 100644 --- a/utilities/transactions/write_committed_transaction_ts_test.cc +++ b/utilities/transactions/write_committed_transaction_ts_test.cc @@ -498,6 +498,67 @@ TEST_P(WriteCommittedTxnWithTsTest, RefineReadTimestamp) { txn0.reset(); } +TEST_P(WriteCommittedTxnWithTsTest, CheckKeysForConflicts) { + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + ASSERT_OK(ReOpen()); + + std::unique_ptr txn1( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + assert(txn1); + + std::unique_ptr txn2( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + assert(txn2); + ASSERT_OK(txn2->Put("foo", "v0")); + ASSERT_OK(txn2->SetCommitTimestamp(10)); + ASSERT_OK(txn2->Commit()); + txn2.reset(); + + // txn1 takes a snapshot after txn2 commits. The writes of txn2 have + // a smaller seqno than txn1's snapshot, thus should not affect conflict + // checking. + txn1->SetSnapshot(); + + std::unique_ptr txn3( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + assert(txn3); + ASSERT_OK(txn3->SetReadTimestampForValidation(20)); + std::string dontcare; + ASSERT_OK(txn3->GetForUpdate(ReadOptions(), "foo", &dontcare)); + ASSERT_OK(txn3->SingleDelete("foo")); + ASSERT_OK(txn3->SetName("txn3")); + ASSERT_OK(txn3->Prepare()); + ASSERT_OK(txn3->SetCommitTimestamp(30)); + // txn3 reads at ts=20 > txn2's commit timestamp, and commits at ts=30. + // txn3 can commit successfully, leaving a tombstone with ts=30. + ASSERT_OK(txn3->Commit()); + txn3.reset(); + + bool called = false; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::GetLatestSequenceForKey:mem", [&](void* arg) { + auto* const ts_ptr = reinterpret_cast(arg); + assert(ts_ptr); + Slice ts_slc = *ts_ptr; + uint64_t last_ts = 0; + ASSERT_TRUE(GetFixed64(&ts_slc, &last_ts)); + ASSERT_EQ(30, last_ts); + called = true; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + // txn1's read timestamp is 25 < 30 (commit timestamp of txn3). Therefore, + // the tombstone written by txn3 causes the conflict checking to fail. + ASSERT_OK(txn1->SetReadTimestampForValidation(25)); + ASSERT_TRUE(txn1->GetForUpdate(ReadOptions(), "foo", &dontcare).IsBusy()); + ASSERT_TRUE(called); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {