mirror of https://github.com/facebook/rocksdb.git
Track full_history_ts_low per SuperVersion (#11784)
Summary: As discussed in https://github.com/facebook/rocksdb/issues/11730, this PR tracks the effective `full_history_ts_low` per SuperVersion and updates the existing sanity check `ReadOptions.timestamp >= full_history_ts_low` to use this per-SuperVersion `full_history_ts_low` instead. This also means the check now happens after acquiring the SuperVersion.

There are two motivations for this:

1) Each time `full_history_ts_low` actually comes into effect to collapse history, a new SuperVersion is always installed, because doing so involves either a Flush or a Compaction, both of which change the LSM tree shape. We can take advantage of this to ensure that, as long as this sanity check passes, even if `full_history_ts_low` is concurrently increased and collapses some history above the requested `ReadOptions.timestamp`, the read request won't have visibility into that part of history through the SuperVersion it has already acquired.

2) The existing sanity check calls `ColumnFamilyData::GetFullHistoryTsLow` without locking the db mutex, which is the mutex every `IncreaseFullHistoryTsLow` operation holds while mutating this field, so there is a race condition. This change also resolves that race condition on the read path.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11784

Test Plan:
`make all check`

// Checks that the success scenario really provides the read consistency attribute described above.
`./db_with_timestamp_basic_test --gtest_filter=*FullHistoryTsLowSanityCheckPassReadIsConsistent*`

// Checks that the failure scenario cleans up the SuperVersion properly.
`./db_with_timestamp_basic_test --gtest_filter=*FullHistoryTsLowSanityCheckFail*`
`./db_secondary_test --gtest_filter=*FullHistoryTsLowSanityCheckFail*`
`./db_readonly_with_timestamp_test --gtest_filter=*FullHistoryTsLowSanityCheckFail*`

Reviewed By: ltamasi

Differential Revision: D48894795

Pulled By: jowlyzhang

fbshipit-source-id: 1f801fe8e1bc8e63ca76c03cbdbd0974e5ff5bf6
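To make the user-visible effect concrete, here is a minimal sketch (not code from this PR) of the behavior the new tests exercise, written against the public RocksDB API. It writes a key at timestamp 1, raises `full_history_ts_low` to 3, flushes so a new SuperVersion carrying that value is installed, and then shows a read at timestamp 2 failing the sanity check with `InvalidArgument`. The `EncodeTs` helper and the `/tmp/udt_demo` path are illustrative assumptions; the tests in the diff use the internal `test::BytewiseComparatorWithU64TsWrapper()` and `PutFixed64`, while this sketch uses the public `BytewiseComparatorWithU64Ts()` comparator instead.

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>

#include "rocksdb/comparator.h"
#include "rocksdb/db.h"

// Illustrative helper (not a RocksDB API): encode a uint64_t timestamp in the
// fixed-width little-endian form expected by BytewiseComparatorWithU64Ts().
static std::string EncodeTs(uint64_t ts) {
  std::string out(sizeof(ts), '\0');
  std::memcpy(&out[0], &ts, sizeof(ts));  // assumes a little-endian host
  return out;
}

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Enable user-defined timestamps, mirroring the tests added in this PR.
  options.comparator = rocksdb::BytewiseComparatorWithU64Ts();

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/udt_demo", &db);
  if (!s.ok()) return 1;

  // Write (ts=1, "val1"), raise full_history_ts_low to 3, then flush so a new
  // SuperVersion is installed with full_history_ts_low = 3.
  s = db->Put(rocksdb::WriteOptions(), db->DefaultColumnFamily(), "foo",
              EncodeTs(1), "val1");
  if (s.ok()) {
    s = db->IncreaseFullHistoryTsLow(db->DefaultColumnFamily(), EncodeTs(3));
  }
  if (s.ok()) {
    s = db->Flush(rocksdb::FlushOptions(), db->DefaultColumnFamily());
  }

  // A read at ts=2 (< full_history_ts_low) now fails the per-SuperVersion
  // sanity check with InvalidArgument instead of possibly observing history
  // that is being collapsed underneath it.
  std::string read_ts = EncodeTs(2);
  rocksdb::Slice read_ts_slice = read_ts;
  rocksdb::ReadOptions read_opts;
  read_opts.timestamp = &read_ts_slice;
  std::string value;
  s = db->Get(read_opts, db->DefaultColumnFamily(), "foo", &value);
  std::cout << "Get at ts=2: " << s.ToString() << std::endl;  // InvalidArgument

  delete db;
  return 0;
}
```

The flush matters here because the per-SuperVersion copy of `full_history_ts_low` is only refreshed when a new SuperVersion is installed; before the flush, the already-installed SuperVersion still carries an empty `full_history_ts_low` and the same read would pass the check.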
This commit is contained in:
parent 3285ba7a29
commit 39a4ff2cab
@@ -96,9 +96,13 @@ class DBBlobIndexTest : public DBTestBase {
}

ArenaWrappedDBIter* GetBlobIterator() {
return dbfull()->NewIteratorImpl(
ReadOptions(), cfd(), dbfull()->GetLatestSequenceNumber(),
nullptr /*read_callback*/, true /*expose_blob_index*/);
ColumnFamilyData* column_family = cfd();
DBImpl* db_impl = dbfull();
return db_impl->NewIteratorImpl(
ReadOptions(), column_family,
column_family->GetReferencedSuperVersion(db_impl),
db_impl->GetLatestSequenceNumber(), nullptr /*read_callback*/,
true /*expose_blob_index*/);
}

Options GetTestOptions() {

@@ -476,6 +476,7 @@ void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
mem = new_mem;
imm = new_imm;
current = new_current;
full_history_ts_low = cfd->GetFullHistoryTsLow();
cfd->Ref();
mem->Ref();
imm->Ref();

@@ -211,6 +211,12 @@ struct SuperVersion {
// Version number of the current SuperVersion
uint64_t version_number;
WriteStallCondition write_stall_condition;
// Each time `full_history_ts_low` collapses history, a new SuperVersion is
// installed. This field tracks the effective `full_history_ts_low` for that
// SuperVersion, to be used by read APIs for sanity checks. This field is
// immutable once SuperVersion is installed. For column family that doesn't
// enable UDT feature, this is an empty string.
std::string full_history_ts_low;

// should be called outside the mutex
SuperVersion() = default;

@@ -59,12 +59,18 @@ Status CompactedDBImpl::Get(const ReadOptions& _read_options,

assert(user_comparator_);
if (read_options.timestamp) {
const Status s =
FailIfTsMismatchCf(DefaultColumnFamily(), *(read_options.timestamp),
/*ts_for_read=*/true);
Status s =
FailIfTsMismatchCf(DefaultColumnFamily(), *(read_options.timestamp));
if (!s.ok()) {
return s;
}
if (read_options.timestamp->size() > 0) {
s = FailIfReadCollapsedHistory(cfd_, cfd_->GetSuperVersion(),
*(read_options.timestamp));
if (!s.ok()) {
return s;
}
}
} else {
const Status s = FailIfCfHasTs(DefaultColumnFamily());
if (!s.ok()) {
@@ -133,11 +139,17 @@ std::vector<Status> CompactedDBImpl::MultiGet(

if (read_options.timestamp) {
Status s =
FailIfTsMismatchCf(DefaultColumnFamily(), *(read_options.timestamp),
/*ts_for_read=*/true);
FailIfTsMismatchCf(DefaultColumnFamily(), *(read_options.timestamp));
if (!s.ok()) {
return std::vector<Status>(num_keys, s);
}
if (read_options.timestamp->size() > 0) {
s = FailIfReadCollapsedHistory(cfd_, cfd_->GetSuperVersion(),
*(read_options.timestamp));
if (!s.ok()) {
return std::vector<Status>(num_keys, s);
}
}
} else {
Status s = FailIfCfHasTs(DefaultColumnFamily());
if (!s.ok()) {

@@ -2012,8 +2012,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,

if (read_options.timestamp) {
const Status s = FailIfTsMismatchCf(get_impl_options.column_family,
*(read_options.timestamp),
/*ts_for_read=*/true);
*(read_options.timestamp));
if (!s.ok()) {
return s;
}
@@ -2060,7 +2059,16 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,

// Acquire SuperVersion
SuperVersion* sv = GetAndRefSuperVersion(cfd);
if (read_options.timestamp && read_options.timestamp->size() > 0) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
}
}

TEST_SYNC_POINT_CALLBACK("DBImpl::GetImpl:AfterAcquireSv", nullptr);
TEST_SYNC_POINT("DBImpl::GetImpl:1");
TEST_SYNC_POINT("DBImpl::GetImpl:2");

@@ -2336,8 +2344,7 @@ std::vector<Status> DBImpl::MultiGet(
assert(column_family[i]);
if (read_options.timestamp) {
stat_list[i] =
FailIfTsMismatchCf(column_family[i], *(read_options.timestamp),
/*ts_for_read=*/true);
FailIfTsMismatchCf(column_family[i], *(read_options.timestamp));
if (!stat_list[i].ok()) {
should_fail = true;
}
@@ -2369,8 +2376,6 @@ std::vector<Status> DBImpl::MultiGet(
}
}

SequenceNumber consistent_seqnum;

UnorderedMap<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
column_family.size());
for (auto cf : column_family) {
@@ -2388,10 +2393,21 @@ std::vector<Status> DBImpl::MultiGet(
[](UnorderedMap<uint32_t, MultiGetColumnFamilyData>::iterator&
cf_iter) { return &cf_iter->second; };

bool unref_only =
SequenceNumber consistent_seqnum;
bool unref_only;
Status status =
MultiCFSnapshot<UnorderedMap<uint32_t, MultiGetColumnFamilyData>>(
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum);
&consistent_seqnum, &unref_only);

if (!status.ok()) {
for (auto& s : stat_list) {
if (s.ok()) {
s = status;
}
}
return stat_list;
}

TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum1");
TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum2");
@@ -2522,21 +2538,49 @@ std::vector<Status> DBImpl::MultiGet(
}

template <class T>
bool DBImpl::MultiCFSnapshot(
Status DBImpl::MultiCFSnapshot(
const ReadOptions& read_options, ReadCallback* callback,
std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
iter_deref_func,
T* cf_list, SequenceNumber* snapshot) {
T* cf_list, SequenceNumber* snapshot, bool* unref_only) {
PERF_TIMER_GUARD(get_snapshot_time);

assert(unref_only);
*unref_only = false;
Status s = Status::OK();
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
// unref_only set to true means the SuperVersion to be cleaned up is acquired
// directly via ColumnFamilyData instead of thread local.
const auto sv_cleanup_func = [&]() -> void {
for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
++cf_iter) {
auto node = iter_deref_func(cf_iter);
SuperVersion* super_version = node->super_version;
ColumnFamilyData* cfd = node->cfd;
if (super_version != nullptr) {
if (*unref_only) {
super_version->Unref();
} else {
ReturnAndCleanupSuperVersion(cfd, super_version);
}
}
node->super_version = nullptr;
}
};

bool last_try = false;
if (cf_list->size() == 1) {
// Fast path for a single column family. We can simply get the thread loca
// Fast path for a single column family. We can simply get the thread local
// super version
auto cf_iter = cf_list->begin();
auto node = iter_deref_func(cf_iter);
node->super_version = GetAndRefSuperVersion(node->cfd);
if (read_options.snapshot != nullptr) {
if (check_read_ts) {
s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
*(read_options.timestamp));
}
if (s.ok() && read_options.snapshot != nullptr) {
// Note: In WritePrepared txns this is not necessary but not harmful
// either. Because prep_seq > snapshot => commit_seq > snapshot so if
// a snapshot is specified we should be fine with skipping seq numbers
@@ -2550,7 +2594,7 @@ bool DBImpl::MultiCFSnapshot(
if (callback) {
*snapshot = std::max(*snapshot, callback->max_visible_seq());
}
} else {
} else if (s.ok()) {
// Since we get and reference the super version before getting
// the snapshot number, without a mutex protection, it is possible
// that a memtable switch happened in the middle and not all the
@@ -2564,26 +2608,17 @@ bool DBImpl::MultiCFSnapshot(
*snapshot = GetLastPublishedSequence();
}
} else {
// If we end up with the same issue of memtable geting sealed during 2
// If we end up with the same issue of memtable getting sealed during 2
// consecutive retries, it means the write rate is very high. In that case
// its probably ok to take the mutex on the 3rd try so we can succeed for
// sure
// it's probably ok to take the mutex on the 3rd try so we can succeed for
// sure.
constexpr int num_retries = 3;
for (int i = 0; i < num_retries; ++i) {
last_try = (i == num_retries - 1);
bool retry = false;

if (i > 0) {
for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
++cf_iter) {
auto node = iter_deref_func(cf_iter);
SuperVersion* super_version = node->super_version;
ColumnFamilyData* cfd = node->cfd;
if (super_version != nullptr) {
ReturnAndCleanupSuperVersion(cfd, super_version);
}
node->super_version = nullptr;
}
sv_cleanup_func();
}
if (read_options.snapshot == nullptr) {
if (last_try) {
@@ -2607,6 +2642,19 @@ bool DBImpl::MultiCFSnapshot(
node->super_version = node->cfd->GetSuperVersion()->Ref();
}
TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
if (check_read_ts) {
s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
*(read_options.timestamp));
if (!s.ok()) {
// If read timestamp check failed, a.k.a ReadOptions.timestamp <
// super_version.full_history_ts_low. There is no need to continue
// because this check will keep failing for the same and newer
// SuperVersions, instead we fail fast and ask user to provide
// a higher read timestamp.
retry = false;
break;
}
}
if (read_options.snapshot != nullptr || last_try) {
// If user passed a snapshot, then we don't care if a memtable is
// sealed or compaction happens because the snapshot would ensure
@@ -2638,8 +2686,11 @@ bool DBImpl::MultiCFSnapshot(

// Keep track of bytes that we read for statistics-recording later
PERF_TIMER_STOP(get_snapshot_time);

return last_try;
*unref_only = last_try;
if (!s.ok()) {
sv_cleanup_func();
}
return s;
}

void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
@@ -2689,8 +2740,7 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
ColumnFamilyHandle* cfh = column_families[i];
assert(cfh);
if (read_options.timestamp) {
statuses[i] = FailIfTsMismatchCf(cfh, *(read_options.timestamp),
/*ts_for_read=*/true);
statuses[i] = FailIfTsMismatchCf(cfh, *(read_options.timestamp));
if (!statuses[i].ok()) {
should_fail = true;
}
@@ -2773,10 +2823,20 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
};

SequenceNumber consistent_seqnum;
bool unref_only = MultiCFSnapshot<
bool unref_only;
Status s = MultiCFSnapshot<
autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum);
&consistent_seqnum, &unref_only);

if (!s.ok()) {
for (size_t i = 0; i < num_keys; ++i) {
if (statuses[i].ok()) {
statuses[i] = s;
}
}
return;
}

GetWithTimestampReadCallback timestamp_read_callback(0);
ReadCallback* read_callback = nullptr;
@@ -2785,7 +2845,6 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
read_callback = &timestamp_read_callback;
}

Status s;
auto cf_iter = multiget_cf_data.begin();
for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
@@ -2961,9 +3020,13 @@ void DBImpl::MultiGetWithCallback(

size_t num_keys = sorted_keys->size();
SequenceNumber consistent_seqnum;
bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
bool unref_only;
Status s = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
read_options, callback, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum);
&consistent_seqnum, &unref_only);
if (!s.ok()) {
return;
}
#ifndef NDEBUG
assert(!unref_only);
#else
@@ -2998,9 +3061,9 @@ void DBImpl::MultiGetWithCallback(
read_callback = &timestamp_read_callback;
}

Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
multiget_cf_data[0].super_version, consistent_seqnum,
read_callback);
s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
multiget_cf_data[0].super_version, consistent_seqnum,
read_callback);
assert(s.ok() || s.IsTimedOut() || s.IsAborted());
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
multiget_cf_data[0].super_version);
@@ -3470,8 +3533,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,

if (read_options.timestamp) {
const Status s =
FailIfTsMismatchCf(column_family, *(read_options.timestamp),
/*ts_for_read=*/true);
FailIfTsMismatchCf(column_family, *(read_options.timestamp));
if (!s.ok()) {
return NewErrorIterator(s);
}
@@ -3486,8 +3548,16 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
ColumnFamilyData* cfd = cfh->cfd();
assert(cfd != nullptr);
ReadCallback* read_callback = nullptr; // No read callback provided.
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
if (read_options.timestamp && read_options.timestamp->size() > 0) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
CleanupSuperVersion(sv);
return NewErrorIterator(s);
}
}
if (read_options.tailing) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
auto iter = new ForwardIterator(this, read_options, cfd, sv,
/* allow_unprepared_value */ true);
result = NewDBIterator(
@@ -3499,7 +3569,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
// WritePreparedTxnDB
result = NewIteratorImpl(read_options, cfd,
result = NewIteratorImpl(read_options, cfd, sv,
(read_options.snapshot != nullptr)
? read_options.snapshot->GetSequenceNumber()
: kMaxSequenceNumber,
@@ -3508,14 +3578,10 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
return result;
}

ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool expose_blob_index,
bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);

ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
const ReadOptions& read_options, ColumnFamilyData* cfd, SuperVersion* sv,
SequenceNumber snapshot, ReadCallback* read_callback,
bool expose_blob_index, bool allow_refresh) {
TEST_SYNC_POINT("DBImpl::NewIterator:1");
TEST_SYNC_POINT("DBImpl::NewIterator:2");

@@ -3615,8 +3681,7 @@ Status DBImpl::NewIterators(
if (read_options.timestamp) {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp),
/*ts_for_read=*/true);
const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
@@ -3634,10 +3699,27 @@ Status DBImpl::NewIterators(
ReadCallback* read_callback = nullptr; // No read callback provided.
iterators->clear();
iterators->reserve(column_families.size());
autovector<std::tuple<ColumnFamilyData*, SuperVersion*>> cfd_to_sv;
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cfh : column_families) {
auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
cfd_to_sv.emplace_back(cfd, sv);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfd_to_sv) {
CleanupSuperVersion(std::get<1>(prev_entry));
}
return s;
}
}
}
assert(cfd_to_sv.size() == column_families.size());
if (read_options.tailing) {
for (auto cfh : column_families) {
auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
for (auto [cfd, sv] : cfd_to_sv) {
auto iter = new ForwardIterator(this, read_options, cfd, sv,
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
@@ -3653,12 +3735,9 @@ Status DBImpl::NewIterators(
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
for (size_t i = 0; i < column_families.size(); ++i) {
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(column_families[i])
->cfd();
for (auto [cfd, sv] : cfd_to_sv) {
iterators->push_back(
NewIteratorImpl(read_options, cfd, snapshot, read_callback));
NewIteratorImpl(read_options, cfd, sv, snapshot, read_callback));
}
}

@@ -648,7 +648,7 @@ class DBImpl : public DB {

// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,
ColumnFamilyData* cfd, SuperVersion* sv,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool expose_blob_index = false,
@@ -1543,8 +1543,18 @@ class DBImpl : public DB {
void SetDbSessionId();

Status FailIfCfHasTs(const ColumnFamilyHandle* column_family) const;
Status FailIfTsMismatchCf(ColumnFamilyHandle* column_family, const Slice& ts,
bool ts_for_read) const;
Status FailIfTsMismatchCf(ColumnFamilyHandle* column_family,
const Slice& ts) const;

// Check that the read timestamp `ts` is at or above the `full_history_ts_low`
// timestamp in a `SuperVersion`. It's necessary to do this check after
// grabbing the SuperVersion. If the check passed, the referenced SuperVersion
// this read holds on to can ensure the read won't be affected if
// `full_history_ts_low` is increased concurrently, and this achieves that
// without explicitly locking by piggybacking the SuperVersion.
Status FailIfReadCollapsedHistory(const ColumnFamilyData* cfd,
const SuperVersion* sv,
const Slice& ts) const;

// recovery_ctx stores the context about version edits and
// LogAndApplyForRecovery persist all those edits to new Manifest after
@@ -2312,15 +2322,18 @@ class DBImpl : public DB {
// If callback is non-null, the callback is refreshed with the snapshot
// sequence number
//
// A return value of true indicates that the SuperVersions were obtained
// from the ColumnFamilyData, whereas false indicates they are thread
// local
// `unref_only` being set to true indicates that the SuperVersions were
// obtained from the ColumnFamilyData, whereas false indicates they are thread
// local.
// A non-OK status will be returned if for a column family that enables
// user-defined timestamp feature, the specified `ReadOptions.timestamp`
// attemps to read collapsed history.
template <class T>
bool MultiCFSnapshot(
Status MultiCFSnapshot(
const ReadOptions& read_options, ReadCallback* callback,
std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
iter_deref_func,
T* cf_list, SequenceNumber* snapshot);
T* cf_list, SequenceNumber* snapshot, bool* unref_only);

// The actual implementation of the batching MultiGet. The caller is expected
// to have acquired the SuperVersion and pass in a snapshot sequence number
@@ -2829,8 +2842,7 @@ inline Status DBImpl::FailIfCfHasTs(
}

inline Status DBImpl::FailIfTsMismatchCf(ColumnFamilyHandle* column_family,
const Slice& ts,
bool ts_for_read) const {
const Slice& ts) const {
if (!column_family) {
return Status::InvalidArgument("column family handle cannot be null");
}
@@ -2850,20 +2862,28 @@ inline Status DBImpl::FailIfTsMismatchCf(ColumnFamilyHandle* column_family,
<< ts_sz << " given";
return Status::InvalidArgument(oss.str());
}
if (ts_for_read) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto cfd = cfh->cfd();
std::string current_ts_low = cfd->GetFullHistoryTsLow();
if (!current_ts_low.empty() &&
ucmp->CompareTimestamp(ts, current_ts_low) < 0) {
std::stringstream oss;
oss << "Read timestamp: " << ts.ToString(true)
<< " is smaller than full_history_ts_low: "
<< Slice(current_ts_low).ToString(true) << std::endl;
return Status::InvalidArgument(oss.str());
}
}
return Status::OK();
}

inline Status DBImpl::FailIfReadCollapsedHistory(const ColumnFamilyData* cfd,
const SuperVersion* sv,
const Slice& ts) const {
// Reaching to this point means the timestamp size matching sanity check in
// `DBImpl::FailIfTsMismatchCf` already passed. So we skip that and assume
// column family has the same user-defined timestamp format as `ts`.
const Comparator* const ucmp = cfd->user_comparator();
assert(ucmp);
const std::string& full_history_ts_low = sv->full_history_ts_low;
assert(full_history_ts_low.empty() ||
full_history_ts_low.size() == ts.size());
if (!full_history_ts_low.empty() &&
ucmp->CompareTimestamp(ts, full_history_ts_low) < 0) {
std::stringstream oss;
oss << "Read timestamp: " << ts.ToString(true)
<< " is smaller than full_history_ts_low: "
<< Slice(full_history_ts_low).ToString(true) << std::endl;
return Status::InvalidArgument(oss.str());
}
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE

@@ -58,8 +58,7 @@ Status DBImplReadOnly::Get(const ReadOptions& _read_options,
assert(column_family);
if (read_options.timestamp) {
const Status s =
FailIfTsMismatchCf(column_family, *(read_options.timestamp),
/*ts_for_read=*/true);
FailIfTsMismatchCf(column_family, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
@@ -92,6 +91,13 @@ Status DBImplReadOnly::Get(const ReadOptions& _read_options,
}
}
SuperVersion* super_version = cfd->GetSuperVersion();
if (read_options.timestamp && read_options.timestamp->size() > 0) {
s = FailIfReadCollapsedHistory(cfd, super_version,
*(read_options.timestamp));
if (!s.ok()) {
return s;
}
}
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey(key, snapshot, read_options.timestamp);
@@ -137,8 +143,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& _read_options,
assert(column_family);
if (read_options.timestamp) {
const Status s =
FailIfTsMismatchCf(column_family, *(read_options.timestamp),
/*ts_for_read=*/true);
FailIfTsMismatchCf(column_family, *(read_options.timestamp));
if (!s.ok()) {
return NewErrorIterator(s);
}
@@ -151,6 +156,14 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& _read_options,
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto cfd = cfh->cfd();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
if (read_options.timestamp && read_options.timestamp->size() > 0) {
const Status s = FailIfReadCollapsedHistory(cfd, super_version,
*(read_options.timestamp));
if (!s.ok()) {
cfd->GetSuperVersion()->Unref();
return NewErrorIterator(s);
}
}
SequenceNumber latest_snapshot = versions_->LastSequence();
SequenceNumber read_seq =
read_options.snapshot != nullptr
@@ -177,8 +190,7 @@ Status DBImplReadOnly::NewIterators(
if (read_options.timestamp) {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp),
/*ts_for_read=*/true);
const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
@@ -206,9 +218,27 @@ Status DBImplReadOnly::NewIterators(
->number_
: latest_snapshot;

autovector<std::tuple<ColumnFamilyData*, SuperVersion*>> cfd_to_sv;

const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cfh : column_families) {
auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
auto* sv = cfd->GetSuperVersion()->Ref();
cfd_to_sv.emplace_back(cfd, sv);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfd_to_sv) {
std::get<1>(prev_entry)->Unref();
}
return s;
}
}
}
assert(cfd_to_sv.size() == column_families.size());
for (auto [cfd, sv] : cfd_to_sv) {
auto* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
sv->current, read_seq,

@@ -384,8 +384,8 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,

assert(column_family);
if (read_options.timestamp) {
const Status s = FailIfTsMismatchCf(
column_family, *(read_options.timestamp), /*ts_for_read=*/true);
const Status s =
FailIfTsMismatchCf(column_family, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
@@ -412,6 +412,14 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
}
// Acquire SuperVersion
SuperVersion* super_version = GetAndRefSuperVersion(cfd);
if (read_options.timestamp && read_options.timestamp->size() > 0) {
const Status s = FailIfReadCollapsedHistory(cfd, super_version,
*(read_options.timestamp));
if (!s.ok()) {
ReturnAndCleanupSuperVersion(cfd, super_version);
return s;
}
}
SequenceNumber snapshot = versions_->LastSequence();
GetWithTimestampReadCallback read_cb(snapshot);
MergeContext merge_context;
@@ -491,8 +499,7 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& _read_options,
assert(column_family);
if (read_options.timestamp) {
const Status s =
FailIfTsMismatchCf(column_family, *(read_options.timestamp),
/*ts_for_read=*/true);
FailIfTsMismatchCf(column_family, *(read_options.timestamp));
if (!s.ok()) {
return NewErrorIterator(s);
}
@@ -516,17 +523,25 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& _read_options,
Status::NotSupported("snapshot not supported in secondary mode"));
} else {
SequenceNumber snapshot(kMaxSequenceNumber);
result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
if (read_options.timestamp && read_options.timestamp->size() > 0) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
CleanupSuperVersion(sv);
return NewErrorIterator(s);
}
}
result = NewIteratorImpl(read_options, cfd, sv, snapshot, read_callback);
}
return result;
}

ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
const ReadOptions& read_options, ColumnFamilyData* cfd,
SequenceNumber snapshot, ReadCallback* read_callback,
bool expose_blob_index, bool allow_refresh) {
SuperVersion* super_version, SequenceNumber snapshot,
ReadCallback* read_callback, bool expose_blob_index, bool allow_refresh) {
assert(nullptr != cfd);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
assert(snapshot == kMaxSequenceNumber);
snapshot = versions_->LastSequence();
assert(snapshot != kMaxSequenceNumber);
@@ -572,8 +587,7 @@ Status DBImplSecondary::NewIterators(
if (read_options.timestamp) {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp),
/*ts_for_read=*/true);
const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
@@ -597,10 +611,28 @@ Status DBImplSecondary::NewIterators(
return Status::NotSupported("snapshot not supported in secondary mode");
} else {
SequenceNumber read_seq(kMaxSequenceNumber);
autovector<std::tuple<ColumnFamilyData*, SuperVersion*>> cfd_to_sv;
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cfh : column_families) {
ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
cfd_to_sv.emplace_back(cfd, sv);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfd_to_sv) {
CleanupSuperVersion(std::get<1>(prev_entry));
}
return s;
}
}
}
assert(cfd_to_sv.size() == column_families.size());
for (auto [cfd, sv] : cfd_to_sv) {
iterators->push_back(
NewIteratorImpl(read_options, cfd, read_seq, read_callback));
NewIteratorImpl(read_options, cfd, sv, read_seq, read_callback));
}
}
return Status::OK();

@@ -122,7 +122,7 @@ class DBImplSecondary : public DBImpl {
ColumnFamilyHandle* column_family) override;

ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd,
ColumnFamilyData* cfd, SuperVersion* sv,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool expose_blob_index = false,

@@ -30,7 +30,7 @@ Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,

Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& val) {
const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false);
const Status s = FailIfTsMismatchCf(column_family, ts);
if (!s.ok()) {
return s;
}
@@ -64,7 +64,7 @@ Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,

Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& val) {
const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false);
const Status s = FailIfTsMismatchCf(column_family, ts);
if (!s.ok()) {
return s;
}
@@ -83,7 +83,7 @@ Status DBImpl::Delete(const WriteOptions& write_options,
Status DBImpl::Delete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts) {
const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false);
const Status s = FailIfTsMismatchCf(column_family, ts);
if (!s.ok()) {
return s;
}
@@ -103,7 +103,7 @@ Status DBImpl::SingleDelete(const WriteOptions& write_options,
Status DBImpl::SingleDelete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts) {
const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false);
const Status s = FailIfTsMismatchCf(column_family, ts);
if (!s.ok()) {
return s;
}
@@ -124,7 +124,7 @@ Status DBImpl::DeleteRange(const WriteOptions& write_options,
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key,
const Slice& ts) {
const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false);
const Status s = FailIfTsMismatchCf(column_family, ts);
if (!s.ok()) {
return s;
}

@@ -107,7 +107,10 @@ class DBIteratorTest : public DBIteratorBaseTest,
read_callbacks_.push_back(
std::unique_ptr<DummyReadCallback>(read_callback));
}
return dbfull()->NewIteratorImpl(read_options, cfd, seq, read_callback);
DBImpl* db_impl = dbfull();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl);
return db_impl->NewIteratorImpl(read_options, cfd, super_version, seq,
read_callback);
}

private:
@@ -3130,8 +3133,10 @@ TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) {
static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
->cfd();
// The iterator are suppose to see data before seq1.
Iterator* iter =
dbfull()->NewIteratorImpl(ReadOptions(), cfd, seq2, &callback1);
DBImpl* db_impl = dbfull();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl);
Iterator* iter = db_impl->NewIteratorImpl(ReadOptions(), cfd, super_version,
seq2, &callback1);

// Seek
// The latest value of "foo" before seq1 is "v3"
@@ -3209,7 +3214,9 @@ TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) {
SequenceNumber seq4 = db_->GetLatestSequenceNumber();

// The iterator is suppose to see data before seq3.
iter = dbfull()->NewIteratorImpl(ReadOptions(), cfd, seq4, &callback2);
super_version = cfd->GetReferencedSuperVersion(db_impl);
iter = db_impl->NewIteratorImpl(ReadOptions(), cfd, super_version, seq4,
&callback2);
// Seek to "z", which is visible.
iter->Seek("z");
ASSERT_TRUE(iter->Valid());

@@ -336,6 +336,53 @@ TEST_F(DBReadOnlyTestWithTimestamp, Iterators) {
Close();
}

TEST_F(DBReadOnlyTestWithTimestamp, FullHistoryTsLowSanityCheckFail) {
Options options = CurrentOptions();
options.env = env_;
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
// Use UDT in memtable only feature for this test, so we can control that
// newly set `full_history_ts_low` collapse history when Flush happens.
options.persist_user_defined_timestamps = false;
options.allow_concurrent_memtable_write = false;
DestroyAndReopen(options);

std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(db_->Put(WriteOptions(), "foo", write_ts, "val1"));

std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
full_history_ts_low));
ASSERT_OK(Flush(0));

// Reopen the database in read only mode to test its timestamp support.
Close();
ASSERT_OK(ReadOnlyReopen(options));

// Reading below full_history_ts_low fails a sanity check.
std::string read_ts;
PutFixed64(&read_ts, 2);
Slice read_ts_slice = read_ts;
ReadOptions read_opts;
read_opts.timestamp = &read_ts_slice;

// Get()
std::string value;
ASSERT_TRUE(db_->Get(read_opts, "foo", &value).IsInvalidArgument());
// NewIterator()
std::unique_ptr<Iterator> iter(
db_->NewIterator(read_opts, db_->DefaultColumnFamily()));
ASSERT_TRUE(iter->status().IsInvalidArgument());

// NewIterators()
std::vector<ColumnFamilyHandle*> cfhs = {db_->DefaultColumnFamily()};
std::vector<Iterator*> iterators;
ASSERT_TRUE(
db_->NewIterators(read_opts, cfhs, &iterators).IsInvalidArgument());
Close();
}

TEST_F(DBReadOnlyTestWithTimestamp, IteratorsReadTimestampSizeMismatch) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;

@@ -1561,6 +1561,55 @@ TEST_F(DBSecondaryTestWithTimestamp, IteratorsReadTimestampSizeMismatch) {
Close();
}

TEST_F(DBSecondaryTestWithTimestamp, FullHistoryTsLowSanityCheckFail) {
Options options = CurrentOptions();
options.env = env_;
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
// Use UDT in memtable only feature for this test, so we can control that
// newly set `full_history_ts_low` collapse history when Flush happens.
options.persist_user_defined_timestamps = false;
options.allow_concurrent_memtable_write = false;
DestroyAndReopen(options);

std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(db_->Put(WriteOptions(), "foo", write_ts, "val1"));

std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
full_history_ts_low));
ASSERT_OK(Flush(0));

// Reopen the database as secondary instance to test its timestamp support.
Close();
options.max_open_files = -1;
ASSERT_OK(ReopenAsSecondary(options));

// Reading below full_history_ts_low fails a sanity check.
std::string read_ts;
PutFixed64(&read_ts, 2);
Slice read_ts_slice = read_ts;
ReadOptions read_opts;
read_opts.timestamp = &read_ts_slice;

// Get()
std::string value;
ASSERT_TRUE(db_->Get(read_opts, "foo", &value).IsInvalidArgument());

// NewIterator()
std::unique_ptr<Iterator> iter(
db_->NewIterator(read_opts, db_->DefaultColumnFamily()));
ASSERT_TRUE(iter->status().IsInvalidArgument());

// NewIterators()
std::vector<ColumnFamilyHandle*> cfhs = {db_->DefaultColumnFamily()};
std::vector<Iterator*> iterators;
ASSERT_TRUE(
db_->NewIterators(read_opts, cfhs, &iterators).IsInvalidArgument());
Close();
}

TEST_F(DBSecondaryTestWithTimestamp,
IteratorsReadTimestampSpecifiedWithoutWriteTimestamp) {
const int kNumKeysPerFile = 128;

@@ -3416,6 +3416,225 @@ TEST_F(DBBasicTestWithTimestamp, EnableDisableUDT) {
Close();
}

// Tests that as long as the
// `ReadOptions.timestamp >= SuperVersion.full_history_ts_low` sanity check
// passes. The read will be consistent even if the column family's
// full_history_ts_low is concurrently increased and collapsed some history
// above `ReadOptions.timestamp`.
TEST_F(DBBasicTestWithTimestamp,
FullHistoryTsLowSanityCheckPassReadIsConsistent) {
Options options = CurrentOptions();
options.env = env_;
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
// Use UDT in memtable only feature for this test, so we can control that
// newly set `full_history_ts_low` collapse history when Flush happens.
options.persist_user_defined_timestamps = false;
options.allow_concurrent_memtable_write = false;
DestroyAndReopen(options);
std::string min_ts;
PutFixed64(&min_ts, 0);

// Write two versions of the key (1, v1), (3, v3), and always read with
// timestamp 2.
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(db_->Put(WriteOptions(), "foo", write_ts, "val1"));

std::string read_ts;
PutFixed64(&read_ts, 2);
Slice read_ts_slice = read_ts;
ReadOptions read_opts;
read_opts.timestamp = &read_ts_slice;

// First read, no full_history_ts_low set, sanity check pass.
std::string value;
std::string timestamp;
ASSERT_OK(db_->Get(read_opts, "foo", &value, &timestamp));
ASSERT_EQ("val1", value);
ASSERT_EQ(write_ts, timestamp);

std::string full_history_ts_low;
std::string marked_ts_low;
PutFixed64(&full_history_ts_low, 2);
marked_ts_low = full_history_ts_low;
ASSERT_OK(db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
full_history_ts_low));
ASSERT_OK(Flush(0));

// Write the (3, v3) entry after flush, otherwise with UDT in memtable only
// the previous Flush(0) with full_history_ts_low = 2 will be postponed
// waiting for (3, v3) to expire too.
write_ts.clear();
PutFixed64(&write_ts, 3);
ASSERT_OK(db_->Put(WriteOptions(), "foo", write_ts, "val3"));

// Second read:
// ReadOptions.timestamp(2) >= SuperVersion.full_history_ts_low(2),
// and ReadOptions.timestamp(2) >= ColumnFamilyData.full_history_ts_low(2).
// history below 2 is collapsed. Reading at 2 or above 2 is ok.
// Sanity check pass. Read return consistent value, but timestamp is already
// collapsed.
ASSERT_OK(db_->Get(read_opts, "foo", &value, &timestamp));
ASSERT_EQ("val1", value);
ASSERT_EQ(min_ts, timestamp);

SyncPoint::GetInstance()->SetCallBack(
"DBImpl::GetImpl:AfterAcquireSv", [&](void* /*arg*/) {
// Concurrently increasing full_history_ts_low and flush to create a
// new SuperVersion
std::string current_ts_low;
ASSERT_OK(db_->GetFullHistoryTsLow(db_->DefaultColumnFamily(),
&current_ts_low));
if (current_ts_low.empty() || current_ts_low != marked_ts_low) {
return;
}
full_history_ts_low.clear();
PutFixed64(&full_history_ts_low, 4);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
full_history_ts_low));
ASSERT_OK(Flush(0));
});

SyncPoint::GetInstance()->EnableProcessing();

// Third read:
// ReadOptions.timestamp(2) >= SuperVersion.full_history_ts_low(2),
// but ReadOptions.timestamp(2) < ColumnFamilyData.full_history_ts_low(4).
// History below 4 is collapsed in the newly installed SuperVersion. But the
// SuperVersion attached to this read still has the history below 4 available.
// Sanity check pass. Read return consistent value, timestamp is collapsed.
ASSERT_OK(db_->Get(read_opts, "foo", &value, &timestamp));
ASSERT_EQ("val1", value);
ASSERT_EQ(min_ts, timestamp);

// Fourth read:
// ReadOptions.timestamp(2) < SuperVersion.full_history_ts_low(4).
// Sanity check fails. Had it succeeded, the read would return "v3",
// which is inconsistent.
ASSERT_TRUE(
db_->Get(read_opts, "foo", &value, &timestamp).IsInvalidArgument());
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

// Tests that in cases when
// `ReadOptions.timestamp >= SuperVersion.full_history_ts_low` sanity check
// fails. The referenced SuperVersion is dereferenced and cleaned up properly
// for all read APIs that involves this sanity check.
TEST_F(DBBasicTestWithTimestamp, FullHistoryTsLowSanityCheckFail) {
Options options = CurrentOptions();
options.env = env_;
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
// Use UDT in memtable only feature for this test, so we can control that
// newly set `full_history_ts_low` collapse history when Flush happens.
options.persist_user_defined_timestamps = false;
options.allow_concurrent_memtable_write = false;
DestroyAndReopen(options);

ColumnFamilyHandle* handle2 = nullptr;
Status s = db_->CreateColumnFamily(options, "data", &handle2);
ASSERT_OK(s);

std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(db_->Put(WriteOptions(), "foo", write_ts, "val1"));
ASSERT_OK(db_->Put(WriteOptions(), handle2, "foo", write_ts, "val1"));

std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
full_history_ts_low));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handle2, full_history_ts_low));
ASSERT_OK(Flush(0));
ASSERT_OK(db_->Flush(FlushOptions(), handle2));

std::string read_ts;
PutFixed64(&read_ts, 2);
Slice read_ts_slice = read_ts;
ReadOptions read_opts;
read_opts.timestamp = &read_ts_slice;

// Get()
std::string value;
ASSERT_TRUE(db_->Get(read_opts, "foo", &value).IsInvalidArgument());

// MultiGet()
std::vector<ColumnFamilyHandle*> cfhs = {db_->DefaultColumnFamily(), handle2};
{
std::vector<std::string> key_vals = {"foo", "foo"};
std::vector<Slice> keys;
std::vector<std::string> values;
for (size_t j = 0; j < 2; ++j) {
keys.push_back(key_vals[j]);
}

std::vector<Status> statuses =
db_->MultiGet(read_opts, cfhs, keys, &values);
for (auto status : statuses) {
ASSERT_TRUE(status.IsInvalidArgument());
}
}

// MultiGet with only one column family
{
std::vector<ColumnFamilyHandle*> one_cfh = {db_->DefaultColumnFamily()};
std::vector<std::string> key_vals = {"foo"};
std::vector<Slice> keys;
std::vector<std::string> values;
for (size_t j = 0; j < 1; ++j) {
keys.push_back(key_vals[j]);
}

std::vector<Status> statuses =
db_->MultiGet(read_opts, one_cfh, keys, &values);
for (auto status : statuses) {
ASSERT_TRUE(status.IsInvalidArgument());
}
}

// Overloaded version of MultiGet
ColumnFamilyHandle* column_families[] = {db_->DefaultColumnFamily(), handle2};
{
Slice keys[] = {"foo", "foo"};
PinnableSlice values[] = {PinnableSlice(), PinnableSlice()};
Status statuses[] = {Status::OK(), Status::OK()};
db_->MultiGet(read_opts, /*num_keys=*/2, &column_families[0], &keys[0],
&values[0], &statuses[0], /*sorted_input=*/false);
for (auto status : statuses) {
ASSERT_TRUE(status.IsInvalidArgument());
}
}

// Overloaded versions of MultiGet with one column family
{
ColumnFamilyHandle* one_column_family[] = {db_->DefaultColumnFamily()};
Slice keys[] = {"foo"};
PinnableSlice values[] = {PinnableSlice()};
Status statuses[] = {Status::OK()};
db_->MultiGet(read_opts, /*num_keys=*/1, &one_column_family[0], &keys[0],
&values[0], &statuses[0], /*sorted_input=*/false);
for (auto status : statuses) {
ASSERT_TRUE(status.IsInvalidArgument());
}
}

// NewIterator()
std::unique_ptr<Iterator> iter(
db_->NewIterator(read_opts, db_->DefaultColumnFamily()));
ASSERT_TRUE(iter->status().IsInvalidArgument());
std::unique_ptr<Iterator> iter2(db_->NewIterator(read_opts, handle2));
ASSERT_TRUE(iter2->status().IsInvalidArgument());

// NewIterators()
std::vector<Iterator*> iterators;
ASSERT_TRUE(
db_->NewIterators(read_opts, cfhs, &iterators).IsInvalidArgument());
delete handle2;
Close();
}

TEST_F(DBBasicTestWithTimestamp,
GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) {
Options options = CurrentOptions();

@@ -2101,8 +2101,9 @@ Iterator* BlobDBImpl::NewIterator(const ReadOptions& _read_options) {
own_snapshot = new ManagedSnapshot(db_);
snapshot = own_snapshot->snapshot();
}
SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl_);
auto* iter = db_impl_->NewIteratorImpl(
read_options, cfd, snapshot->GetSequenceNumber(),
read_options, cfd, sv, snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*expose_blob_index*/);
return new BlobDBIterator(own_snapshot, iter, this, clock_, statistics_);
}

@@ -183,8 +183,8 @@ inline Status WriteCommittedTxn::GetForUpdateImpl(
value, exclusive, do_validate);
}
} else {
Status s = db_impl_->FailIfTsMismatchCf(
column_family, *(read_options.timestamp), /*ts_for_read=*/true);
Status s =
db_impl_->FailIfTsMismatchCf(column_family, *(read_options.timestamp));
if (!s.ok()) {
return s;
}

@@ -413,9 +413,10 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& _read_options,
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfd, snapshot_seq,
&state->callback, expose_blob_index,
allow_refresh);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_);
auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfd, super_version,
snapshot_seq, &state->callback,
expose_blob_index, allow_refresh);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
return db_iter;
}
@@ -461,8 +462,9 @@ Status WritePreparedTxnDB::NewIterators(
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfd, snapshot_seq,
&state->callback,
SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_);
auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfd, super_version,
snapshot_seq, &state->callback,
expose_blob_index, allow_refresh);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
iterators->push_back(db_iter);

@@ -472,9 +472,10 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& _read_options,
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_);
auto* db_iter = db_impl_->NewIteratorImpl(
read_options, cfd, state->MaxVisibleSeq(), &state->callback,
expose_blob_index, allow_refresh);
read_options, cfd, super_version, state->MaxVisibleSeq(),
&state->callback, expose_blob_index, allow_refresh);
db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
return db_iter;
}