Provide a mechanism to inform RocksDB that it is shutting down

Summary:
Provide an API that enables users to inform RocksDB that it is
shutting down.
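
A minimal sketch of how a caller might use the new API (illustrative only; the
database path and options below are assumptions, not part of this change):

#include "rocksdb/db.h"
#include "rocksdb/utilities/convenience.h"

int main() {
  rocksdb::DB* db;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/shutdown_example", &db);
  if (!s.ok()) {
    return 1;
  }

  // ... writes here may schedule background flushes and compactions ...

  // Inform RocksDB that the process is shutting down. Background flushes
  // and compactions that have not yet run will return
  // Status::ShutdownInProgress() instead of doing further work. This free
  // function passes wait = false, so it returns immediately; the DB
  // destructor later calls DBImpl::CancelAllBackgroundWork(true) and waits
  // for in-flight background work to finish.
  rocksdb::CancelAllBackgroundWork(db);

  delete db;
  return 0;
}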

Test Plan: db_test

Reviewers: sdong, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D34617
Venkatesh Radhakrishnan 2015-03-11 10:31:02 -07:00
parent 2ddf53b2ca
commit 284be570c8
6 changed files with 317 additions and 52 deletions


@@ -536,6 +536,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
compaction_filter = compaction_filter_from_factory.get();
}
TEST_SYNC_POINT("CompactionJob::Run:Inprogress");
int64_t key_drop_user = 0;
int64_t key_drop_newer_entry = 0;
int64_t key_drop_obsolete = 0;


@@ -257,6 +257,18 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
LogFlush(db_options_.info_log);
}
void DBImpl::CancelAllBackgroundWork(bool wait) {
shutting_down_.store(true, std::memory_order_release);
if (!wait) {
return;
}
// Wait for background work to finish
while (bg_compaction_scheduled_ || bg_flush_scheduled_ || notifying_events_) {
bg_cv_.Wait();
}
}
DBImpl::~DBImpl() {
EraseThreadStatusDbInfo();
mutex_.Lock();
@@ -273,12 +285,7 @@ DBImpl::~DBImpl() {
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
}
// Wait for background work to finish
shutting_down_.store(true, std::memory_order_release);
while (bg_compaction_scheduled_ || bg_flush_scheduled_ || notifying_events_) {
bg_cv_.Wait();
}
CancelAllBackgroundWork(true);
listeners_.clear();
flush_scheduler_.Clear();
@@ -1871,8 +1878,13 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer) {
mutex_.AssertHeld();
if (!bg_error_.ok()) {
return bg_error_;
Status status = bg_error_;
if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
}
if (!status.ok()) {
return status;
}
ColumnFamilyData* cfd = nullptr;
@@ -1893,7 +1905,6 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context,
break;
}
Status status;
if (cfd != nullptr) {
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
@@ -1926,26 +1937,24 @@ void DBImpl::BackgroundCallFlush() {
CaptureCurrentFileNumberInPendingOutputs();
Status s;
if (!shutting_down_.load(std::memory_order_acquire)) {
s = BackgroundFlush(&madeProgress, &job_context, &log_buffer);
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
uint64_t error_cnt =
default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Waiting after background flush error: %s"
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
log_buffer.FlushBufferToLog();
LogFlush(db_options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
}
s = BackgroundFlush(&madeProgress, &job_context, &log_buffer);
if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background flush in
// case this is an environmental problem and we do not want to
// chew up resources for failed flushes for the duration of
// the problem.
uint64_t error_cnt =
default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Waiting after background flush error: %s"
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
log_buffer.FlushBufferToLog();
LogFlush(db_options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
}
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
@@ -1995,26 +2004,24 @@ void DBImpl::BackgroundCallCompaction() {
assert(bg_compaction_scheduled_);
Status s;
if (!shutting_down_.load(std::memory_order_acquire)) {
s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer);
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
uint64_t error_cnt =
s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer);
if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
uint64_t error_cnt =
default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
log_buffer.FlushBufferToLog();
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Waiting after background compaction error: %s, "
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
LogFlush(db_options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
}
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
log_buffer.FlushBufferToLog();
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Waiting after background compaction error: %s, "
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
LogFlush(db_options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
}
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
@@ -2071,14 +2078,19 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
bool is_manual = (manual_compaction_ != nullptr) &&
(manual_compaction_->in_progress == false);
if (!bg_error_.ok()) {
Status status = bg_error_;
if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
}
if (!status.ok()) {
if (is_manual) {
manual_compaction_->status = bg_error_;
manual_compaction_->status = status;
manual_compaction_->done = true;
manual_compaction_->in_progress = false;
manual_compaction_ = nullptr;
}
return bg_error_;
return status;
}
if (is_manual) {
@@ -2189,7 +2201,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
}
}
Status status;
if (!c) {
// Nothing to do
LogToBuffer(log_buffer, "Compaction nothing to do");


@@ -265,6 +265,8 @@ class DBImpl : public DB {
const SnapshotList& snapshots() const { return snapshots_; }
void CancelAllBackgroundWork(bool wait = false);
protected:
Env* const env_;
const std::string dbname_;


@@ -37,6 +37,7 @@
#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/convenience.h"
#include "table/block_based_table_factory.h"
#include "table/plain_table_factory.h"
#include "util/hash.h"
@@ -10181,6 +10182,231 @@ TEST(DBTest, ThreadStatusSingleCompaction) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST(DBTest, PreShutdownManualCompaction) {
Options options = CurrentOptions();
options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2)
<< "Need to update this test to match kMaxMemCompactLevel";
// iter - 0 with 7 levels
// iter - 1 with 3 levels
for (int iter = 0; iter < 2; ++iter) {
MakeTables(3, "p", "q", 1);
ASSERT_EQ("1,1,1", FilesPerLevel(1));
// Compaction range falls before files
Compact(1, "", "c");
ASSERT_EQ("1,1,1", FilesPerLevel(1));
// Compaction range falls after files
Compact(1, "r", "z");
ASSERT_EQ("1,1,1", FilesPerLevel(1));
// Compaction range overlaps files
Compact(1, "p1", "p9");
ASSERT_EQ("0,0,1", FilesPerLevel(1));
// Populate a different range
MakeTables(3, "c", "e", 1);
ASSERT_EQ("1,1,2", FilesPerLevel(1));
// Compact just the new range
Compact(1, "b", "f");
ASSERT_EQ("0,0,2", FilesPerLevel(1));
// Compact all
MakeTables(1, "a", "z", 1);
ASSERT_EQ("0,1,2", FilesPerLevel(1));
CancelAllBackgroundWork(db_);
db_->CompactRange(handles_[1], nullptr, nullptr);
ASSERT_EQ("0,1,2", FilesPerLevel(1));
if (iter == 0) {
options = CurrentOptions();
options.max_background_flushes = 0;
options.num_levels = 3;
options.create_if_missing = true;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
}
}
}
TEST(DBTest, PreShutdownMultipleCompaction) {
const int kTestKeySize = 16;
const int kTestValueSize = 984;
const int kEntrySize = kTestKeySize + kTestValueSize;
const int kEntriesPerBuffer = 10;
const int kNumL0Files = 4;
const int kHighPriCount = 3;
const int kLowPriCount = 5;
env_->SetBackgroundThreads(kHighPriCount, Env::HIGH);
env_->SetBackgroundThreads(kLowPriCount, Env::LOW);
Options options;
options.create_if_missing = true;
options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
options.compaction_style = kCompactionStyleLevel;
options.target_file_size_base = options.write_buffer_size;
options.max_bytes_for_level_base =
options.target_file_size_base * kNumL0Files;
options.compression = kNoCompression;
options = CurrentOptions(options);
options.env = env_;
options.enable_thread_tracking = true;
options.level0_file_num_compaction_trigger = kNumL0Files;
options.max_bytes_for_level_multiplier = 2;
options.max_background_compactions = kLowPriCount;
TryReopen(options);
Random rnd(301);
std::vector<ThreadStatus> thread_list;
// Delay both flush and compaction
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"FlushJob::Run:Start", "CompactionJob::Run:Start"},
{"CompactionJob::Run:Start",
"DBTest::PreShutdownMultipleCompaction:Preshutdown"},
{"DBTest::PreShutdownMultipleCompaction:Preshutdown",
"CompactionJob::Run:End"},
{"CompactionJob::Run:End",
"DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Make rocksdb busy
int key = 0;
int max_operation_count[ThreadStatus::NUM_OP_TYPES] = {0};
// check how many threads are doing compaction using GetThreadList
int operation_count[ThreadStatus::NUM_OP_TYPES] = {0};
for (int file = 0; file < 64 * kNumL0Files; ++file) {
for (int k = 0; k < kEntriesPerBuffer; ++k) {
ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize)));
}
Status s = env_->GetThreadList(&thread_list);
for (auto thread : thread_list) {
operation_count[thread.operation_type]++;
}
// Record the max number of compactions at a time.
for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) {
if (max_operation_count[i] < operation_count[i]) {
max_operation_count[i] = operation_count[i];
}
}
// Speed up the test
if (max_operation_count[ThreadStatus::OP_FLUSH] > 1 &&
max_operation_count[ThreadStatus::OP_COMPACTION] >
0.6 * options.max_background_compactions) {
break;
}
}
TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
ASSERT_GE(max_operation_count[ThreadStatus::OP_COMPACTION], 1);
CancelAllBackgroundWork(db_);
TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown");
dbfull()->TEST_WaitForCompact();
// Record the number of compactions at a time.
for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) {
operation_count[i] = 0;
}
Status s = env_->GetThreadList(&thread_list);
for (auto thread : thread_list) {
operation_count[thread.operation_type]++;
}
ASSERT_EQ(operation_count[ThreadStatus::OP_COMPACTION], 0);
}
TEST(DBTest, PreShutdownCompactionMiddle) {
const int kTestKeySize = 16;
const int kTestValueSize = 984;
const int kEntrySize = kTestKeySize + kTestValueSize;
const int kEntriesPerBuffer = 10;
const int kNumL0Files = 4;
const int kHighPriCount = 3;
const int kLowPriCount = 5;
env_->SetBackgroundThreads(kHighPriCount, Env::HIGH);
env_->SetBackgroundThreads(kLowPriCount, Env::LOW);
Options options;
options.create_if_missing = true;
options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
options.compaction_style = kCompactionStyleLevel;
options.target_file_size_base = options.write_buffer_size;
options.max_bytes_for_level_base =
options.target_file_size_base * kNumL0Files;
options.compression = kNoCompression;
options = CurrentOptions(options);
options.env = env_;
options.enable_thread_tracking = true;
options.level0_file_num_compaction_trigger = kNumL0Files;
options.max_bytes_for_level_multiplier = 2;
options.max_background_compactions = kLowPriCount;
TryReopen(options);
Random rnd(301);
std::vector<ThreadStatus> thread_list;
// Delay both flush and compaction
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBTest::PreShutdownMultipleCompaction:Preshutdown",
"CompactionJob::Run:Inprogress"},
{"CompactionJob::Run:Inprogress", "CompactionJob::Run:End"},
{"CompactionJob::Run:End",
"DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Make rocksdb busy
int key = 0;
int max_operation_count[ThreadStatus::NUM_OP_TYPES] = {0};
// check how many threads are doing compaction using GetThreadList
int operation_count[ThreadStatus::NUM_OP_TYPES] = {0};
for (int file = 0; file < 64 * kNumL0Files; ++file) {
for (int k = 0; k < kEntriesPerBuffer; ++k) {
ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize)));
}
Status s = env_->GetThreadList(&thread_list);
for (auto thread : thread_list) {
operation_count[thread.operation_type]++;
}
// Record the max number of compactions at a time.
for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) {
if (max_operation_count[i] < operation_count[i]) {
max_operation_count[i] = operation_count[i];
}
}
// Speed up the test
if (max_operation_count[ThreadStatus::OP_FLUSH] > 1 &&
max_operation_count[ThreadStatus::OP_COMPACTION] >
0.6 * options.max_background_compactions) {
break;
}
}
ASSERT_GE(max_operation_count[ThreadStatus::OP_COMPACTION], 1);
CancelAllBackgroundWork(db_);
TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown");
TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown");
dbfull()->TEST_WaitForCompact();
// Record the number of compactions at a time.
for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) {
operation_count[i] = 0;
}
Status s = env_->GetThreadList(&thread_list);
for (auto thread : thread_list) {
operation_count[thread.operation_type]++;
}
ASSERT_EQ(operation_count[ThreadStatus::OP_COMPACTION], 0);
}
#endif // ROCKSDB_USING_THREAD_STATUS
TEST(DBTest, DynamicLevelMaxBytesBase) {


@@ -56,6 +56,7 @@ Status GetBlockBasedTableOptionsFromString(
Status GetOptionsFromString(const Options& base_options,
const std::string& opts_str, Options* new_options);
void CancelAllBackgroundWork(DB* db);
#endif // ROCKSDB_LITE
} // namespace rocksdb


@@ -0,0 +1,23 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2012 Facebook.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/convenience.h"
#include "db/db_impl.h"
namespace rocksdb {
void CancelAllBackgroundWork(DB* db) {
(dynamic_cast<DBImpl*>(db))->CancelAllBackgroundWork(false);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE