mirror of https://github.com/facebook/rocksdb.git
Use manifest to persist pre-allocated seqnos (#11995)
Summary: ... and other fixes for crash test after https://github.com/facebook/rocksdb/issues/11922. * When pre-allocating sequence numbers for establishing a time history, record that last sequence number in the manifest so that it is (most likely) restored on recovery even if no user writes were made or were recovered (e.g. no WAL). * When pre-allocating sequence numbers for establishing a time history, only do this for actually new DBs. * Remove the feature that ensures non-zero sequence number on creating the first column family with preserve/preclude option after initial DB::Open. Until fixed in a way compatible with the crash test, this creates a gap where some data written with active preserve/preclude option won't have a known associated time. Together, these ensure we don't upset the crash test by manipulating sequence numbers after initial DB creation (esp when re-opening with different options). (The crash test expects that the seqno after re-open corresponds to a known point in time from previous crash test operation, matching an expected DB state.) Follow-up work: * Re-fill the gap to ensure all data written under preserve/preclude settings have a known time estimate. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11995 Test Plan: Added to unit test SeqnoTimeTablePropTest.PrePopulateInDB Verified fixes two crash test scenarios: ## 1st reproducer First apply ``` diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc index b483e154c..ef63b8d6c 100644 --- a/db_stress_tool/expected_state.cc +++ b/db_stress_tool/expected_state.cc @@ -333,6 +333,7 @@ Status FileExpectedStateManager::SaveAtAndAfter(DB* db) { s = NewFileTraceWriter(Env::Default(), soptions, trace_file_path, &trace_writer); } + if (getenv("CRASH")) assert(false); if (s.ok()) { TraceOptions trace_opts; trace_opts.filter |= kTraceFilterGet; ``` Then ``` mkdir -p /dev/shm/rocksdb_test/rocksdb_crashtest_expected mkdir -p /dev/shm/rocksdb_test/rocksdb_crashtest_whitebox rm -rf /dev/shm/rocksdb_test/rocksdb_crashtest_*/* CRASH=1 ./db_stress --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --destroy_db_initially=1 --manual_wal_flush_one_in=1000000 --clear_column_family_one_in=0 --preserve_internal_time_seconds=36000 ./db_stress --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --destroy_db_initially=0 --manual_wal_flush_one_in=1000000 --clear_column_family_one_in=0 --preserve_internal_time_seconds=0 ``` Without the fix you get ``` ... DB path: [/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox] (Re-)verified 34 unique IDs Error restoring historical expected values: Corruption: DB is older than any restorable expected state ``` ## 2nd reproducer First apply ``` diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 62ddead7b..f2654980f 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -1126,6 +1126,7 @@ void StressTest::OperateDb(ThreadState* thread) { // OPERATION write TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys, value); + if (getenv("CRASH")) assert(false); } else if (prob_op < del_bound) { assert(write_bound <= prob_op); // OPERATION delete ``` Then ``` rm -rf /dev/shm/rocksdb_test/rocksdb_crashtest_*/* CRASH=1 ./db_stress --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --destroy_db_initially=1 --manual_wal_flush_one_in=1000000 --clear_column_family_one_in=0 --disable_wal=1 --reopen=0 --preserve_internal_time_seconds=0 ./db_stress --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --destroy_db_initially=0 --manual_wal_flush_one_in=1000000 --clear_column_family_one_in=0 --disable_wal=1 --reopen=0 --preserve_internal_time_seconds=3600 ``` Without the fix you get ``` DB path: [/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox] (Re-)verified 34 unique IDs db_stress: db_stress_tool/expected_state.cc:380: virtual rocksdb::{anonymous}::ExpectedStateTraceRecordHandler::~ ExpectedStateTraceRecordHandler(): Assertion `IsDone()' failed. ``` Reviewed By: jowlyzhang Differential Revision: D50533346 Pulled By: pdillinger fbshipit-source-id: 1056be45c5b9e537c8c601b28c4b27431a782477
This commit is contained in:
parent
543191f2ea
commit
4155087746
|
@ -796,10 +796,8 @@ Status DBImpl::StartPeriodicTaskScheduler() {
|
|||
return s;
|
||||
}
|
||||
|
||||
Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
|
||||
if (!from_db_open) {
|
||||
options_mutex_.AssertHeld();
|
||||
}
|
||||
Status DBImpl::RegisterRecordSeqnoTimeWorker(bool is_new_db) {
|
||||
options_mutex_.AssertHeld();
|
||||
|
||||
uint64_t min_preserve_seconds = std::numeric_limits<uint64_t>::max();
|
||||
uint64_t max_preserve_seconds = std::numeric_limits<uint64_t>::min();
|
||||
|
@ -853,7 +851,17 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
|
|||
// 2) In any DB, any data written after setting preserve/preclude options
|
||||
// must have a reasonable time estimate (so that we can accurately place
|
||||
// the data), which means at least one entry in seqno_to_time_mapping_.
|
||||
if (from_db_open && GetLatestSequenceNumber() == 0) {
|
||||
//
|
||||
// FIXME: We don't currently guarantee that if the first column family with
|
||||
// that setting is added or configured after initial DB::Open but before
|
||||
// the first user Write. Fixing this causes complications with the crash
|
||||
// test because if DB starts without preserve/preclude option, does some
|
||||
// user writes but all those writes are lost in crash, then re-opens with
|
||||
// preserve/preclude option, it sees seqno==1 which looks like one of the
|
||||
// user writes was recovered, when actually it was not.
|
||||
bool last_seqno_zero = GetLatestSequenceNumber() == 0;
|
||||
assert(!is_new_db || last_seqno_zero);
|
||||
if (is_new_db && last_seqno_zero) {
|
||||
// Pre-allocate seqnos and pre-populate historical mapping
|
||||
assert(mapping_was_empty);
|
||||
|
||||
|
@ -862,16 +870,31 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
|
|||
versions_->SetLastAllocatedSequence(kMax);
|
||||
versions_->SetLastPublishedSequence(kMax);
|
||||
versions_->SetLastSequence(kMax);
|
||||
|
||||
// And record in manifest, to avoid going backwards in seqno on re-open
|
||||
// (potentially with different options). Concurrency is simple because we
|
||||
// are in DB::Open
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
VersionEdit edit;
|
||||
edit.SetLastSequence(kMax);
|
||||
s = versions_->LogAndApplyToDefaultColumnFamily(
|
||||
{}, &edit, &mutex_, directories_.GetDbDir());
|
||||
if (!s.ok() && versions_->io_status().IsIOError()) {
|
||||
s = error_handler_.SetBGError(versions_->io_status(),
|
||||
BackgroundErrorReason::kManifestWrite);
|
||||
}
|
||||
}
|
||||
|
||||
// Pre-populate mappings for reserved sequence numbers.
|
||||
RecordSeqnoToTimeMapping(max_preserve_seconds);
|
||||
} else if (mapping_was_empty) {
|
||||
// To ensure there is at least one mapping, we need a non-zero sequence
|
||||
// number. Outside of DB::Open, we have to be careful.
|
||||
versions_->EnsureNonZeroSequence();
|
||||
assert(GetLatestSequenceNumber() > 0);
|
||||
|
||||
// Ensure at least one mapping (or log a warning)
|
||||
RecordSeqnoToTimeMapping(/*populate_historical_seconds=*/0);
|
||||
if (!last_seqno_zero) {
|
||||
// Ensure at least one mapping (or log a warning)
|
||||
RecordSeqnoToTimeMapping(/*populate_historical_seconds=*/0);
|
||||
} else {
|
||||
// FIXME (see limitation described above)
|
||||
}
|
||||
}
|
||||
|
||||
s = periodic_task_scheduler_.Register(
|
||||
|
@ -6493,7 +6516,7 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) {
|
|||
assert(unix_time > populate_historical_seconds);
|
||||
}
|
||||
} else {
|
||||
assert(seqno > 0);
|
||||
// FIXME: assert(seqno > 0);
|
||||
appended = seqno_to_time_mapping_.Append(seqno, unix_time);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1395,6 +1395,7 @@ class DBImpl : public DB {
|
|||
autovector<autovector<VersionEdit*>> edit_lists_;
|
||||
// files_to_delete_ contains sst files
|
||||
std::unordered_set<std::string> files_to_delete_;
|
||||
bool is_new_db_ = false;
|
||||
};
|
||||
|
||||
// Persist options to options file. Must be holding options_mutex_.
|
||||
|
@ -2168,7 +2169,7 @@ class DBImpl : public DB {
|
|||
// Cancel scheduled periodic tasks
|
||||
Status CancelPeriodicTaskScheduler();
|
||||
|
||||
Status RegisterRecordSeqnoTimeWorker(bool from_db_open);
|
||||
Status RegisterRecordSeqnoTimeWorker(bool is_new_db);
|
||||
|
||||
void PrintStatistics();
|
||||
|
||||
|
|
|
@ -418,7 +418,8 @@ Status DBImpl::Recover(
|
|||
uint64_t* recovered_seq, RecoveryContext* recovery_ctx) {
|
||||
mutex_.AssertHeld();
|
||||
|
||||
bool is_new_db = false;
|
||||
bool tmp_is_new_db = false;
|
||||
bool& is_new_db = recovery_ctx ? recovery_ctx->is_new_db_ : tmp_is_new_db;
|
||||
assert(db_lock_ == nullptr);
|
||||
std::vector<std::string> files_in_dbname;
|
||||
if (!read_only) {
|
||||
|
@ -2247,7 +2248,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
|||
s = impl->StartPeriodicTaskScheduler();
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = impl->RegisterRecordSeqnoTimeWorker(/*from_db_open=*/true);
|
||||
s = impl->RegisterRecordSeqnoTimeWorker(recovery_ctx.is_new_db_);
|
||||
}
|
||||
impl->options_mutex_.Unlock();
|
||||
if (!s.ok()) {
|
||||
|
|
|
@ -843,6 +843,10 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
|
|||
// interfere with the seqno-to-time mapping getting a starting entry.
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
} else {
|
||||
// FIXME: currently, starting entry after CreateColumnFamily requires
|
||||
// non-zero seqno
|
||||
ASSERT_OK(Delete("blah"));
|
||||
}
|
||||
|
||||
// Unfortunately, if we add a CF with preserve/preclude option after
|
||||
|
@ -899,6 +903,9 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
|
|||
}
|
||||
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
|
||||
ASSERT_EQ(sttm.Size(), 0);
|
||||
if (!with_write) {
|
||||
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);
|
||||
}
|
||||
|
||||
ASSERT_OK(ReadOnlyReopen(track_options));
|
||||
if (with_write) {
|
||||
|
@ -906,6 +913,16 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
|
|||
}
|
||||
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
|
||||
ASSERT_EQ(sttm.Size(), 0);
|
||||
if (!with_write) {
|
||||
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);
|
||||
|
||||
// And even if we re-open read-write, we do not get pre-population,
|
||||
// because that's only for new DBs.
|
||||
Reopen(track_options);
|
||||
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
|
||||
ASSERT_EQ(sttm.Size(), 0);
|
||||
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
// #### DB#5: Destroy and open with preserve/preclude option ####
|
||||
|
@ -987,6 +1004,13 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
|
|||
// Oldest tracking time maps to first pre-allocated seqno
|
||||
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time - kPreserveSecs), 1);
|
||||
|
||||
// Even after no writes and DB re-open without tracking options, sequence
|
||||
// numbers should not go backward into those that were pre-allocated.
|
||||
// (Future work: persist the mapping)
|
||||
ReopenWithColumnFamilies({"default", "one"},
|
||||
List({base_options, base_options}));
|
||||
ASSERT_EQ(latest_seqno, db_->GetLatestSequenceNumber());
|
||||
|
||||
Close();
|
||||
}
|
||||
|
||||
|
|
|
@ -7234,20 +7234,6 @@ Status VersionSet::VerifyFileMetadata(const ReadOptions& read_options,
|
|||
return status;
|
||||
}
|
||||
|
||||
void VersionSet::EnsureNonZeroSequence() {
|
||||
uint64_t expected = 0;
|
||||
// Update each from 0->1, in order, or abort if any becomes non-zero in
|
||||
// parallel
|
||||
if (last_allocated_sequence_.compare_exchange_strong(expected, 1)) {
|
||||
if (last_published_sequence_.compare_exchange_strong(expected, 1)) {
|
||||
(void)last_sequence_.compare_exchange_strong(expected, 1);
|
||||
}
|
||||
}
|
||||
assert(last_allocated_sequence_.load() > 0);
|
||||
assert(last_published_sequence_.load() > 0);
|
||||
assert(last_sequence_.load() > 0);
|
||||
}
|
||||
|
||||
ReactiveVersionSet::ReactiveVersionSet(
|
||||
const std::string& dbname, const ImmutableDBOptions* _db_options,
|
||||
const FileOptions& _file_options, Cache* table_cache,
|
||||
|
|
|
@ -1344,9 +1344,6 @@ class VersionSet {
|
|||
last_allocated_sequence_.store(s, std::memory_order_seq_cst);
|
||||
}
|
||||
|
||||
// Allocate a dummy sequence number as needed to ensure last is non-zero.
|
||||
void EnsureNonZeroSequence();
|
||||
|
||||
// Note: memory_order_release must be sufficient
|
||||
uint64_t FetchAddLastAllocatedSequence(uint64_t s) {
|
||||
return last_allocated_sequence_.fetch_add(s, std::memory_order_seq_cst);
|
||||
|
|
Loading…
Reference in New Issue