From 4203431e71ff9752f66bde116901592f8602d977 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 3 Jul 2014 23:03:24 +0200 Subject: [PATCH 1/7] Fix mac os compile error --- db/version_set.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/version_set.cc b/db/version_set.cc index b68923bc07..507a018bbb 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1472,7 +1472,7 @@ class VersionSet::Builder { } } if (!found) { - fprintf(stderr, "not found %ld\n", number); + fprintf(stderr, "not found %" PRIu64 "\n", number); } assert(found); #endif From d4d338de33da7fa95336afa15c46cf61c2325c3a Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 3 Jul 2014 15:47:02 -0700 Subject: [PATCH 2/7] Add timeout_hint_us to WriteOptions and introduce Status::TimeOut. Summary: This diff adds timeout_hint_us to WriteOptions. If it's non-zero, then 1) writes associated with this options MAY be aborted when it has been waiting for longer than the specified time. If an abortion happens, associated writes will return Status::TimeOut. 2) the stall time of the associated write caused by flush or compaction will be limited by timeout_hint_us. The default value of timeout_hint_us is 0 (i.e., OFF.) The statistics of timeout writes will be recorded in WRITE_TIMEDOUT. Test Plan: export ROCKSDB_TESTS=WriteTimeoutAndDelayTest make db_test ./db_test Reviewers: igor, ljin, haobo, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D18837 --- db/db_impl.cc | 121 +++++++++++++++++++++++++++++----- db/db_impl.h | 6 +- db/db_test.cc | 122 +++++++++++++++++++++++++++++++++++ include/rocksdb/options.h | 13 +++- include/rocksdb/statistics.h | 2 + include/rocksdb/status.h | 11 +++- port/port_posix.cc | 5 +- util/status.cc | 3 + util/stop_watch.h | 2 +- 9 files changed, 262 insertions(+), 23 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index ce1cf78ff1..a15568432b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -80,7 +80,9 @@ struct DBImpl::Writer { WriteBatch* batch; bool sync; bool disableWAL; + bool in_batch_group; bool done; + uint64_t timeout_hint_us; port::CondVar cv; explicit Writer(port::Mutex* mu) : cv(mu) { } @@ -3729,13 +3731,41 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { w.batch = my_batch; w.sync = options.sync; w.disableWAL = options.disableWAL; + w.in_batch_group = false; + w.done = false; + w.timeout_hint_us = options.timeout_hint_us; + + uint64_t expiration_time = 0; + if (w.timeout_hint_us == 0) { + w.timeout_hint_us = kNoTimeOut; + } else { + expiration_time = env_->NowMicros() + w.timeout_hint_us; + } w.done = false; - StopWatch sw(env_, options_.statistics.get(), DB_WRITE, false); mutex_.Lock(); + // the following code block pushes the current writer "w" into the writer + // queue "writers_" and wait until one of the following conditions met: + // 1. the job of "w" has been done by some other writers. + // 2. "w" becomes the first writer in "writers_" + // 3. "w" timed-out. writers_.push_back(&w); + + bool timed_out = false; while (!w.done && &w != writers_.front()) { - w.cv.Wait(); + if (expiration_time == 0) { + w.cv.Wait(); + } else if (w.cv.TimedWait(expiration_time)) { + if (w.in_batch_group) { + // then it means the front writer is currently doing the + // write on behalf of this "timed-out" writer. Then it + // should wait until the write completes. + expiration_time = 0; + } else { + timed_out = true; + break; + } + } } if (!options.disableWAL) { @@ -3746,10 +3776,33 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { mutex_.Unlock(); RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1); return w.status; + } else if (timed_out) { + bool found = false; + for (auto iter = writers_.begin(); iter != writers_.end(); iter++) { + if (*iter == &w) { + writers_.erase(iter); + found = true; + break; + } + } + assert(found); + // writers_.front() might still be in cond_wait without a time-out. + // As a result, we need to signal it to wake it up. Otherwise no + // one else will wake him up, and RocksDB will hang. + if (!writers_.empty()) { + writers_.front()->cv.Signal(); + } + mutex_.Unlock(); + RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1); + return Status::TimedOut(); } else { RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1); } + // Once reaches this point, the current writer "w" will try to do its write + // job. It may also pick up some of the remaining writers in the "writers_" + // when it finds suitable, and finish them in the same write batch. + // This is how a write job could be done by the other writer. assert(!single_column_family_mode_ || versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); @@ -3774,8 +3827,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { if (LIKELY(single_column_family_mode_)) { // fast path - status = MakeRoomForWrite(default_cf_handle_->cfd(), my_batch == nullptr, - &superversions_to_free, &logs_to_free); + status = MakeRoomForWrite( + default_cf_handle_->cfd(), my_batch == nullptr, + &superversions_to_free, &logs_to_free, + expiration_time); } else { // refcounting cfd in iteration bool dead_cfd = false; @@ -3786,8 +3841,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { (flush_column_family_if_log_file != 0 && cfd->GetLogNumber() <= flush_column_family_if_log_file); // May temporarily unlock and wait. - status = MakeRoomForWrite(cfd, force_flush, &superversions_to_free, - &logs_to_free); + status = MakeRoomForWrite( + cfd, force_flush, &superversions_to_free, &logs_to_free, + expiration_time); if (cfd->Unref()) { dead_cfd = true; } @@ -3883,11 +3939,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } } - if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) { + if (options_.paranoid_checks && !status.ok() && + !status.IsTimedOut() && bg_error_.ok()) { bg_error_ = status; // stop compaction & fail any further writes } - while (true) { + // Pop out the current writer and all writers being pushed before the + // current writer from the writer queue. + while (!writers_.empty()) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { @@ -3904,6 +3963,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } mutex_.Unlock(); + if (status.IsTimedOut()) { + RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1); + } + for (auto& sv : superversions_to_free) { delete sv; } @@ -3915,6 +3978,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { return status; } +// This function will be called only when the first writer succeeds. +// All writers in the to-be-built batch group will be processed. +// // REQUIRES: Writer list must be non-empty // REQUIRES: First writer must have a non-nullptr batch void DBImpl::BuildBatchGroup(Writer** last_writer, @@ -3950,6 +4016,12 @@ void DBImpl::BuildBatchGroup(Writer** last_writer, break; } + if (w->timeout_hint_us < first->timeout_hint_us) { + // Do not include those writes with shorter timeout. Otherwise, we might + // execute a write that should instead be aborted because of timeout. + break; + } + if (w->batch != nullptr) { size += WriteBatchInternal::ByteSize(w->batch); if (size > max_size) { @@ -3959,6 +4031,7 @@ void DBImpl::BuildBatchGroup(Writer** last_writer, write_batch_group->push_back(w->batch); } + w->in_batch_group = true; *last_writer = w; } } @@ -4000,7 +4073,8 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) { Status DBImpl::MakeRoomForWrite( ColumnFamilyData* cfd, bool force, autovector* superversions_to_free, - autovector* logs_to_free) { + autovector* logs_to_free, + uint64_t expiration_time) { mutex_.AssertHeld(); assert(!writers_.empty()); bool allow_delay = !force; @@ -4013,12 +4087,16 @@ Status DBImpl::MakeRoomForWrite( // might generate a tight feedback loop, constantly scheduling more background // work, even if additional background work is not needed bool schedule_background_work = true; + bool has_timeout = (expiration_time > 0); while (true) { if (!bg_error_.ok()) { // Yield previous error s = bg_error_; break; + } else if (has_timeout && env_->NowMicros() > expiration_time) { + s = Status::TimedOut(); + break; } else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) { // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several @@ -4063,7 +4141,11 @@ Status DBImpl::MakeRoomForWrite( { StopWatch sw(env_, options_.statistics.get(), STALL_MEMTABLE_COMPACTION_COUNT); - bg_cv_.Wait(); + if (!has_timeout) { + bg_cv_.Wait(); + } else { + bg_cv_.TimedWait(expiration_time); + } stall = sw.ElapsedMicros(); } RecordTick(options_.statistics.get(), @@ -4078,10 +4160,15 @@ Status DBImpl::MakeRoomForWrite( { StopWatch sw(env_, options_.statistics.get(), STALL_L0_NUM_FILES_COUNT); - bg_cv_.Wait(); + if (!has_timeout) { + bg_cv_.Wait(); + } else { + bg_cv_.TimedWait(expiration_time); + } stall = sw.ElapsedMicros(); } - RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall); + RecordTick(options_.statistics.get(), + STALL_L0_NUM_FILES_MICROS, stall); cfd->internal_stats()->RecordWriteStall(InternalStats::LEVEL0_NUM_FILES, stall); } else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) { @@ -4112,18 +4199,18 @@ Status DBImpl::MakeRoomForWrite( score = cfd->current()->MaxCompactionScore(); // Delay a write when the compaction score for any level is too large. // TODO: add statistics + uint64_t slowdown = SlowdownAmount(score, cfd->options()->soft_rate_limit, + cfd->options()->hard_rate_limit); mutex_.Unlock(); { StopWatch sw(env_, options_.statistics.get(), SOFT_RATE_LIMIT_DELAY_COUNT); - env_->SleepForMicroseconds( - SlowdownAmount(score, cfd->options()->soft_rate_limit, - cfd->options()->hard_rate_limit)); - rate_limit_delay_millis += sw.ElapsedMicros(); + env_->SleepForMicroseconds(slowdown); + slowdown = sw.ElapsedMicros(); + rate_limit_delay_millis += slowdown; } allow_soft_rate_limit_delay = false; mutex_.Lock(); - } else { unique_ptr lfile; log::Writer* new_log = nullptr; diff --git a/db/db_impl.h b/db/db_impl.h index fb0bdb4af0..48bf4de37c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -28,6 +29,7 @@ #include "rocksdb/transaction_log.h" #include "util/autovector.h" #include "util/stats_logger.h" +#include "util/stop_watch.h" #include "util/thread_local.h" #include "db/internal_stats.h" @@ -345,7 +347,8 @@ class DBImpl : public DB { Status MakeRoomForWrite(ColumnFamilyData* cfd, bool force /* flush even if there is room? */, autovector* superversions_to_free, - autovector* logs_to_free); + autovector* logs_to_free, + uint64_t expiration_time); void BuildBatchGroup(Writer** last_writer, autovector* write_batch_group); @@ -578,6 +581,7 @@ class DBImpl : public DB { bool flush_on_destroy_; // Used when disableWAL is true. static const int KEEP_LOG_FILE_NUM = 1000; + static const uint64_t kNoTimeOut = std::numeric_limits::max(); std::string db_absolute_path_; // count of the number of contiguous delaying writes diff --git a/db/db_test.cc b/db/db_test.cc index 8010eaa811..575768db10 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7001,6 +7001,128 @@ TEST(DBTest, FIFOCompactionTest) { } } } + +TEST(DBTest, SimpleWriteTimeoutTest) { + Options options; + options.env = env_; + options.create_if_missing = true; + options.write_buffer_size = 100000; + options.max_background_flushes = 0; + options.max_write_buffer_number = 2; + options.min_write_buffer_number_to_merge = 3; + options.max_total_wal_size = std::numeric_limits::max(); + WriteOptions write_opt = WriteOptions(); + write_opt.timeout_hint_us = 500; + DestroyAndReopen(&options); + // fill the two write buffer + ASSERT_OK(Put(Key(1), Key(1) + std::string(100000, 'v'), write_opt)); + ASSERT_OK(Put(Key(2), Key(2) + std::string(100000, 'v'), write_opt)); + // As the only two write buffers are full in this moment, the third + // Put is expected to be timed-out. + ASSERT_TRUE( + Put(Key(3), Key(3) + std::string(100000, 'v'), write_opt).IsTimedOut()); +} + +// Multi-threaded Timeout Test +namespace { + +static const int kValueSize = 1000; +static const int kWriteBufferSize = 100000; + +struct TimeoutWriterState { + int id; + DB* db; + std::atomic done; + std::map success_kvs; +}; + +static void RandomTimeoutWriter(void* arg) { + TimeoutWriterState* state = reinterpret_cast(arg); + static const uint64_t kTimerBias = 50; + int thread_id = state->id; + DB* db = state->db; + + Random rnd(1000 + thread_id); + WriteOptions write_opt = WriteOptions(); + write_opt.timeout_hint_us = 500; + int timeout_count = 0; + int num_keys = kNumKeys * 5; + + for (int k = 0; k < num_keys; ++k) { + int key = k + thread_id * num_keys; + std::string value = RandomString(&rnd, kValueSize); + // only the second-half is randomized + if (k > num_keys / 2) { + switch (rnd.Next() % 5) { + case 0: + write_opt.timeout_hint_us = 500 * thread_id; + break; + case 1: + write_opt.timeout_hint_us = num_keys - k; + break; + case 2: + write_opt.timeout_hint_us = 1; + break; + default: + write_opt.timeout_hint_us = 0; + state->success_kvs.insert({key, value}); + } + } + + uint64_t time_before_put = db->GetEnv()->NowMicros(); + Status s = db->Put(write_opt, Key(key), value); + uint64_t put_duration = db->GetEnv()->NowMicros() - time_before_put; + if (write_opt.timeout_hint_us == 0 || + put_duration + kTimerBias < write_opt.timeout_hint_us) { + ASSERT_OK(s); + std::string result; + } + if (s.IsTimedOut()) { + timeout_count++; + ASSERT_GT(put_duration + kTimerBias, write_opt.timeout_hint_us); + } + } + + state->done = true; +} + +TEST(DBTest, MTRandomTimeoutTest) { + Options options; + options.env = env_; + options.create_if_missing = true; + options.max_write_buffer_number = 2; + options.compression = kNoCompression; + options.level0_slowdown_writes_trigger = 10; + options.level0_stop_writes_trigger = 20; + options.write_buffer_size = kWriteBufferSize; + DestroyAndReopen(&options); + + TimeoutWriterState thread_states[kNumThreads]; + for (int tid = 0; tid < kNumThreads; ++tid) { + thread_states[tid].id = tid; + thread_states[tid].db = db_; + thread_states[tid].done = false; + env_->StartThread(RandomTimeoutWriter, &thread_states[tid]); + } + + for (int tid = 0; tid < kNumThreads; ++tid) { + while (thread_states[tid].done == false) { + env_->SleepForMicroseconds(100000); + } + } + + Flush(); + + for (int tid = 0; tid < kNumThreads; ++tid) { + auto& success_kvs = thread_states[tid].success_kvs; + for (auto it = success_kvs.begin(); it != success_kvs.end(); ++it) { + ASSERT_EQ(Get(Key(it->first)), it->second); + } + } +} + +} // anonymous namespace + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index df7383d25d..9281493326 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -975,7 +975,18 @@ struct WriteOptions { // and the write may got lost after a crash. bool disableWAL; - WriteOptions() : sync(false), disableWAL(false) {} + // If non-zero, then associated write waiting longer than the specified + // time MAY be aborted and returns Status::TimedOut. A write that takes + // less than the specified time is guaranteed to not fail with + // Status::TimedOut. + // + // The number of times a write call encounters a timeout is recorded in + // Statistics.WRITE_TIMEDOUT + // + // Default: 0 + uint64_t timeout_hint_us; + + WriteOptions() : sync(false), disableWAL(false), timeout_hint_us(0) {} }; // Options that control flush operations diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 7d5235f657..77f0b03882 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -115,6 +115,7 @@ enum Tickers { // head of the writers queue. WRITE_DONE_BY_SELF, WRITE_DONE_BY_OTHER, + WRITE_TIMEDOUT, // Number of writes ending up with timed-out. WRITE_WITH_WAL, // Number of Write calls that request WAL COMPACT_READ_BYTES, // Bytes read during compaction COMPACT_WRITE_BYTES, // Bytes written during compaction @@ -176,6 +177,7 @@ const std::vector> TickersNameMap = { {WAL_FILE_BYTES, "rocksdb.wal.bytes"}, {WRITE_DONE_BY_SELF, "rocksdb.write.self"}, {WRITE_DONE_BY_OTHER, "rocksdb.write.other"}, + {WRITE_TIMEDOUT, "rocksdb.write.timedout"}, {WRITE_WITH_WAL, "rocksdb.write.wal"}, {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"}, {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 0298a28380..b20689a77c 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -65,6 +65,12 @@ class Status { const Slice& msg2 = Slice()) { return Status(kShutdownInProgress, msg, msg2); } + static Status TimedOut() { + return Status(kTimedOut); + } + static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kTimedOut, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -93,6 +99,8 @@ class Status { // Returns true iff the status indicates Incomplete bool IsShutdownInProgress() const { return code() == kShutdownInProgress; } + bool IsTimedOut() const { return code() == kTimedOut; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; @@ -106,7 +114,8 @@ class Status { kIOError = 5, kMergeInProgress = 6, kIncomplete = 7, - kShutdownInProgress = 8 + kShutdownInProgress = 8, + kTimedOut = 9 }; Code code() const { diff --git a/port/port_posix.cc b/port/port_posix.cc index 90dde32279..f353c475e4 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -20,11 +20,12 @@ namespace rocksdb { namespace port { -static void PthreadCall(const char* label, int result) { - if (result != 0) { +static int PthreadCall(const char* label, int result) { + if (result != 0 && result != ETIMEDOUT) { fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); abort(); } + return result; } Mutex::Mutex(bool adaptive) { diff --git a/util/status.cc b/util/status.cc index 2a5f05a4b8..3165a497df 100644 --- a/util/status.cc +++ b/util/status.cc @@ -68,6 +68,9 @@ std::string Status::ToString() const { case kShutdownInProgress: type = "Shutdown in progress: "; break; + case kTimedOut: + type = "Operation timed out: "; + break; default: snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", static_cast(code())); diff --git a/util/stop_watch.h b/util/stop_watch.h index 48e1b01c25..bc31cfc46e 100644 --- a/util/stop_watch.h +++ b/util/stop_watch.h @@ -24,7 +24,7 @@ class StopWatch { - uint64_t ElapsedMicros() { + uint64_t ElapsedMicros() const { return env_->NowMicros() - start_time_; } From a1df6c1fc8db9f7714cd0c61f12a231333bcbb11 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 3 Jul 2014 15:56:03 -0700 Subject: [PATCH 3/7] Update HISTORY.md to include TimeOut write API and compaction update. Summary: Update HISTORY.md to include TimeOut write API and compaction update. Test Plan: n/a Reviewers: ljin, sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19449 --- HISTORY.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/HISTORY.md b/HISTORY.md index cb25e89877..a4ef5d659c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,7 +4,8 @@ ### New Features * HashLinklist reduces performance outlier caused by skewed bucket by switching data in the bucket from linked list to skip list. Add parameter threshold_use_skiplist in NewHashLinkListRepFactory(). - +* RocksDB is now able to reclaim storage space more effectively during the compaction process. This is done by compensating the size of each deletion entry by the 2X average value size, which makes compaction to be triggerred by deletion entries more easily. +* Add TimeOut API to write. Now WriteOptions have a variable called timeout_hint_us. With timeout_hint_us set to non-zero, any write associated with this timeout_hint_us may be aborted when it runs longer than the specified timeout_hint_us, and it is guaranteed that any write completes earlier than the specified time-out will not be aborted due to the time-out condition. ## 3.2.0 (06/20/2014) From 90a6aca48ee8bcf3f0846b57e82617beada5741b Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 3 Jul 2014 16:28:03 -0700 Subject: [PATCH 4/7] Finer report I/O stats about Flush and Compaction. Summary: This diff allows the I/O stats about Flush and Compaction to be reported in a more accurate way. Instead of measuring the size of a file, it measure I/O cost in per read / write basis. Test Plan: make all check Reviewers: sdong, igor, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19383 --- db/db_impl.cc | 31 ++++++++++++++++++++++------ db/db_impl.h | 3 +++ include/rocksdb/iostats_context.h | 34 +++++++++++++++++++++++++++++++ include/rocksdb/statistics.h | 2 ++ util/env_posix.cc | 8 ++++++++ util/iostats_context.cc | 30 +++++++++++++++++++++++++++ util/iostats_context_imp.h | 32 +++++++++++++++++++++++++++++ 7 files changed, 134 insertions(+), 6 deletions(-) create mode 100644 include/rocksdb/iostats_context.h create mode 100644 util/iostats_context.cc create mode 100644 util/iostats_context_imp.h diff --git a/db/db_impl.cc b/db/db_impl.cc index a15568432b..c56acecf8f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -65,6 +65,7 @@ #include "util/log_buffer.h" #include "util/mutexlock.h" #include "util/perf_context_imp.h" +#include "util/iostats_context_imp.h" #include "util/stop_watch.h" #include "util/sync_point.h" @@ -1604,6 +1605,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, // true, mark DB read-only bg_error_ = s; } + RecordFlushIOStats(); return s; } @@ -1920,11 +1922,28 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } } +void DBImpl::RecordFlushIOStats() { + RecordTick(options_.statistics.get(), FLUSH_WRITE_BYTES, + iostats_context.bytes_written); + IOSTATS_RESET(bytes_written); +} + +void DBImpl::RecordCompactionIOStats() { + RecordTick(options_.statistics.get(), COMPACT_READ_BYTES, + IOSTATS(bytes_read)); + IOSTATS_RESET(bytes_read); + RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, + IOSTATS(bytes_written)); + IOSTATS_RESET(bytes_written); +} + void DBImpl::BGWorkFlush(void* db) { + IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); reinterpret_cast(db)->BackgroundCallFlush(); } void DBImpl::BGWorkCompaction(void* db) { + IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW); reinterpret_cast(db)->BackgroundCallCompaction(); } @@ -2024,6 +2043,7 @@ void DBImpl::BackgroundCallFlush() { // that case, all DB variables will be dealloacated and referencing them // will cause trouble. } + RecordFlushIOStats(); } void DBImpl::BackgroundCallCompaction() { @@ -2559,6 +2579,7 @@ Status DBImpl::ProcessKeyValueCompaction( while (input->Valid() && !shutting_down_.Acquire_Load() && !cfd->IsDropped()) { + RecordCompactionIOStats(); // FLUSH preempts compaction // TODO(icanadi) this currently only checks if flush is necessary on // compacting column family. we should also check if flush is necessary on @@ -2817,6 +2838,8 @@ Status DBImpl::ProcessKeyValueCompaction( } } + RecordCompactionIOStats(); + return status; } @@ -3124,22 +3147,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, for (int i = 0; i < compact->compaction->num_input_files(0); i++) { stats.bytes_readn += compact->compaction->input(0, i)->fd.GetFileSize(); - RecordTick(options_.statistics.get(), COMPACT_READ_BYTES, - compact->compaction->input(0, i)->fd.GetFileSize()); } for (int i = 0; i < compact->compaction->num_input_files(1); i++) { stats.bytes_readnp1 += compact->compaction->input(1, i)->fd.GetFileSize(); - RecordTick(options_.statistics.get(), COMPACT_READ_BYTES, - compact->compaction->input(1, i)->fd.GetFileSize()); } for (int i = 0; i < num_output_files; i++) { stats.bytes_written += compact->outputs[i].file_size; - RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, - compact->outputs[i].file_size); } + RecordCompactionIOStats(); + LogFlush(options_.info_log); mutex_.Lock(); cfd->internal_stats()->AddCompactionStats(compact->compaction->output_level(), diff --git a/db/db_impl.h b/db/db_impl.h index 48bf4de37c..6ac1d97f45 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -359,6 +359,9 @@ class DBImpl : public DB { // Wait for memtable flushed Status WaitForFlushMemTable(ColumnFamilyData* cfd); + void RecordFlushIOStats(); + void RecordCompactionIOStats(); + void MaybeScheduleLogDBDeployStats(); #ifndef ROCKSDB_LITE diff --git a/include/rocksdb/iostats_context.h b/include/rocksdb/iostats_context.h new file mode 100644 index 0000000000..0a220b53ab --- /dev/null +++ b/include/rocksdb/iostats_context.h @@ -0,0 +1,34 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_ +#define INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_ + +#include +#include + +// A thread local context for gathering io-stats efficiently and transparently. +namespace rocksdb { + +struct IOStatsContext { + // reset all io-stats counter to zero + void Reset(); + + std::string ToString() const; + + // the thread pool id + uint64_t thread_pool_id; + + // number of bytes that has been written. + uint64_t bytes_written; + // number of bytes that has been read. + uint64_t bytes_read; +}; + +extern __thread IOStatsContext iostats_context; + +} // namespace rocksdb + +#endif // INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_ diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 77f0b03882..c205f1b8c7 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -119,6 +119,7 @@ enum Tickers { WRITE_WITH_WAL, // Number of Write calls that request WAL COMPACT_READ_BYTES, // Bytes read during compaction COMPACT_WRITE_BYTES, // Bytes written during compaction + FLUSH_WRITE_BYTES, // Bytes written during flush // Number of table's properties loaded directly from file, without creating // table reader object. @@ -179,6 +180,7 @@ const std::vector> TickersNameMap = { {WRITE_DONE_BY_OTHER, "rocksdb.write.other"}, {WRITE_TIMEDOUT, "rocksdb.write.timedout"}, {WRITE_WITH_WAL, "rocksdb.write.wal"}, + {FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"}, {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"}, {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, diff --git a/util/env_posix.cc b/util/env_posix.cc index 3bfeb0ea0d..a73ec6b0ea 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -40,6 +40,7 @@ #include "util/logging.h" #include "util/posix_logger.h" #include "util/random.h" +#include "util/iostats_context_imp.h" #include // Get nano time for mach systems @@ -178,6 +179,7 @@ class PosixSequentialFile: public SequentialFile { do { r = fread_unlocked(scratch, 1, n, file_); } while (r == 0 && ferror(file_) && errno == EINTR); + IOSTATS_ADD(bytes_read, r); *result = Slice(scratch, r); if (r < n) { if (feof(file_)) { @@ -241,6 +243,7 @@ class PosixRandomAccessFile: public RandomAccessFile { do { r = pread(fd_, scratch, n, static_cast(offset)); } while (r < 0 && errno == EINTR); + IOSTATS_ADD_IF_POSITIVE(bytes_read, r); *result = Slice(scratch, (r < 0) ? 0 : r); if (r < 0) { // An error: return a non-ok status @@ -488,6 +491,7 @@ class PosixMmapFile : public WritableFile { size_t n = (left <= avail) ? left : avail; memcpy(dst_, src, n); + IOSTATS_ADD(bytes_written, n); dst_ += n; src += n; left -= n; @@ -694,6 +698,7 @@ class PosixWritableFile : public WritableFile { } return IOError(filename_, errno); } + IOSTATS_ADD(bytes_written, done); TEST_KILL_RANDOM(rocksdb_kill_odds); left -= done; @@ -744,6 +749,7 @@ class PosixWritableFile : public WritableFile { } return IOError(filename_, errno); } + IOSTATS_ADD(bytes_written, done); TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2); left -= done; src += done; @@ -877,6 +883,7 @@ class PosixRandomRWFile : public RandomRWFile { } return IOError(filename_, errno); } + IOSTATS_ADD(bytes_written, done); left -= done; src += done; @@ -890,6 +897,7 @@ class PosixRandomRWFile : public RandomRWFile { char* scratch) const { Status s; ssize_t r = pread(fd_, scratch, n, static_cast(offset)); + IOSTATS_ADD_IF_POSITIVE(bytes_read, r); *result = Slice(scratch, (r < 0) ? 0 : r); if (r < 0) { s = IOError(filename_, errno); diff --git a/util/iostats_context.cc b/util/iostats_context.cc new file mode 100644 index 0000000000..6108317792 --- /dev/null +++ b/util/iostats_context.cc @@ -0,0 +1,30 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include "rocksdb/env.h" +#include "util/iostats_context_imp.h" + +namespace rocksdb { + +__thread IOStatsContext iostats_context; + +void IOStatsContext::Reset() { + thread_pool_id = Env::Priority::TOTAL; + bytes_read = 0; + bytes_written = 0; +} + +#define OUTPUT(counter) #counter << " = " << counter << ", " + +std::string IOStatsContext::ToString() const { + std::ostringstream ss; + ss << OUTPUT(thread_pool_id) + << OUTPUT(bytes_read) + << OUTPUT(bytes_written); + return ss.str(); +} + +} // namespace rocksdb diff --git a/util/iostats_context_imp.h b/util/iostats_context_imp.h new file mode 100644 index 0000000000..ed34037d33 --- /dev/null +++ b/util/iostats_context_imp.h @@ -0,0 +1,32 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +#pragma once +#include "rocksdb/iostats_context.h" + +// increment a specific counter by the specified value +#define IOSTATS_ADD(metric, value) \ + (iostats_context.metric += value) + +// Increase metric value only when it is positive +#define IOSTATS_ADD_IF_POSITIVE(metric, value) \ + if (value > 0) { IOSTATS_ADD(metric, value); } + +// reset a specific counter to zero +#define IOSTATS_RESET(metric) \ + (iostats_context.metric = 0) + +// reset all counters to zero +#define IOSTATS_RESET_ALL() \ + (iostats_context.Reset()) + +#define IOSTATS_SET_THREAD_POOL_ID(value) \ + (iostats_context.thread_pool_id = value) + +#define IOSTATS_THREAD_POOL_ID() \ + (iostats_context.thread_pool_id) + +#define IOSTATS(metric) \ + (iostats_context.metric) From d33657a4a5ebba9d3aa2a896a67eaad8131773f0 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 3 Jul 2014 17:19:17 -0700 Subject: [PATCH 5/7] Fixed a warning in release mode. Summary: Removed a variable that is only used in assertion check. Test Plan: make release Reviewers: ljin, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19455 --- db/db_impl.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/db/db_impl.cc b/db/db_impl.cc index c56acecf8f..6862f9a776 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3796,15 +3796,21 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1); return w.status; } else if (timed_out) { +#ifndef NDEBUG bool found = false; +#endif for (auto iter = writers_.begin(); iter != writers_.end(); iter++) { if (*iter == &w) { writers_.erase(iter); +#ifndef NDEBUG found = true; +#endif break; } } +#ifndef NDEBUG assert(found); +#endif // writers_.front() might still be in cond_wait without a time-out. // As a result, we need to signal it to wake it up. Otherwise no // one else will wake him up, and RocksDB will hang. From 8d9a46fcd12850c9756a722bed52bdf9d9ab5916 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Thu, 3 Jul 2014 23:13:08 -0700 Subject: [PATCH 6/7] initialize decoded_internal_key_valid Summary: ReadInternalKey() will assign correct value anyway. Initialize it to true to suppress compiler error reported https://github.com/facebook/rocksdb/issues/186 Test Plan: I cannot reproduce it but this is obvious Reviewers: sdong, yhchiang Reviewed By: yhchiang Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19467 --- table/plain_table_key_coding.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/table/plain_table_key_coding.cc b/table/plain_table_key_coding.cc index 51849b3e32..eedf58aeaa 100644 --- a/table/plain_table_key_coding.cc +++ b/table/plain_table_key_coding.cc @@ -198,7 +198,8 @@ Status PlainTableKeyDecoder::NextPlainEncodingKey( user_key_size = static_cast(tmp_size); *bytes_read = key_ptr - start; } - bool decoded_internal_key_valid; + // dummy initial value to avoid compiler complain + bool decoded_internal_key_valid = true; Slice decoded_internal_key; Status s = ReadInternalKey(key_ptr, limit, user_key_size, parsed_key, bytes_read, @@ -227,7 +228,8 @@ Status PlainTableKeyDecoder::NextPrefixEncodingKey( bool expect_suffix = false; do { size_t size = 0; - bool decoded_internal_key_valid; + // dummy initial value to avoid compiler complain + bool decoded_internal_key_valid = true; const char* pos = DecodeSize(key_ptr, limit, &entry_type, &size); if (pos == nullptr) { return Status::Corruption("Unexpected EOF when reading size of the key"); From 7b85c1e900a1e0f47c78ff1ddfa8ddd924715eaf Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Fri, 4 Jul 2014 00:02:12 -0700 Subject: [PATCH 7/7] Improve SimpleWriteTimeoutTest to avoid false alarm. Summary: SimpleWriteTimeoutTest has two parts: 1) insert two large key/values to make memtable full and expect both of them are successful; 2) insert another key / value and expect it to be timed-out. Previously we also set a timeout in the first step, but this might sometimes cause false alarm. This diff makes the first two writes run without timeout setting. Test Plan: export ROCKSDB_TESTS=Time make db_test Reviewers: sdong, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19461 --- db/db_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/db/db_test.cc b/db/db_test.cc index 575768db10..025e04f24a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7012,13 +7012,14 @@ TEST(DBTest, SimpleWriteTimeoutTest) { options.min_write_buffer_number_to_merge = 3; options.max_total_wal_size = std::numeric_limits::max(); WriteOptions write_opt = WriteOptions(); - write_opt.timeout_hint_us = 500; + write_opt.timeout_hint_us = 0; DestroyAndReopen(&options); // fill the two write buffer ASSERT_OK(Put(Key(1), Key(1) + std::string(100000, 'v'), write_opt)); ASSERT_OK(Put(Key(2), Key(2) + std::string(100000, 'v'), write_opt)); // As the only two write buffers are full in this moment, the third // Put is expected to be timed-out. + write_opt.timeout_hint_us = 300; ASSERT_TRUE( Put(Key(3), Key(3) + std::string(100000, 'v'), write_opt).IsTimedOut()); }