From 284be570c8de322cf27b66723a6cfb324ae37081 Mon Sep 17 00:00:00 2001 From: Venkatesh Radhakrishnan Date: Wed, 11 Mar 2015 10:31:02 -0700 Subject: [PATCH] Provide a mechanism to inform Rocksdb that it is shutting down Summary: Provide an API which enables users to infor Rocksdb that it is shutting down. Test Plan: db_test Reviewers: sdong, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D34617 --- db/compaction_job.cc | 2 + db/db_impl.cc | 115 ++++++------ db/db_impl.h | 2 + db/db_test.cc | 226 ++++++++++++++++++++++++ include/rocksdb/utilities/convenience.h | 1 + utilities/convenience/convenience.cc | 23 +++ 6 files changed, 317 insertions(+), 52 deletions(-) create mode 100644 utilities/convenience/convenience.cc diff --git a/db/compaction_job.cc b/db/compaction_job.cc index bbe576d1d8..6ae8a42bab 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -536,6 +536,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, compaction_filter = compaction_filter_from_factory.get(); } + TEST_SYNC_POINT("CompactionJob::Run:Inprogress"); + int64_t key_drop_user = 0; int64_t key_drop_newer_entry = 0; int64_t key_drop_obsolete = 0; diff --git a/db/db_impl.cc b/db/db_impl.cc index f005abd9f4..3bcbfd5ebc 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -257,6 +257,18 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) LogFlush(db_options_.info_log); } +void DBImpl::CancelAllBackgroundWork(bool wait) { + shutting_down_.store(true, std::memory_order_release); + if (!wait) { + return; + } + + // Wait for background work to finish + while (bg_compaction_scheduled_ || bg_flush_scheduled_ || notifying_events_) { + bg_cv_.Wait(); + } +} + DBImpl::~DBImpl() { EraseThreadStatusDbInfo(); mutex_.Lock(); @@ -273,12 +285,7 @@ DBImpl::~DBImpl() { } versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); } - - // Wait for background work to finish - shutting_down_.store(true, std::memory_order_release); - while (bg_compaction_scheduled_ || bg_flush_scheduled_ || notifying_events_) { - bg_cv_.Wait(); - } + CancelAllBackgroundWork(true); listeners_.clear(); flush_scheduler_.Clear(); @@ -1871,8 +1878,13 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer) { mutex_.AssertHeld(); - if (!bg_error_.ok()) { - return bg_error_; + Status status = bg_error_; + if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { + status = Status::ShutdownInProgress(); + } + + if (!status.ok()) { + return status; } ColumnFamilyData* cfd = nullptr; @@ -1893,7 +1905,6 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context, break; } - Status status; if (cfd != nullptr) { const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); @@ -1926,26 +1937,24 @@ void DBImpl::BackgroundCallFlush() { CaptureCurrentFileNumberInPendingOutputs(); Status s; - if (!shutting_down_.load(std::memory_order_acquire)) { - s = BackgroundFlush(&madeProgress, &job_context, &log_buffer); - if (!s.ok()) { - // Wait a little bit before retrying background compaction in - // case this is an environmental problem and we do not want to - // chew up resources for failed compactions for the duration of - // the problem. - uint64_t error_cnt = - default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); - bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - mutex_.Unlock(); - Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, - "Waiting after background flush error: %s" - "Accumulated background error counts: %" PRIu64, - s.ToString().c_str(), error_cnt); - log_buffer.FlushBufferToLog(); - LogFlush(db_options_.info_log); - env_->SleepForMicroseconds(1000000); - mutex_.Lock(); - } + s = BackgroundFlush(&madeProgress, &job_context, &log_buffer); + if (!s.ok() && !s.IsShutdownInProgress()) { + // Wait a little bit before retrying background flush in + // case this is an environmental problem and we do not want to + // chew up resources for failed flushes for the duration of + // the problem. + uint64_t error_cnt = + default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + mutex_.Unlock(); + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, + "Waiting after background flush error: %s" + "Accumulated background error counts: %" PRIu64, + s.ToString().c_str(), error_cnt); + log_buffer.FlushBufferToLog(); + LogFlush(db_options_.info_log); + env_->SleepForMicroseconds(1000000); + mutex_.Lock(); } ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); @@ -1995,26 +2004,24 @@ void DBImpl::BackgroundCallCompaction() { assert(bg_compaction_scheduled_); Status s; - if (!shutting_down_.load(std::memory_order_acquire)) { - s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer); - if (!s.ok()) { - // Wait a little bit before retrying background compaction in - // case this is an environmental problem and we do not want to - // chew up resources for failed compactions for the duration of - // the problem. - uint64_t error_cnt = + s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer); + if (!s.ok() && !s.IsShutdownInProgress()) { + // Wait a little bit before retrying background compaction in + // case this is an environmental problem and we do not want to + // chew up resources for failed compactions for the duration of + // the problem. + uint64_t error_cnt = default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); - bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - mutex_.Unlock(); - log_buffer.FlushBufferToLog(); - Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, - "Waiting after background compaction error: %s, " - "Accumulated background error counts: %" PRIu64, - s.ToString().c_str(), error_cnt); - LogFlush(db_options_.info_log); - env_->SleepForMicroseconds(1000000); - mutex_.Lock(); - } + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + mutex_.Unlock(); + log_buffer.FlushBufferToLog(); + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, + "Waiting after background compaction error: %s, " + "Accumulated background error counts: %" PRIu64, + s.ToString().c_str(), error_cnt); + LogFlush(db_options_.info_log); + env_->SleepForMicroseconds(1000000); + mutex_.Lock(); } ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); @@ -2071,14 +2078,19 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, bool is_manual = (manual_compaction_ != nullptr) && (manual_compaction_->in_progress == false); - if (!bg_error_.ok()) { + Status status = bg_error_; + if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { + status = Status::ShutdownInProgress(); + } + + if (!status.ok()) { if (is_manual) { - manual_compaction_->status = bg_error_; + manual_compaction_->status = status; manual_compaction_->done = true; manual_compaction_->in_progress = false; manual_compaction_ = nullptr; } - return bg_error_; + return status; } if (is_manual) { @@ -2189,7 +2201,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, } } - Status status; if (!c) { // Nothing to do LogToBuffer(log_buffer, "Compaction nothing to do"); diff --git a/db/db_impl.h b/db/db_impl.h index b0366ee0b9..d11957df9a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -265,6 +265,8 @@ class DBImpl : public DB { const SnapshotList& snapshots() const { return snapshots_; } + void CancelAllBackgroundWork(bool wait = false); + protected: Env* const env_; const std::string dbname_; diff --git a/db/db_test.cc b/db/db_test.cc index c48769f51b..5569501456 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -37,6 +37,7 @@ #include "rocksdb/thread_status.h" #include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/checkpoint.h" +#include "rocksdb/utilities/convenience.h" #include "table/block_based_table_factory.h" #include "table/plain_table_factory.h" #include "util/hash.h" @@ -10181,6 +10182,231 @@ TEST(DBTest, ThreadStatusSingleCompaction) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST(DBTest, PreShutdownManualCompaction) { + Options options = CurrentOptions(); + options.max_background_flushes = 0; + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2) + << "Need to update this test to match kMaxMemCompactLevel"; + + // iter - 0 with 7 levels + // iter - 1 with 3 levels + for (int iter = 0; iter < 2; ++iter) { + MakeTables(3, "p", "q", 1); + ASSERT_EQ("1,1,1", FilesPerLevel(1)); + + // Compaction range falls before files + Compact(1, "", "c"); + ASSERT_EQ("1,1,1", FilesPerLevel(1)); + + // Compaction range falls after files + Compact(1, "r", "z"); + ASSERT_EQ("1,1,1", FilesPerLevel(1)); + + // Compaction range overlaps files + Compact(1, "p1", "p9"); + ASSERT_EQ("0,0,1", FilesPerLevel(1)); + + // Populate a different range + MakeTables(3, "c", "e", 1); + ASSERT_EQ("1,1,2", FilesPerLevel(1)); + + // Compact just the new range + Compact(1, "b", "f"); + ASSERT_EQ("0,0,2", FilesPerLevel(1)); + + // Compact all + MakeTables(1, "a", "z", 1); + ASSERT_EQ("0,1,2", FilesPerLevel(1)); + CancelAllBackgroundWork(db_); + db_->CompactRange(handles_[1], nullptr, nullptr); + ASSERT_EQ("0,1,2", FilesPerLevel(1)); + + if (iter == 0) { + options = CurrentOptions(); + options.max_background_flushes = 0; + options.num_levels = 3; + options.create_if_missing = true; + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + } + } +} + +TEST(DBTest, PreShutdownMultipleCompaction) { + const int kTestKeySize = 16; + const int kTestValueSize = 984; + const int kEntrySize = kTestKeySize + kTestValueSize; + const int kEntriesPerBuffer = 10; + const int kNumL0Files = 4; + + const int kHighPriCount = 3; + const int kLowPriCount = 5; + env_->SetBackgroundThreads(kHighPriCount, Env::HIGH); + env_->SetBackgroundThreads(kLowPriCount, Env::LOW); + + Options options; + options.create_if_missing = true; + options.write_buffer_size = kEntrySize * kEntriesPerBuffer; + options.compaction_style = kCompactionStyleLevel; + options.target_file_size_base = options.write_buffer_size; + options.max_bytes_for_level_base = + options.target_file_size_base * kNumL0Files; + options.compression = kNoCompression; + options = CurrentOptions(options); + options.env = env_; + options.enable_thread_tracking = true; + options.level0_file_num_compaction_trigger = kNumL0Files; + options.max_bytes_for_level_multiplier = 2; + options.max_background_compactions = kLowPriCount; + + TryReopen(options); + Random rnd(301); + + std::vector thread_list; + // Delay both flush and compaction + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"FlushJob::Run:Start", "CompactionJob::Run:Start"}, + {"CompactionJob::Run:Start", + "DBTest::PreShutdownMultipleCompaction:Preshutdown"}, + {"DBTest::PreShutdownMultipleCompaction:Preshutdown", + "CompactionJob::Run:End"}, + {"CompactionJob::Run:End", + "DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"}}); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Make rocksdb busy + int key = 0; + int max_operation_count[ThreadStatus::NUM_OP_TYPES] = {0}; + // check how many threads are doing compaction using GetThreadList + int operation_count[ThreadStatus::NUM_OP_TYPES] = {0}; + for (int file = 0; file < 64 * kNumL0Files; ++file) { + for (int k = 0; k < kEntriesPerBuffer; ++k) { + ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize))); + } + + Status s = env_->GetThreadList(&thread_list); + for (auto thread : thread_list) { + operation_count[thread.operation_type]++; + } + + // Record the max number of compactions at a time. + for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) { + if (max_operation_count[i] < operation_count[i]) { + max_operation_count[i] = operation_count[i]; + } + } + // Speed up the test + if (max_operation_count[ThreadStatus::OP_FLUSH] > 1 && + max_operation_count[ThreadStatus::OP_COMPACTION] > + 0.6 * options.max_background_compactions) { + break; + } + } + + TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown"); + ASSERT_GE(max_operation_count[ThreadStatus::OP_COMPACTION], 1); + CancelAllBackgroundWork(db_); + TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"); + dbfull()->TEST_WaitForCompact(); + // Record the number of compactions at a time. + for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) { + operation_count[i] = 0; + } + Status s = env_->GetThreadList(&thread_list); + for (auto thread : thread_list) { + operation_count[thread.operation_type]++; + } + ASSERT_EQ(operation_count[ThreadStatus::OP_COMPACTION], 0); +} + +TEST(DBTest, PreShutdownCompactionMiddle) { + const int kTestKeySize = 16; + const int kTestValueSize = 984; + const int kEntrySize = kTestKeySize + kTestValueSize; + const int kEntriesPerBuffer = 10; + const int kNumL0Files = 4; + + const int kHighPriCount = 3; + const int kLowPriCount = 5; + env_->SetBackgroundThreads(kHighPriCount, Env::HIGH); + env_->SetBackgroundThreads(kLowPriCount, Env::LOW); + + Options options; + options.create_if_missing = true; + options.write_buffer_size = kEntrySize * kEntriesPerBuffer; + options.compaction_style = kCompactionStyleLevel; + options.target_file_size_base = options.write_buffer_size; + options.max_bytes_for_level_base = + options.target_file_size_base * kNumL0Files; + options.compression = kNoCompression; + options = CurrentOptions(options); + options.env = env_; + options.enable_thread_tracking = true; + options.level0_file_num_compaction_trigger = kNumL0Files; + options.max_bytes_for_level_multiplier = 2; + options.max_background_compactions = kLowPriCount; + + TryReopen(options); + Random rnd(301); + + std::vector thread_list; + // Delay both flush and compaction + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBTest::PreShutdownMultipleCompaction:Preshutdown", + "CompactionJob::Run:Inprogress"}, + {"CompactionJob::Run:Inprogress", "CompactionJob::Run:End"}, + {"CompactionJob::Run:End", + "DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"}}); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Make rocksdb busy + int key = 0; + int max_operation_count[ThreadStatus::NUM_OP_TYPES] = {0}; + // check how many threads are doing compaction using GetThreadList + int operation_count[ThreadStatus::NUM_OP_TYPES] = {0}; + for (int file = 0; file < 64 * kNumL0Files; ++file) { + for (int k = 0; k < kEntriesPerBuffer; ++k) { + ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize))); + } + + Status s = env_->GetThreadList(&thread_list); + for (auto thread : thread_list) { + operation_count[thread.operation_type]++; + } + + // Record the max number of compactions at a time. + for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) { + if (max_operation_count[i] < operation_count[i]) { + max_operation_count[i] = operation_count[i]; + } + } + // Speed up the test + if (max_operation_count[ThreadStatus::OP_FLUSH] > 1 && + max_operation_count[ThreadStatus::OP_COMPACTION] > + 0.6 * options.max_background_compactions) { + break; + } + } + + ASSERT_GE(max_operation_count[ThreadStatus::OP_COMPACTION], 1); + CancelAllBackgroundWork(db_); + TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:Preshutdown"); + TEST_SYNC_POINT("DBTest::PreShutdownMultipleCompaction:VerifyPreshutdown"); + dbfull()->TEST_WaitForCompact(); + // Record the number of compactions at a time. + for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) { + operation_count[i] = 0; + } + Status s = env_->GetThreadList(&thread_list); + for (auto thread : thread_list) { + operation_count[thread.operation_type]++; + } + ASSERT_EQ(operation_count[ThreadStatus::OP_COMPACTION], 0); +} + #endif // ROCKSDB_USING_THREAD_STATUS TEST(DBTest, DynamicLevelMaxBytesBase) { diff --git a/include/rocksdb/utilities/convenience.h b/include/rocksdb/utilities/convenience.h index c9a05657a8..e729dcf080 100644 --- a/include/rocksdb/utilities/convenience.h +++ b/include/rocksdb/utilities/convenience.h @@ -56,6 +56,7 @@ Status GetBlockBasedTableOptionsFromString( Status GetOptionsFromString(const Options& base_options, const std::string& opts_str, Options* new_options); +void CancelAllBackgroundWork(DB* db); #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/utilities/convenience/convenience.cc b/utilities/convenience/convenience.cc new file mode 100644 index 0000000000..55303f06b3 --- /dev/null +++ b/utilities/convenience/convenience.cc @@ -0,0 +1,23 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2012 Facebook. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef ROCKSDB_LITE + +#include "rocksdb/utilities/convenience.h" + +#include "db/db_impl.h" + +namespace rocksdb { + +void CancelAllBackgroundWork(DB* db) { + (dynamic_cast(db))->CancelAllBackgroundWork(false); +} +} // namespace rocksdb + +#endif // ROCKSDB_LITE