mirror of https://github.com/facebook/rocksdb.git
Fix race in options taking effect (#11929)
Summary: In follow-up to https://github.com/facebook/rocksdb/issues/11922, fix a race in functions like CreateColumnFamily and SetDBOptions where the DB reports one option setting but a different one is left in effect. To fix, we can add an extra mutex around these rare operations. We don't want to hold the DB mutex during I/O or other slow things because of the many purposes it serves, but a mutex more limited to these cases should be fine. I believe this would fix a write-write race in https://github.com/facebook/rocksdb/issues/10079 but not the read-write race. Intended follow-up to this: * Should be able to remove write thread synchronization from DBImpl::WriteOptionsFile Pull Request resolved: https://github.com/facebook/rocksdb/pull/11929 Test Plan: Added two mini-stress style regression tests that fail with >1% probability before this change: DBOptionsTest::SetStatsDumpPeriodSecRace ColumnFamilyTest::CreateAndDropPeriodicRace I haven't reproduced such an inconsistency between in-memory options and on disk latest options, but this change at least improves safety and adds a test anyway: DBOptionsTest::SetStatsDumpPeriodSecRace Reviewed By: ajkr Differential Revision: D50024506 Pulled By: pdillinger fbshipit-source-id: 1e99a9ed4d96fdcf3ac5061ec6b3cee78aecdda4
This commit is contained in:
parent
b2fe14817e
commit
d010b02e86
|
@ -2473,7 +2473,10 @@ void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id,
|
|||
}
|
||||
} // anonymous namespace
|
||||
|
||||
TEST_P(ColumnFamilyTest, CreateAndDropRace) {
|
||||
// This test attempts to set up a race condition in a way that is no longer
|
||||
// possible, causing the test to hang. If DBImpl::options_mutex_ is removed
|
||||
// in the future, this test might become relevant again.
|
||||
TEST_P(ColumnFamilyTest, DISABLED_CreateAndDropRace) {
|
||||
const int kCfCount = 5;
|
||||
std::vector<ColumnFamilyOptions> cf_opts;
|
||||
std::vector<Comparator*> comparators;
|
||||
|
@ -2535,6 +2538,53 @@ TEST_P(ColumnFamilyTest, CreateAndDropRace) {
|
|||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
TEST_P(ColumnFamilyTest, CreateAndDropPeriodicRace) {
|
||||
// This is a mini-stress test looking for inconsistency between the set of
|
||||
// CFs in the DB, particularly whether any use preserve_internal_time_seconds,
|
||||
// and whether that is accurately reflected in the periodic task setup.
|
||||
constexpr size_t kNumThreads = 12;
|
||||
std::vector<std::thread> threads;
|
||||
bool last_cf_on = Random::GetTLSInstance()->OneIn(2);
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::RegisterRecordSeqnoTimeWorker:BeforePeriodicTaskType",
|
||||
[&](void* /*arg*/) { std::this_thread::yield(); });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_EQ(column_family_options_.preserve_internal_time_seconds, 0U);
|
||||
ColumnFamilyOptions other_opts = column_family_options_;
|
||||
ColumnFamilyOptions last_opts = column_family_options_;
|
||||
(last_cf_on ? last_opts : other_opts).preserve_internal_time_seconds =
|
||||
1000000;
|
||||
Open();
|
||||
|
||||
for (size_t i = 0; i < kNumThreads; i++) {
|
||||
threads.emplace_back([this, &other_opts, i]() {
|
||||
ColumnFamilyHandle* cfh;
|
||||
ASSERT_OK(db_->CreateColumnFamily(other_opts, std::to_string(i), &cfh));
|
||||
ASSERT_OK(db_->DropColumnFamily(cfh));
|
||||
ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
|
||||
});
|
||||
}
|
||||
|
||||
ColumnFamilyHandle* last_cfh;
|
||||
ASSERT_OK(db_->CreateColumnFamily(last_opts, "last", &last_cfh));
|
||||
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
bool task_enabled = dbfull()->TEST_GetPeriodicTaskScheduler().TEST_HasTask(
|
||||
PeriodicTaskType::kRecordSeqnoTime);
|
||||
ASSERT_EQ(last_cf_on, task_enabled);
|
||||
|
||||
ASSERT_OK(db_->DropColumnFamily(last_cfh));
|
||||
ASSERT_OK(db_->DestroyColumnFamilyHandle(last_cfh));
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) {
|
||||
const uint64_t kBaseRate = 800000u;
|
||||
db_options_.delayed_write_rate = kBaseRate;
|
||||
|
|
|
@ -817,6 +817,10 @@ Status DBImpl::StartPeriodicTaskScheduler() {
|
|||
}
|
||||
|
||||
Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
|
||||
if (!from_db_open) {
|
||||
options_mutex_.AssertHeld();
|
||||
}
|
||||
|
||||
uint64_t min_preserve_seconds = std::numeric_limits<uint64_t>::max();
|
||||
uint64_t max_preserve_seconds = std::numeric_limits<uint64_t>::min();
|
||||
bool mapping_was_empty = false;
|
||||
|
@ -840,11 +844,6 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
|
|||
}
|
||||
mapping_was_empty = seqno_to_time_mapping_.Empty();
|
||||
}
|
||||
// FIXME: because we released the db mutex, there's a race here where
|
||||
// if e.g. I create or drop two column families in parallel, I might end up
|
||||
// with the periodic task scheduler in the wrong state. We don't want to
|
||||
// just keep holding the mutex, however, because of global timer and mutex
|
||||
// in PeriodicTaskScheduler.
|
||||
|
||||
uint64_t seqno_time_cadence = 0;
|
||||
if (min_preserve_seconds != std::numeric_limits<uint64_t>::max()) {
|
||||
|
@ -855,6 +854,9 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
|
|||
SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF;
|
||||
}
|
||||
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"DBImpl::RegisterRecordSeqnoTimeWorker:BeforePeriodicTaskType", nullptr);
|
||||
|
||||
Status s;
|
||||
if (seqno_time_cadence == 0) {
|
||||
s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kRecordSeqnoTime);
|
||||
|
@ -918,6 +920,7 @@ Status DBImpl::CancelPeriodicTaskScheduler() {
|
|||
|
||||
// esitmate the total size of stats_history_
|
||||
size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
|
||||
stats_history_mutex_.AssertHeld();
|
||||
size_t size_total =
|
||||
sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
|
||||
if (stats_history_.size() == 0) return size_total;
|
||||
|
@ -1208,6 +1211,7 @@ Status DBImpl::SetOptions(
|
|||
return Status::InvalidArgument("empty input");
|
||||
}
|
||||
|
||||
InstrumentedMutexLock ol(&options_mutex_);
|
||||
MutableCFOptions new_options;
|
||||
Status s;
|
||||
Status persist_options_status;
|
||||
|
@ -1266,6 +1270,7 @@ Status DBImpl::SetDBOptions(
|
|||
return Status::InvalidArgument("empty input");
|
||||
}
|
||||
|
||||
InstrumentedMutexLock ol(&options_mutex_);
|
||||
MutableDBOptions new_options;
|
||||
Status s;
|
||||
Status persist_options_status = Status::OK();
|
||||
|
@ -3362,6 +3367,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
|
|||
const std::string& column_family,
|
||||
ColumnFamilyHandle** handle) {
|
||||
assert(handle != nullptr);
|
||||
InstrumentedMutexLock ol(&options_mutex_);
|
||||
Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
|
||||
if (s.ok()) {
|
||||
s.UpdateIfOk(WrapUpCreateColumnFamilies({&cf_options}));
|
||||
|
@ -3374,6 +3380,7 @@ Status DBImpl::CreateColumnFamilies(
|
|||
const std::vector<std::string>& column_family_names,
|
||||
std::vector<ColumnFamilyHandle*>* handles) {
|
||||
assert(handles != nullptr);
|
||||
InstrumentedMutexLock ol(&options_mutex_);
|
||||
handles->clear();
|
||||
size_t num_cf = column_family_names.size();
|
||||
Status s;
|
||||
|
@ -3397,6 +3404,7 @@ Status DBImpl::CreateColumnFamilies(
|
|||
const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||
std::vector<ColumnFamilyHandle*>* handles) {
|
||||
assert(handles != nullptr);
|
||||
InstrumentedMutexLock ol(&options_mutex_);
|
||||
handles->clear();
|
||||
size_t num_cf = column_families.size();
|
||||
Status s;
|
||||
|
@ -3423,6 +3431,7 @@ Status DBImpl::CreateColumnFamilies(
|
|||
Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
|
||||
const std::string& column_family_name,
|
||||
ColumnFamilyHandle** handle) {
|
||||
options_mutex_.AssertHeld();
|
||||
// TODO: plumb Env::IOActivity
|
||||
const ReadOptions read_options;
|
||||
Status s;
|
||||
|
@ -3514,6 +3523,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
|
|||
|
||||
Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
|
||||
assert(column_family != nullptr);
|
||||
InstrumentedMutexLock ol(&options_mutex_);
|
||||
Status s = DropColumnFamilyImpl(column_family);
|
||||
if (s.ok()) {
|
||||
s = WriteOptionsFile(true /*need_mutex_lock*/,
|
||||
|
@ -3524,6 +3534,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
|
|||
|
||||
Status DBImpl::DropColumnFamilies(
|
||||
const std::vector<ColumnFamilyHandle*>& column_families) {
|
||||
InstrumentedMutexLock ol(&options_mutex_);
|
||||
Status s;
|
||||
bool success_once = false;
|
||||
for (auto* handle : column_families) {
|
||||
|
@ -5164,6 +5175,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
|
|||
|
||||
Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
|
||||
bool need_enter_write_thread) {
|
||||
options_mutex_.AssertHeld();
|
||||
|
||||
WriteThread::Writer w;
|
||||
if (need_mutex_lock) {
|
||||
mutex_.Lock();
|
||||
|
|
|
@ -2379,9 +2379,19 @@ class DBImpl : public DB {
|
|||
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
|
||||
FileLock* db_lock_;
|
||||
|
||||
// In addition to mutex_, log_write_mutex_ protected writes to stats_history_
|
||||
// Guards changes to DB and CF options to ensure consistency between
|
||||
// * In-memory options objects
|
||||
// * Settings in effect
|
||||
// * Options file contents
|
||||
// while allowing the DB mutex to be released during slow operations like
|
||||
// persisting options file or modifying global periodic task timer.
|
||||
// Always acquired *before* DB mutex when this one is applicable.
|
||||
InstrumentedMutex options_mutex_;
|
||||
|
||||
// Guards reads and writes to in-memory stats_history_.
|
||||
InstrumentedMutex stats_history_mutex_;
|
||||
// In addition to mutex_, log_write_mutex_ protected writes to logs_ and
|
||||
|
||||
// In addition to mutex_, log_write_mutex_ protects writes to logs_ and
|
||||
// logfile_number_. With two_write_queues it also protects alive_log_files_,
|
||||
// and log_empty_. Refer to the definition of each variable below for more
|
||||
// details.
|
||||
|
|
|
@ -311,6 +311,7 @@ SeqnoToTimeMapping DBImpl::TEST_GetSeqnoToTimeMapping() const {
|
|||
|
||||
|
||||
size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const {
|
||||
InstrumentedMutexLock l(&const_cast<DBImpl*>(this)->stats_history_mutex_);
|
||||
return EstimateInMemoryStatsHistorySize();
|
||||
}
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
|
|
@ -871,7 +871,8 @@ Status DBImpl::PersistentStatsProcessFormatVersion() {
|
|||
if (s.ok()) {
|
||||
ColumnFamilyOptions cfo;
|
||||
OptimizeForPersistentStats(&cfo);
|
||||
s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
|
||||
s = CreateColumnFamilyImpl(cfo, kPersistentStatsColumnFamilyName,
|
||||
&handle);
|
||||
}
|
||||
if (s.ok()) {
|
||||
persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
|
||||
|
@ -924,7 +925,7 @@ Status DBImpl::InitPersistStatsColumnFamily() {
|
|||
ColumnFamilyHandle* handle = nullptr;
|
||||
ColumnFamilyOptions cfo;
|
||||
OptimizeForPersistentStats(&cfo);
|
||||
s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
|
||||
s = CreateColumnFamilyImpl(cfo, kPersistentStatsColumnFamilyName, &handle);
|
||||
persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
|
||||
mutex_.Lock();
|
||||
}
|
||||
|
@ -1988,6 +1989,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
|||
|
||||
impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
|
||||
RecoveryContext recovery_ctx;
|
||||
impl->options_mutex_.Lock();
|
||||
impl->mutex_.Lock();
|
||||
|
||||
// Handles create_if_missing, error_if_exists
|
||||
|
@ -2124,7 +2126,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
|||
// The WriteOptionsFile() will release and lock the mutex internally.
|
||||
persist_options_status = impl->WriteOptionsFile(
|
||||
false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
|
||||
|
||||
*dbptr = impl;
|
||||
impl->opened_successfully_ = true;
|
||||
impl->DeleteObsoleteFiles();
|
||||
|
@ -2245,10 +2246,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
|||
if (s.ok()) {
|
||||
s = impl->StartPeriodicTaskScheduler();
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
s = impl->RegisterRecordSeqnoTimeWorker(/*from_db_open=*/true);
|
||||
}
|
||||
impl->options_mutex_.Unlock();
|
||||
if (!s.ok()) {
|
||||
for (auto* h : *handles) {
|
||||
delete h;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "rocksdb/convenience.h"
|
||||
#include "rocksdb/rate_limiter.h"
|
||||
#include "rocksdb/stats_history.h"
|
||||
#include "rocksdb/utilities/options_util.h"
|
||||
#include "test_util/mock_time_env.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "test_util/testutil.h"
|
||||
|
@ -741,6 +742,55 @@ TEST_F(DBOptionsTest, SetStatsDumpPeriodSec) {
|
|||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBOptionsTest, SetStatsDumpPeriodSecRace) {
|
||||
// This is a mini-stress test looking for inconsistency between the reported
|
||||
// state of the option and the behavior in effect for the DB, after the last
|
||||
// modification to that option (indefinite inconsistency).
|
||||
std::vector<std::thread> threads;
|
||||
for (int i = 0; i < 12; i++) {
|
||||
threads.emplace_back([this, i]() {
|
||||
ASSERT_OK(dbfull()->SetDBOptions(
|
||||
{{"stats_dump_period_sec", i % 2 ? "100" : "0"}}));
|
||||
});
|
||||
}
|
||||
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
bool stats_dump_set = dbfull()->GetDBOptions().stats_dump_period_sec > 0;
|
||||
bool task_enabled = dbfull()->TEST_GetPeriodicTaskScheduler().TEST_HasTask(
|
||||
PeriodicTaskType::kDumpStats);
|
||||
|
||||
ASSERT_EQ(stats_dump_set, task_enabled);
|
||||
}
|
||||
|
||||
TEST_F(DBOptionsTest, SetOptionsAndFileRace) {
|
||||
// This is a mini-stress test looking for inconsistency between the reported
|
||||
// state of the option and what is persisted in the options file, after the
|
||||
// last modification to that option (indefinite inconsistency).
|
||||
std::vector<std::thread> threads;
|
||||
for (int i = 0; i < 12; i++) {
|
||||
threads.emplace_back([this, i]() {
|
||||
ASSERT_OK(dbfull()->SetOptions({{"ttl", std::to_string(i * 100)}}));
|
||||
});
|
||||
}
|
||||
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
auto setting_in_mem = dbfull()->GetOptions().ttl;
|
||||
|
||||
std::vector<ColumnFamilyDescriptor> cf_descs;
|
||||
DBOptions db_options;
|
||||
ConfigOptions cfg;
|
||||
cfg.env = env_;
|
||||
ASSERT_OK(LoadLatestOptions(cfg, dbname_, &db_options, &cf_descs, nullptr));
|
||||
ASSERT_EQ(cf_descs.size(), 1);
|
||||
ASSERT_EQ(setting_in_mem, cf_descs[0].options.ttl);
|
||||
}
|
||||
|
||||
TEST_F(DBOptionsTest, SetOptionsStatsPersistPeriodSec) {
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
|
|
|
@ -46,7 +46,7 @@ class InstrumentedMutex {
|
|||
|
||||
void Unlock() { mutex_.Unlock(); }
|
||||
|
||||
void AssertHeld() { mutex_.AssertHeld(); }
|
||||
void AssertHeld() const { mutex_.AssertHeld(); }
|
||||
|
||||
private:
|
||||
void LockInternal();
|
||||
|
|
|
@ -43,7 +43,7 @@ class Mutex {
|
|||
// Optionally crash if this thread does not hold this mutex.
|
||||
// The implementation must be fast, especially if NDEBUG is
|
||||
// defined. The implementation is allowed to skip all checks.
|
||||
void AssertHeld();
|
||||
void AssertHeld() const;
|
||||
};
|
||||
|
||||
class CondVar {
|
||||
|
|
|
@ -100,7 +100,7 @@ bool Mutex::TryLock() {
|
|||
return ret;
|
||||
}
|
||||
|
||||
void Mutex::AssertHeld() {
|
||||
void Mutex::AssertHeld() const {
|
||||
#ifndef NDEBUG
|
||||
assert(locked_);
|
||||
#endif
|
||||
|
|
|
@ -109,9 +109,9 @@ class Mutex {
|
|||
|
||||
bool TryLock();
|
||||
|
||||
// this will assert if the mutex is not locked
|
||||
// it does NOT verify that mutex is held by a calling thread
|
||||
void AssertHeld();
|
||||
// This will fail assertion if the mutex is not locked.
|
||||
// It does NOT verify that mutex is held by a calling thread.
|
||||
void AssertHeld() const;
|
||||
|
||||
// Also implement std Lockable
|
||||
inline void lock() { Lock(); }
|
||||
|
@ -139,7 +139,7 @@ class RWMutex {
|
|||
void WriteLock();
|
||||
void ReadUnlock();
|
||||
void WriteUnlock();
|
||||
void AssertHeld() {}
|
||||
void AssertHeld() const {}
|
||||
|
||||
private:
|
||||
pthread_rwlock_t mu_; // the underlying platform mutex
|
||||
|
|
|
@ -117,7 +117,7 @@ class Mutex {
|
|||
|
||||
// this will assert if the mutex is not locked
|
||||
// it does NOT verify that mutex is held by a calling thread
|
||||
void AssertHeld() {
|
||||
void AssertHeld() const {
|
||||
#ifndef NDEBUG
|
||||
assert(locked_);
|
||||
#endif
|
||||
|
@ -159,7 +159,7 @@ class RWMutex {
|
|||
void WriteUnlock() { ReleaseSRWLockExclusive(&srwLock_); }
|
||||
|
||||
// Empty as in POSIX
|
||||
void AssertHeld() {}
|
||||
void AssertHeld() const {}
|
||||
|
||||
private:
|
||||
SRWLOCK srwLock_;
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
* Fixed a rare race bug involving a concurrent combination of Create/DropColumnFamily and/or Set(DB)Options that could lead to inconsistency between (a) the DB's reported options state, (b) the DB options in effect, and (c) the latest persisted OPTIONS file.
|
|
@ -28,7 +28,7 @@ class DMutex : public folly::DistributedMutex {
|
|||
explicit DMutex(bool IGNORED_adaptive = false) { (void)IGNORED_adaptive; }
|
||||
|
||||
// currently no-op
|
||||
void AssertHeld() {}
|
||||
void AssertHeld() const {}
|
||||
};
|
||||
using DMutexLock = std::lock_guard<folly::DistributedMutex>;
|
||||
|
||||
|
|
Loading…
Reference in New Issue