diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 6fa4373c2b..25bc0b36f6 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -2473,7 +2473,10 @@ void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id, } } // anonymous namespace -TEST_P(ColumnFamilyTest, CreateAndDropRace) { +// This test attempts to set up a race condition in a way that is no longer +// possible, causing the test to hang. If DBImpl::options_mutex_ is removed +// in the future, this test might become relevant again. +TEST_P(ColumnFamilyTest, DISABLED_CreateAndDropRace) { const int kCfCount = 5; std::vector cf_opts; std::vector comparators; @@ -2535,6 +2538,53 @@ TEST_P(ColumnFamilyTest, CreateAndDropRace) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); } +TEST_P(ColumnFamilyTest, CreateAndDropPeriodicRace) { + // This is a mini-stress test looking for inconsistency between the set of + // CFs in the DB, particularly whether any use preserve_internal_time_seconds, + // and whether that is accurately reflected in the periodic task setup. + constexpr size_t kNumThreads = 12; + std::vector threads; + bool last_cf_on = Random::GetTLSInstance()->OneIn(2); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::RegisterRecordSeqnoTimeWorker:BeforePeriodicTaskType", + [&](void* /*arg*/) { std::this_thread::yield(); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_EQ(column_family_options_.preserve_internal_time_seconds, 0U); + ColumnFamilyOptions other_opts = column_family_options_; + ColumnFamilyOptions last_opts = column_family_options_; + (last_cf_on ? last_opts : other_opts).preserve_internal_time_seconds = + 1000000; + Open(); + + for (size_t i = 0; i < kNumThreads; i++) { + threads.emplace_back([this, &other_opts, i]() { + ColumnFamilyHandle* cfh; + ASSERT_OK(db_->CreateColumnFamily(other_opts, std::to_string(i), &cfh)); + ASSERT_OK(db_->DropColumnFamily(cfh)); + ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh)); + }); + } + + ColumnFamilyHandle* last_cfh; + ASSERT_OK(db_->CreateColumnFamily(last_opts, "last", &last_cfh)); + + for (auto& t : threads) { + t.join(); + } + + bool task_enabled = dbfull()->TEST_GetPeriodicTaskScheduler().TEST_HasTask( + PeriodicTaskType::kRecordSeqnoTime); + ASSERT_EQ(last_cf_on, task_enabled); + + ASSERT_OK(db_->DropColumnFamily(last_cfh)); + ASSERT_OK(db_->DestroyColumnFamilyHandle(last_cfh)); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) { const uint64_t kBaseRate = 800000u; db_options_.delayed_write_rate = kBaseRate; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 0b8d21790e..140d334ce4 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -817,6 +817,10 @@ Status DBImpl::StartPeriodicTaskScheduler() { } Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) { + if (!from_db_open) { + options_mutex_.AssertHeld(); + } + uint64_t min_preserve_seconds = std::numeric_limits::max(); uint64_t max_preserve_seconds = std::numeric_limits::min(); bool mapping_was_empty = false; @@ -840,11 +844,6 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) { } mapping_was_empty = seqno_to_time_mapping_.Empty(); } - // FIXME: because we released the db mutex, there's a race here where - // if e.g. I create or drop two column families in parallel, I might end up - // with the periodic task scheduler in the wrong state. We don't want to - // just keep holding the mutex, however, because of global timer and mutex - // in PeriodicTaskScheduler. uint64_t seqno_time_cadence = 0; if (min_preserve_seconds != std::numeric_limits::max()) { @@ -855,6 +854,9 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) { SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF; } + TEST_SYNC_POINT_CALLBACK( + "DBImpl::RegisterRecordSeqnoTimeWorker:BeforePeriodicTaskType", nullptr); + Status s; if (seqno_time_cadence == 0) { s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kRecordSeqnoTime); @@ -918,6 +920,7 @@ Status DBImpl::CancelPeriodicTaskScheduler() { // esitmate the total size of stats_history_ size_t DBImpl::EstimateInMemoryStatsHistorySize() const { + stats_history_mutex_.AssertHeld(); size_t size_total = sizeof(std::map>); if (stats_history_.size() == 0) return size_total; @@ -1208,6 +1211,7 @@ Status DBImpl::SetOptions( return Status::InvalidArgument("empty input"); } + InstrumentedMutexLock ol(&options_mutex_); MutableCFOptions new_options; Status s; Status persist_options_status; @@ -1266,6 +1270,7 @@ Status DBImpl::SetDBOptions( return Status::InvalidArgument("empty input"); } + InstrumentedMutexLock ol(&options_mutex_); MutableDBOptions new_options; Status s; Status persist_options_status = Status::OK(); @@ -3362,6 +3367,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family, ColumnFamilyHandle** handle) { assert(handle != nullptr); + InstrumentedMutexLock ol(&options_mutex_); Status s = CreateColumnFamilyImpl(cf_options, column_family, handle); if (s.ok()) { s.UpdateIfOk(WrapUpCreateColumnFamilies({&cf_options})); @@ -3374,6 +3380,7 @@ Status DBImpl::CreateColumnFamilies( const std::vector& column_family_names, std::vector* handles) { assert(handles != nullptr); + InstrumentedMutexLock ol(&options_mutex_); handles->clear(); size_t num_cf = column_family_names.size(); Status s; @@ -3397,6 +3404,7 @@ Status DBImpl::CreateColumnFamilies( const std::vector& column_families, std::vector* handles) { assert(handles != nullptr); + InstrumentedMutexLock ol(&options_mutex_); handles->clear(); size_t num_cf = column_families.size(); Status s; @@ -3423,6 +3431,7 @@ Status DBImpl::CreateColumnFamilies( Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, const std::string& column_family_name, ColumnFamilyHandle** handle) { + options_mutex_.AssertHeld(); // TODO: plumb Env::IOActivity const ReadOptions read_options; Status s; @@ -3514,6 +3523,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { assert(column_family != nullptr); + InstrumentedMutexLock ol(&options_mutex_); Status s = DropColumnFamilyImpl(column_family); if (s.ok()) { s = WriteOptionsFile(true /*need_mutex_lock*/, @@ -3524,6 +3534,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { Status DBImpl::DropColumnFamilies( const std::vector& column_families) { + InstrumentedMutexLock ol(&options_mutex_); Status s; bool success_once = false; for (auto* handle : column_families) { @@ -5164,6 +5175,8 @@ Status DestroyDB(const std::string& dbname, const Options& options, Status DBImpl::WriteOptionsFile(bool need_mutex_lock, bool need_enter_write_thread) { + options_mutex_.AssertHeld(); + WriteThread::Writer w; if (need_mutex_lock) { mutex_.Lock(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 5e7e87bb77..73f4d2e5d9 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2379,9 +2379,19 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; - // In addition to mutex_, log_write_mutex_ protected writes to stats_history_ + // Guards changes to DB and CF options to ensure consistency between + // * In-memory options objects + // * Settings in effect + // * Options file contents + // while allowing the DB mutex to be released during slow operations like + // persisting options file or modifying global periodic task timer. + // Always acquired *before* DB mutex when this one is applicable. + InstrumentedMutex options_mutex_; + + // Guards reads and writes to in-memory stats_history_. InstrumentedMutex stats_history_mutex_; - // In addition to mutex_, log_write_mutex_ protected writes to logs_ and + + // In addition to mutex_, log_write_mutex_ protects writes to logs_ and // logfile_number_. With two_write_queues it also protects alive_log_files_, // and log_empty_. Refer to the definition of each variable below for more // details. diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 670bc78872..8be960c7b1 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -311,6 +311,7 @@ SeqnoToTimeMapping DBImpl::TEST_GetSeqnoToTimeMapping() const { size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const { + InstrumentedMutexLock l(&const_cast(this)->stats_history_mutex_); return EstimateInMemoryStatsHistorySize(); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index d48f66ae54..89a9f33e17 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -871,7 +871,8 @@ Status DBImpl::PersistentStatsProcessFormatVersion() { if (s.ok()) { ColumnFamilyOptions cfo; OptimizeForPersistentStats(&cfo); - s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle); + s = CreateColumnFamilyImpl(cfo, kPersistentStatsColumnFamilyName, + &handle); } if (s.ok()) { persist_stats_cf_handle_ = static_cast(handle); @@ -924,7 +925,7 @@ Status DBImpl::InitPersistStatsColumnFamily() { ColumnFamilyHandle* handle = nullptr; ColumnFamilyOptions cfo; OptimizeForPersistentStats(&cfo); - s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle); + s = CreateColumnFamilyImpl(cfo, kPersistentStatsColumnFamilyName, &handle); persist_stats_cf_handle_ = static_cast(handle); mutex_.Lock(); } @@ -1988,6 +1989,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath(); RecoveryContext recovery_ctx; + impl->options_mutex_.Lock(); impl->mutex_.Lock(); // Handles create_if_missing, error_if_exists @@ -2124,7 +2126,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, // The WriteOptionsFile() will release and lock the mutex internally. persist_options_status = impl->WriteOptionsFile( false /*need_mutex_lock*/, false /*need_enter_write_thread*/); - *dbptr = impl; impl->opened_successfully_ = true; impl->DeleteObsoleteFiles(); @@ -2245,10 +2246,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { s = impl->StartPeriodicTaskScheduler(); } - if (s.ok()) { s = impl->RegisterRecordSeqnoTimeWorker(/*from_db_open=*/true); } + impl->options_mutex_.Unlock(); if (!s.ok()) { for (auto* h : *handles) { delete h; diff --git a/db/db_options_test.cc b/db/db_options_test.cc index e709dcaaaa..7e77ac55e9 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -19,6 +19,7 @@ #include "rocksdb/convenience.h" #include "rocksdb/rate_limiter.h" #include "rocksdb/stats_history.h" +#include "rocksdb/utilities/options_util.h" #include "test_util/mock_time_env.h" #include "test_util/sync_point.h" #include "test_util/testutil.h" @@ -741,6 +742,55 @@ TEST_F(DBOptionsTest, SetStatsDumpPeriodSec) { Close(); } +TEST_F(DBOptionsTest, SetStatsDumpPeriodSecRace) { + // This is a mini-stress test looking for inconsistency between the reported + // state of the option and the behavior in effect for the DB, after the last + // modification to that option (indefinite inconsistency). + std::vector threads; + for (int i = 0; i < 12; i++) { + threads.emplace_back([this, i]() { + ASSERT_OK(dbfull()->SetDBOptions( + {{"stats_dump_period_sec", i % 2 ? "100" : "0"}})); + }); + } + + for (auto& t : threads) { + t.join(); + } + + bool stats_dump_set = dbfull()->GetDBOptions().stats_dump_period_sec > 0; + bool task_enabled = dbfull()->TEST_GetPeriodicTaskScheduler().TEST_HasTask( + PeriodicTaskType::kDumpStats); + + ASSERT_EQ(stats_dump_set, task_enabled); +} + +TEST_F(DBOptionsTest, SetOptionsAndFileRace) { + // This is a mini-stress test looking for inconsistency between the reported + // state of the option and what is persisted in the options file, after the + // last modification to that option (indefinite inconsistency). + std::vector threads; + for (int i = 0; i < 12; i++) { + threads.emplace_back([this, i]() { + ASSERT_OK(dbfull()->SetOptions({{"ttl", std::to_string(i * 100)}})); + }); + } + + for (auto& t : threads) { + t.join(); + } + + auto setting_in_mem = dbfull()->GetOptions().ttl; + + std::vector cf_descs; + DBOptions db_options; + ConfigOptions cfg; + cfg.env = env_; + ASSERT_OK(LoadLatestOptions(cfg, dbname_, &db_options, &cf_descs, nullptr)); + ASSERT_EQ(cf_descs.size(), 1); + ASSERT_EQ(setting_in_mem, cf_descs[0].options.ttl); +} + TEST_F(DBOptionsTest, SetOptionsStatsPersistPeriodSec) { Options options; options.create_if_missing = true; diff --git a/monitoring/instrumented_mutex.h b/monitoring/instrumented_mutex.h index b97d2502e4..33e2427593 100644 --- a/monitoring/instrumented_mutex.h +++ b/monitoring/instrumented_mutex.h @@ -46,7 +46,7 @@ class InstrumentedMutex { void Unlock() { mutex_.Unlock(); } - void AssertHeld() { mutex_.AssertHeld(); } + void AssertHeld() const { mutex_.AssertHeld(); } private: void LockInternal(); diff --git a/port/port_example.h b/port/port_example.h index 794149a690..2a19ffee05 100644 --- a/port/port_example.h +++ b/port/port_example.h @@ -43,7 +43,7 @@ class Mutex { // Optionally crash if this thread does not hold this mutex. // The implementation must be fast, especially if NDEBUG is // defined. The implementation is allowed to skip all checks. - void AssertHeld(); + void AssertHeld() const; }; class CondVar { diff --git a/port/port_posix.cc b/port/port_posix.cc index 3872293b81..749ad5d607 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -100,7 +100,7 @@ bool Mutex::TryLock() { return ret; } -void Mutex::AssertHeld() { +void Mutex::AssertHeld() const { #ifndef NDEBUG assert(locked_); #endif diff --git a/port/port_posix.h b/port/port_posix.h index e498186041..95641c0c54 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -109,9 +109,9 @@ class Mutex { bool TryLock(); - // this will assert if the mutex is not locked - // it does NOT verify that mutex is held by a calling thread - void AssertHeld(); + // This will fail assertion if the mutex is not locked. + // It does NOT verify that mutex is held by a calling thread. + void AssertHeld() const; // Also implement std Lockable inline void lock() { Lock(); } @@ -139,7 +139,7 @@ class RWMutex { void WriteLock(); void ReadUnlock(); void WriteUnlock(); - void AssertHeld() {} + void AssertHeld() const {} private: pthread_rwlock_t mu_; // the underlying platform mutex diff --git a/port/win/port_win.h b/port/win/port_win.h index 621f053703..4aa10d0052 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -117,7 +117,7 @@ class Mutex { // this will assert if the mutex is not locked // it does NOT verify that mutex is held by a calling thread - void AssertHeld() { + void AssertHeld() const { #ifndef NDEBUG assert(locked_); #endif @@ -159,7 +159,7 @@ class RWMutex { void WriteUnlock() { ReleaseSRWLockExclusive(&srwLock_); } // Empty as in POSIX - void AssertHeld() {} + void AssertHeld() const {} private: SRWLOCK srwLock_; diff --git a/unreleased_history/bug_fixes/options_race.md b/unreleased_history/bug_fixes/options_race.md new file mode 100644 index 0000000000..42b89b8eab --- /dev/null +++ b/unreleased_history/bug_fixes/options_race.md @@ -0,0 +1 @@ +* Fixed a rare race bug involving a concurrent combination of Create/DropColumnFamily and/or Set(DB)Options that could lead to inconsistency between (a) the DB's reported options state, (b) the DB options in effect, and (c) the latest persisted OPTIONS file. diff --git a/util/distributed_mutex.h b/util/distributed_mutex.h index 1734269cc9..e3450d753e 100644 --- a/util/distributed_mutex.h +++ b/util/distributed_mutex.h @@ -28,7 +28,7 @@ class DMutex : public folly::DistributedMutex { explicit DMutex(bool IGNORED_adaptive = false) { (void)IGNORED_adaptive; } // currently no-op - void AssertHeld() {} + void AssertHeld() const {} }; using DMutexLock = std::lock_guard;