Revert "Fix a race condition in persisting options"

This reverts commit 2fa3ed5180. It breaks RocksDB lite build
This commit is contained in:
sdong 2015-12-07 17:09:12 -08:00
parent 2fa3ed5180
commit f307036bde
5 changed files with 54 additions and 153 deletions

View file

@ -68,9 +68,7 @@ class ColumnFamilyTest : public testing::Test {
void Close() {
for (auto h : handles_) {
if (h) {
delete h;
}
delete h;
}
handles_.clear();
names_.clear();
@ -1262,81 +1260,6 @@ TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
namespace {
std::atomic<int> test_stage(0);
const int kMainThreadStartPersistingOptionsFile = 1;
const int kChildThreadFinishDroppingColumnFamily = 2;
const int kChildThreadWaitingMainThreadPersistOptions = 3;
void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id,
std::vector<Comparator*> comparators) {
while (test_stage < kMainThreadStartPersistingOptionsFile) {
Env::Default()->SleepForMicroseconds(100);
}
cf_test->DropColumnFamilies({cf_id});
delete comparators[cf_id];
comparators[cf_id] = nullptr;
test_stage = kChildThreadFinishDroppingColumnFamily;
}
} // namespace
TEST_F(ColumnFamilyTest, CreateAndDropRace) {
const int kCfCount = 5;
std::vector<ColumnFamilyOptions> cf_opts;
std::vector<Comparator*> comparators;
for (int i = 0; i < kCfCount; ++i) {
cf_opts.emplace_back();
comparators.push_back(new test::SimpleSuffixReverseComparator());
cf_opts.back().comparator = comparators.back();
}
db_options_.create_if_missing = true;
db_options_.create_missing_column_families = true;
auto main_thread_id = std::this_thread::get_id();
rocksdb::SyncPoint::GetInstance()->SetCallBack("PersistRocksDBOptions:start",
[&](void* arg) {
auto current_thread_id = std::this_thread::get_id();
// If it's the main thread hitting this sync-point, then it
// will be blocked until some other thread update the test_stage.
if (main_thread_id == current_thread_id) {
test_stage = kMainThreadStartPersistingOptionsFile;
while (test_stage < kChildThreadFinishDroppingColumnFamily) {
Env::Default()->SleepForMicroseconds(100);
}
}
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::EnterUnbatched:Wait", [&](void* arg) {
// This means a thread doing DropColumnFamily() is waiting for
// other thread to finish persisting options.
// In such case, we update the test_stage to unblock the main thread.
test_stage = kChildThreadWaitingMainThreadPersistOptions;
// Note that based on the test setting, this must not be the
// main thread.
ASSERT_NE(main_thread_id, std::this_thread::get_id());
});
// Create a database with four column families
Open({"default", "one", "two", "three"},
{cf_opts[0], cf_opts[1], cf_opts[2], cf_opts[3]});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Start a thread that will drop the first column family
// and its comparator
std::thread drop_cf_thread(DropSingleColumnFamily, this, 1, comparators);
DropColumnFamilies({2});
drop_cf_thread.join();
Close();
Destroy();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace rocksdb
int main(int argc, char** argv) {

View file

@ -1930,21 +1930,24 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
MutableCFOptions new_options;
Status s;
Status persist_options_status;
{
InstrumentedMutexLock l(&mutex_);
s = cfd->SetOptions(options_map);
if (s.ok()) {
new_options = *cfd->GetLatestMutableCFOptions();
}
if (s.ok()) {
// Persist RocksDB options under the single write thread
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
persist_options_status = WriteOptionsFile();
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
Status persist_options_status = WriteOptionsFile();
if (!persist_options_status.ok()) {
if (db_options_.fail_if_options_file_error) {
s = Status::IOError(
"SetOptions succeeded, but unable to persist options",
persist_options_status.ToString());
}
Warn(db_options_.info_log,
"Unable to persist options in SetOptions() -- %s",
persist_options_status.ToString().c_str());
}
}
@ -1960,16 +1963,6 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
db_options_.info_log, "[%s] SetOptions succeeded",
cfd->GetName().c_str());
new_options.Dump(db_options_.info_log.get());
if (!persist_options_status.ok()) {
if (db_options_.fail_if_options_file_error) {
s = Status::IOError(
"SetOptions succeeded, but unable to persist options",
persist_options_status.ToString());
}
Warn(db_options_.info_log,
"Unable to persist options in SetOptions() -- %s",
persist_options_status.ToString().c_str());
}
} else {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"[%s] SetOptions failed", cfd->GetName().c_str());
@ -3453,7 +3446,6 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) {
Status s;
Status persist_options_status;
*handle = nullptr;
s = CheckCompressionSupported(cf_options);
@ -3486,12 +3478,6 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
s = versions_->LogAndApply(
nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit,
&mutex_, directories_.GetDbDir(), false, &cf_options);
if (s.ok()) {
// If the column family was created successfully, we then persist
// the updated RocksDB options under the same single write thread
persist_options_status = WriteOptionsFile();
}
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
@ -3519,8 +3505,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
// this is outside the mutex
if (s.ok()) {
NewThreadStatusCfInfo(
reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
Status persist_options_status = WriteOptionsFile();
if (!persist_options_status.ok()) {
if (db_options_.fail_if_options_file_error) {
s = Status::IOError(
@ -3532,6 +3517,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
"Unable to persist options in CreateColumnFamily() -- %s",
persist_options_status.ToString().c_str());
}
NewThreadStatusCfInfo(
reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
}
return s;
}
@ -3550,7 +3537,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
edit.SetColumnFamily(cfd->GetID());
Status s;
Status options_persist_status;
{
InstrumentedMutexLock l(&mutex_);
if (cfd->IsDropped()) {
@ -3562,11 +3548,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
write_thread_.EnterUnbatched(&w, &mutex_);
s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_);
if (s.ok()) {
// If the column family was dropped successfully, we then persist
// the updated RocksDB options under the same single write thread
options_persist_status = WriteOptionsFile();
}
write_thread_.ExitUnbatched(&w);
}
@ -3593,9 +3574,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
mutable_cf_options->max_write_buffer_number;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Dropped column family with id %u\n", cfd->GetID());
auto options_persist_status = WriteOptionsFile();
if (!options_persist_status.ok()) {
if (db_options_.fail_if_options_file_error) {
s = Status::IOError(
@ -3607,6 +3586,9 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
"Unable to persist options in DropColumnFamily() -- %s",
options_persist_status.ToString().c_str());
}
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Dropped column family with id %u\n",
cfd->GetID());
} else {
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Dropping column family with id %u FAILED -- %s\n",
@ -5042,12 +5024,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
}
TEST_SYNC_POINT("DBImpl::Open:Opened");
Status persist_options_status;
if (s.ok()) {
// Persist RocksDB Options before scheduling the compaction.
// The WriteOptionsFile() will release and lock the mutex internally.
persist_options_status = impl->WriteOptionsFile();
*dbptr = impl;
impl->opened_successfully_ = true;
impl->MaybeScheduleFlushOrCompaction();
@ -5058,6 +5035,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p",
impl);
LogFlush(impl->db_options_.info_log);
auto persist_options_status = impl->WriteOptionsFile();
if (!persist_options_status.ok()) {
if (db_options.fail_if_options_file_error) {
s = Status::IOError(
@ -5186,34 +5165,38 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
Status DBImpl::WriteOptionsFile() {
#ifndef ROCKSDB_LITE
mutex_.AssertHeld();
std::string file_name;
Status s = WriteOptionsToTempFile(&file_name);
if (!s.ok()) {
return s;
}
s = RenameTempFileToOptionsFile(file_name);
return s;
#else
return Status::OK();
#endif // !ROCKSDB_LITE
}
Status DBImpl::WriteOptionsToTempFile(std::string* file_name) {
#ifndef ROCKSDB_LITE
std::vector<std::string> cf_names;
std::vector<ColumnFamilyOptions> cf_opts;
// This part requires mutex to protect the column family options
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
{
InstrumentedMutexLock l(&mutex_);
// This part requires mutex to protect the column family options
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cf_names.push_back(cfd->GetName());
cf_opts.push_back(BuildColumnFamilyOptions(
*cfd->options(), *cfd->GetLatestMutableCFOptions()));
}
cf_names.push_back(cfd->GetName());
cf_opts.push_back(BuildColumnFamilyOptions(
*cfd->options(), *cfd->GetLatestMutableCFOptions()));
}
*file_name = TempOptionsFileName(GetName(), versions_->NewFileNumber());
// Unlock during expensive operations. New writes cannot get here
// because the single write thread ensures all new writes get queued.
mutex_.Unlock();
std::string file_name =
TempOptionsFileName(GetName(), versions_->NewFileNumber());
Status s = PersistRocksDBOptions(GetDBOptions(), cf_names, cf_opts, file_name,
GetEnv());
if (s.ok()) {
s = RenameTempFileToOptionsFile(file_name);
}
mutex_.Lock();
Status s = PersistRocksDBOptions(GetDBOptions(), cf_names, cf_opts,
*file_name, GetEnv());
return s;
#else
return Status::OK();
@ -5241,6 +5224,8 @@ void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
Status DBImpl::DeleteObsoleteOptionsFiles() {
#ifndef ROCKSDB_LITE
options_files_mutex_.AssertHeld();
std::vector<std::string> filenames;
// use ordered map to store keep the filenames sorted from the newest
// to the oldest.
@ -5272,6 +5257,7 @@ Status DBImpl::DeleteObsoleteOptionsFiles() {
Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
#ifndef ROCKSDB_LITE
InstrumentedMutexLock l(&options_files_mutex_);
Status s;
std::string options_file_name =
OptionsFileName(GetName(), versions_->NewFileNumber());

View file

@ -405,14 +405,10 @@ class DBImpl : public DB {
SuperVersion* super_version,
Arena* arena);
// Except in DB::Open(), WriteOptionsFile can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used.
// 2. db_mutex is held
// The following options file related functions should not be
// called while DB mutex is held.
Status WriteOptionsFile();
// The following two functions can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used.
// 2. db_mutex is NOT held
Status WriteOptionsToTempFile(std::string* file_name);
Status RenameTempFileToOptionsFile(const std::string& file_name);
Status DeleteObsoleteOptionsFiles();

View file

@ -4,7 +4,6 @@
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/write_thread.h"
#include "util/sync_point.h"
namespace rocksdb {
@ -189,7 +188,6 @@ void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
LinkOne(w, &wait_needed);
if (wait_needed) {
mu->Unlock();
TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
Await(w);
mu->Lock();
}

View file

@ -17,7 +17,6 @@
#include "rocksdb/db.h"
#include "util/options_helper.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "port/port.h"
@ -35,7 +34,6 @@ Status PersistRocksDBOptions(const DBOptions& db_opt,
const std::vector<std::string>& cf_names,
const std::vector<ColumnFamilyOptions>& cf_opts,
const std::string& file_name, Env* env) {
TEST_SYNC_POINT("PersistRocksDBOptions:start");
if (cf_names.size() != cf_opts.size()) {
return Status::InvalidArgument(
"cf_names.size() and cf_opts.size() must be the same");