From 01dcef114b05bb50e237060f483f644bb1be53e0 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 10 Mar 2014 16:14:48 -0700 Subject: [PATCH 01/10] Env to add a function to allow users to query waiting queue length Summary: Add a function to Env so that users can query the waiting queue length of each thread pool Test Plan: add a test in env_test Reviewers: haobo Reviewed By: haobo CC: dhruba, igor, yhchiang, ljin, nkg-, leveldb Differential Revision: https://reviews.facebook.net/D16755 --- HISTORY.md | 1 + hdfs/env_hdfs.h | 9 +++++++++ include/rocksdb/env.h | 8 ++++++++ util/env_posix.cc | 26 ++++++++++++++++++++------ util/env_test.cc | 13 +++++++++++++ 5 files changed, 51 insertions(+), 6 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 6f5d4d9d50..a7713c24c1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,6 +12,7 @@ * Added new option -- verify_checksums_in_compaction * Chagned Options.prefix_extractor from raw pointer to shared_ptr (take ownership) Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly) +* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools ### New Features * If we find one truncated record at the end of the MANIFEST or WAL files, diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 17d8fcb2bb..303cd81cf6 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -110,6 +110,11 @@ class HdfsEnv : public Env { virtual void WaitForJoin() { posixEnv->WaitForJoin(); } + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const + override { + return posixEnv->GetThreadPoolQueueLen(pri); + } + virtual Status GetTestDirectory(std::string* path) { return posixEnv->GetTestDirectory(path); } @@ -292,6 +297,10 @@ class HdfsEnv : public Env { virtual void WaitForJoin() {} + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { + return 0; + } + virtual Status GetTestDirectory(std::string* path) {return 
notsup;} virtual uint64_t NowMicros() {return 0;} diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index c96a659feb..16eb164409 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -210,6 +210,11 @@ class Env { // Wait for all threads started by StartThread to terminate. virtual void WaitForJoin() = 0; + // Get thread pool queue length for specific thrad pool. + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { + return 0; + } + // *path is set to a temporary directory that can be used for testing. It may // or many not have just been created. The directory may or may not differ // between runs of the same process, but subsequent calls will return the @@ -702,6 +707,9 @@ class EnvWrapper : public Env { return target_->StartThread(f, a); } void WaitForJoin() { return target_->WaitForJoin(); } + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { + return target_->GetThreadPoolQueueLen(pri); + } virtual Status GetTestDirectory(std::string* path) { return target_->GetTestDirectory(path); } diff --git a/util/env_posix.cc b/util/env_posix.cc index e019d6af09..89d8df68d0 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1206,6 +1206,8 @@ class PosixEnv : public Env { virtual void WaitForJoin(); + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; + virtual Status GetTestDirectory(std::string* result) { const char* env = getenv("TEST_TMPDIR"); if (env && env[0] != '\0') { @@ -1370,12 +1372,12 @@ class PosixEnv : public Env { class ThreadPool { public: - - ThreadPool() : - total_threads_limit_(1), - bgthreads_(0), - queue_(), - exit_all_threads_(false) { + ThreadPool() + : total_threads_limit_(1), + bgthreads_(0), + queue_(), + queue_len_(0), + exit_all_threads_(false) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); } @@ -1405,6 +1407,7 @@ class PosixEnv : public Env { void (*function)(void*) = 
queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); + queue_len_.store(queue_.size(), std::memory_order_relaxed); PthreadCall("unlock", pthread_mutex_unlock(&mu_)); (*function)(arg); @@ -1459,6 +1462,7 @@ class PosixEnv : public Env { queue_.push_back(BGItem()); queue_.back().function = function; queue_.back().arg = arg; + queue_len_.store(queue_.size(), std::memory_order_relaxed); // always wake up at least one waiting thread. PthreadCall("signal", pthread_cond_signal(&bgsignal_)); @@ -1466,6 +1470,10 @@ class PosixEnv : public Env { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } + unsigned int GetQueueLen() const { + return queue_len_.load(std::memory_order_relaxed); + } + private: // Entry per Schedule() call struct BGItem { void* arg; void (*function)(void*); }; @@ -1476,6 +1484,7 @@ class PosixEnv : public Env { int total_threads_limit_; std::vector bgthreads_; BGQueue queue_; + std::atomic_uint queue_len_; // Queue length. Used for stats reporting bool exit_all_threads_; }; @@ -1498,6 +1507,11 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) { thread_pools_[pri].Schedule(function, arg); } +unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { + assert(pri >= Priority::LOW && pri <= Priority::HIGH); + return thread_pools_[pri].GetQueueLen(); +} + namespace { struct StartThreadState { void (*user_function)(void*); diff --git a/util/env_test.cc b/util/env_test.cc index a442e3a5cd..e17027a399 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -172,17 +172,30 @@ TEST(EnvPosixTest, TwoPools) { env_->SetBackgroundThreads(kLowPoolSize); env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + // schedule same number of jobs in each pool for (int i = 0; i < kJobs; i++) { env_->Schedule(&CB::Run, &low_pool_job); env_->Schedule(&CB::Run, &high_pool_job, 
Env::Priority::HIGH); } + // Wait a short while for the jobs to be dispatched. + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(kJobs - kLowPoolSize, env_->GetThreadPoolQueueLen()); + ASSERT_EQ(kJobs - kLowPoolSize, + env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(kJobs - kHighPoolSize, + env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); // wait for all jobs to finish while (low_pool_job.NumFinished() < kJobs || high_pool_job.NumFinished() < kJobs) { env_->SleepForMicroseconds(kDelayMicros); } + + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); } bool IsSingleVarint(const std::string& s) { From cb9802168f2c84a24af87b0dc08e0a4f9710c55d Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 11 Mar 2014 11:27:19 -0700 Subject: [PATCH 02/10] Add a comment after SignalAll() Summary: Having code after SignalAll has already caused 2 bugs. Let's make sure this doesn't happen again. Test Plan: no test Reviewers: sdong, dhruba, haobo Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16785 --- db/db_impl.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/db/db_impl.cc b/db/db_impl.cc index 65e8e437c1..ca91b5e82a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1924,6 +1924,10 @@ void DBImpl::BackgroundCallFlush() { MaybeScheduleFlushOrCompaction(); } bg_cv_.SignalAll(); + // IMPORTANT: there should be no code after calling SignalAll. This call may + // signal the DB destructor that it's OK to proceed with destruction. In + // that case, all DB variables will be dealloacated and referencing them + // will cause trouble. } log_buffer.FlushBufferToLog(); } @@ -1993,6 +1997,10 @@ void DBImpl::BackgroundCallCompaction() { MaybeScheduleFlushOrCompaction(); } bg_cv_.SignalAll(); + // IMPORTANT: there should be no code after calling SignalAll. This call may + // signal the DB destructor that it's OK to proceed with destruction. 
In + // that case, all DB variables will be dealloacated and referencing them + // will cause trouble. } log_buffer.FlushBufferToLog(); } From 6c66bc08d9fdee1b0a3d1d3e6df557b02fcd0e7c Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 11 Mar 2014 11:20:08 -0700 Subject: [PATCH 03/10] Temp Fix of LogBuffer flushing Summary: To temp fix the log buffer flushing. Flush the buffer inside the lock. Clean the trunk before we find an eventual fix. Test Plan: make all check Reviewers: haobo, igor Reviewed By: igor CC: ljin, leveldb, yhchiang Differential Revision: https://reviews.facebook.net/D16791 --- db/db_impl.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index ca91b5e82a..49fd3c9fb7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1923,13 +1923,13 @@ void DBImpl::BackgroundCallFlush() { if (madeProgress) { MaybeScheduleFlushOrCompaction(); } + log_buffer.FlushBufferToLog(); bg_cv_.SignalAll(); // IMPORTANT: there should be no code after calling SignalAll. This call may // signal the DB destructor that it's OK to proceed with destruction. In // that case, all DB variables will be dealloacated and referencing them // will cause trouble. } - log_buffer.FlushBufferToLog(); } @@ -1996,13 +1996,13 @@ void DBImpl::BackgroundCallCompaction() { if (madeProgress) { MaybeScheduleFlushOrCompaction(); } + log_buffer.FlushBufferToLog(); bg_cv_.SignalAll(); // IMPORTANT: there should be no code after calling SignalAll. This call may // signal the DB destructor that it's OK to proceed with destruction. In // that case, all DB variables will be dealloacated and referencing them // will cause trouble. 
} - log_buffer.FlushBufferToLog(); } Status DBImpl::BackgroundCompaction(bool* madeProgress, From 02dab3be11b9c331fe18ad4f85246071281c0402 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Tue, 11 Mar 2014 13:44:33 -0700 Subject: [PATCH 04/10] fix db_stress test Summary: Fix the db_stress test, let it run with HashSkipList for real Test Plan: python tools/db_crashtest.py python tools/db_crashtest2.py Reviewers: igor, haobo Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D16773 --- tools/db_crashtest.py | 25 ++++++++++++++++-------- tools/db_crashtest2.py | 2 ++ tools/db_stress.cc | 44 ++++++++++++++++++++++++++++-------------- 3 files changed, 48 insertions(+), 23 deletions(-) diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index b3d08f8880..e4774e5576 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -93,6 +93,8 @@ def main(argv): --max_background_compactions=20 --max_bytes_for_level_base=10485760 --filter_deletes=%s + --memtablerep=prefix_hash + --prefix_size=2 """ % (ops_per_thread, threads, write_buf_size, @@ -108,16 +110,23 @@ def main(argv): print("Running db_stress with pid=%d: %s\n\n" % (child.pid, cmd)) + stop_early = False while time.time() < killtime: - time.sleep(10) + if child.poll() is not None: + print("WARNING: db_stress ended before kill: exitcode=%d\n" + % child.returncode) + stop_early = True + break + time.sleep(1) - if child.poll() is not None: - print("WARNING: db_stress ended before kill: exitcode=%d\n" - % child.returncode) - else: - child.kill() - print("KILLED %d\n" % child.pid) - time.sleep(1) # time to stabilize after a kill + if not stop_early: + if child.poll() is not None: + print("WARNING: db_stress ended before kill: exitcode=%d\n" + % child.returncode) + else: + child.kill() + print("KILLED %d\n" % child.pid) + time.sleep(1) # time to stabilize after a kill while True: line = child.stderr.readline().strip() diff --git a/tools/db_crashtest2.py b/tools/db_crashtest2.py index
8b7ce969df..bb89491756 100644 --- a/tools/db_crashtest2.py +++ b/tools/db_crashtest2.py @@ -107,6 +107,8 @@ def main(argv): --max_background_compactions=20 --max_bytes_for_level_base=10485760 --filter_deletes=%s + --memtablerep=prefix_hash + --prefix_size=2 %s """ % (random.randint(0, 1), threads, diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 4d02bcdc5a..06df067019 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -328,17 +328,17 @@ enum RepFactory StringToRepFactory(const char* ctype) { return kSkipList; } static enum RepFactory FLAGS_rep_factory; -DEFINE_string(memtablerep, "skip_list", ""); +DEFINE_string(memtablerep, "prefix_hash", ""); static bool ValidatePrefixSize(const char* flagname, int32_t value) { - if (value < 0 || value>=2000000000) { - fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n", + if (value < 0 || value > 8) { + fprintf(stderr, "Invalid value for --%s: %d. 0 <= PrefixSize <= 8\n", flagname, value); return false; } return true; } -DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipListRep"); +DEFINE_int32(prefix_size, 2, "Control the prefix size for HashSkipListRep"); static const bool FLAGS_prefix_size_dummy = google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); @@ -931,15 +931,15 @@ class StressTest { return s; } - // Given a prefix P, this does prefix scans for "0"+P, "1"+P,..."9"+P - // in the same snapshot. Each of these 10 scans returns a series of - // values; each series should be the same length, and it is verified - // for each index i that all the i'th values are of the form "0"+V, - // "1"+V,..."9"+V. + // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P + // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes + // of the key. 
Each of these 10 scans returns a series of values; + // each series should be the same length, and it is verified for each + // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V. // ASSUMES that MultiPut was used to put (K, V) Status MultiPrefixScan(ThreadState* thread, const ReadOptions& readoptions, - const Slice& prefix) { + const Slice& key) { std::string prefixes[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; Slice prefix_slices[10]; @@ -948,8 +948,9 @@ class StressTest { Iterator* iters[10]; Status s = Status::OK(); for (int i = 0; i < 10; i++) { - prefixes[i] += prefix.ToString(); - prefix_slices[i] = prefixes[i]; + prefixes[i] += key.ToString(); + prefixes[i].resize(FLAGS_prefix_size); + prefix_slices[i] = Slice(prefixes[i]); readoptionscopy[i] = readoptions; readoptionscopy[i].prefix = &prefix_slices[i]; readoptionscopy[i].snapshot = snapshot; @@ -980,7 +981,7 @@ class StressTest { for (int i = 0; i < 10; i++) { if (values[i] != values[0]) { fprintf(stderr, "error : inconsistent values for prefix %s: %s, %s\n", - prefix.ToString().c_str(), values[0].c_str(), + prefixes[i].c_str(), values[0].c_str(), values[i].c_str()); // we continue after error rather than exiting so that we can // find more errors if any @@ -1016,6 +1017,7 @@ class StressTest { const Snapshot* snapshot = db_->GetSnapshot(); ReadOptions readoptionscopy = readoptions; readoptionscopy.snapshot = snapshot; + readoptionscopy.prefix_seek = FLAGS_prefix_size > 0; unique_ptr iter(db_->NewIterator(readoptionscopy)); iter->Seek(key); @@ -1098,8 +1100,8 @@ class StressTest { // keys are longs (e.g., 8 bytes), so we let prefixes be // everything except the last byte. So there will be 2^8=256 // keys per prefix. 
- Slice prefix = Slice(key.data(), key.size() - 1); if (!FLAGS_test_batches_snapshots) { + Slice prefix = Slice(key.data(), FLAGS_prefix_size); read_opts.prefix = &prefix; Iterator* iter = db_->NewIterator(read_opts); int count = 0; @@ -1115,7 +1117,7 @@ class StressTest { } delete iter; } else { - MultiPrefixScan(thread, read_opts, prefix); + MultiPrefixScan(thread, read_opts, key); } read_opts.prefix = nullptr; } else if (prefixBound <= prob_op && prob_op < writeBound) { @@ -1509,6 +1511,18 @@ int main(int argc, char** argv) { // max number of concurrent compactions. FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions); + if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size <= 0) { + fprintf(stderr, + "Error: prefixpercent is non-zero while prefix_size is " + "not positive!\n"); + exit(1); + } + if (FLAGS_test_batches_snapshots && FLAGS_prefix_size <= 0) { + fprintf(stderr, + "Error: please specify prefix_size for " + "test_batches_snapshots test!\n"); + exit(1); + } if ((FLAGS_readpercent + FLAGS_prefixpercent + FLAGS_writepercent + FLAGS_delpercent + FLAGS_iterpercent) != 100) { fprintf(stderr, From bd45633b7105c469773333bafffe9079ea2df583 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 11 Mar 2014 10:36:30 -0700 Subject: [PATCH 05/10] Fix data race against logging data structure because of LogBuffer Summary: @igor pointed out that there is a potential data race because of the way we use the newly introduced LogBuffer. After "bg_compaction_scheduled_--" or "bg_flush_scheduled_--", they can both become 0. As soon as the lock is released after that, DBImpl's deconstructor can go ahead and deconstruct all the states inside DB, including the info_log object hold in a shared pointer of the options object it keeps. At that point it is not safe anymore to continue using the info logger to write the delayed logs. With the patch, lock is released temporarily for log buffer to be flushed before "bg_compaction_scheduled_--" or "bg_flush_scheduled_--". 
In order to make sure we don't miss any pending flush or compaction, a new flag bg_schedule_needed_ is added, which is set to be true if there is a pending flush or compaction but not scheduled because of the max thread limit. If the flag is set to be true, the scheduling function will be called before compaction or flush thread finishes. Thanks @igor for this finding! Test Plan: make all check Reviewers: haobo, igor Reviewed By: haobo CC: dhruba, ljin, yhchiang, igor, leveldb Differential Revision: https://reviews.facebook.net/D16767 --- db/db_impl.cc | 60 +++++++++++++++++++++++++++++++++++------------ db/db_impl.h | 4 ++++ util/log_buffer.h | 2 ++ 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 49fd3c9fb7..fd33693534 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -276,6 +276,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) super_version_number_(0), local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), tmp_batch_(), + bg_schedule_needed_(false), bg_compaction_scheduled_(0), bg_manual_only_(0), bg_flush_scheduled_(0), @@ -1830,17 +1831,21 @@ Status DBImpl::TEST_WaitForCompact() { void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); + bg_schedule_needed_ = false; if (bg_work_gate_closed_) { // gate closed for backgrond work } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else { bool is_flush_pending = imm_.IsFlushPending(); - if (is_flush_pending && - (bg_flush_scheduled_ < options_.max_background_flushes)) { - // memtable flush needed - bg_flush_scheduled_++; - env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); + if (is_flush_pending) { + if (bg_flush_scheduled_ < options_.max_background_flushes) { + // memtable flush needed + bg_flush_scheduled_++; + env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); + } else { + bg_schedule_needed_ = true; + } } // Schedule BGWorkCompaction if there's a 
compaction pending (or a memtable @@ -1850,11 +1855,13 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { if ((manual_compaction_ || versions_->current()->NeedsCompaction() || (is_flush_pending && (options_.max_background_flushes <= 0))) && - bg_compaction_scheduled_ < options_.max_background_compactions && (!bg_manual_only_ || manual_compaction_)) { - - bg_compaction_scheduled_++; - env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); + if (bg_compaction_scheduled_ < options_.max_background_compactions) { + bg_compaction_scheduled_++; + env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); + } else { + bg_schedule_needed_ = true; + } } } } @@ -1912,15 +1919,26 @@ void DBImpl::BackgroundCallFlush() { // to delete all obsolete files and we force FindObsoleteFiles() FindObsoleteFiles(deletion_state, !s.ok()); // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete()) { + if (deletion_state.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { mutex_.Unlock(); + // Have to flush the info logs before bg_flush_scheduled_-- + // because if bg_flush_scheduled_ becomes 0 and the lock is + // released, the deconstructor of DB can kick in and destroy all the + // states of DB so info_log might not be available after that point. + // It also applies to access other states that DB owns. log_buffer.FlushBufferToLog(); - PurgeObsoleteFiles(deletion_state); + if (deletion_state.HaveSomethingToDelete()) { + PurgeObsoleteFiles(deletion_state); + } mutex_.Lock(); } bg_flush_scheduled_--; - if (madeProgress) { + // Any time the mutex is released After finding the work to do, another + // thread might execute MaybeScheduleFlushOrCompaction(). It is possible + // that there is a pending job but it is not scheduled because of the + // max thread limit. 
+ if (madeProgress || bg_schedule_needed_) { MaybeScheduleFlushOrCompaction(); } log_buffer.FlushBufferToLog(); @@ -1979,10 +1997,17 @@ void DBImpl::BackgroundCallCompaction() { FindObsoleteFiles(deletion_state, !s.ok()); // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete()) { + if (deletion_state.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { mutex_.Unlock(); + // Have to flush the info logs before bg_compaction_scheduled_-- + // because if bg_flush_scheduled_ becomes 0 and the lock is + // released, the deconstructor of DB can kick in and destroy all the + // states of DB so info_log might not be available after that point. + // It also applies to access other states that DB owns. log_buffer.FlushBufferToLog(); - PurgeObsoleteFiles(deletion_state); + if (deletion_state.HaveSomethingToDelete()) { + PurgeObsoleteFiles(deletion_state); + } mutex_.Lock(); } @@ -1993,7 +2018,12 @@ void DBImpl::BackgroundCallCompaction() { // Previous compaction may have produced too many files in a level, // So reschedule another compaction if we made progress in the // last compaction. - if (madeProgress) { + // + // Also, any time the mutex is released After finding the work to do, + // another thread might execute MaybeScheduleFlushOrCompaction(). It is + // possible that there is a pending job but it is not scheduled because of + // the max thread limit. + if (madeProgress || bg_schedule_needed_) { MaybeScheduleFlushOrCompaction(); } log_buffer.FlushBufferToLog(); diff --git a/db/db_impl.h b/db/db_impl.h index 96e3f1ea31..e42848d11c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -454,6 +454,10 @@ class DBImpl : public DB { // part of ongoing compactions. std::set pending_outputs_; + // At least one compaction or flush job is pending but not yet scheduled + // because of the max background thread limit. 
+ bool bg_schedule_needed_; + // count how many background compactions are running or have been scheduled int bg_compaction_scheduled_; diff --git a/util/log_buffer.h b/util/log_buffer.h index 76503a084a..8ebe92e034 100644 --- a/util/log_buffer.h +++ b/util/log_buffer.h @@ -23,6 +23,8 @@ class LogBuffer { // Add a log entry to the buffer. void AddLogToBuffer(const char* format, va_list ap); + size_t IsEmpty() const { return logs_.empty(); } + // Flush all buffered log to the info log. void FlushBufferToLog(); From 86ba3e24e3d324bd0db5b8b24dc07ff30ed8cf02 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Tue, 11 Mar 2014 16:33:42 -0700 Subject: [PATCH 06/10] make assert based on FLAGS_prefix_size Summary: as title Test Plan: running python tools/db_crashtest.py Reviewers: igor Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D16803 --- tools/db_stress.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 06df067019..ca75bacbd6 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1097,19 +1097,19 @@ class StressTest { } } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) { // OPERATION prefix scan - // keys are longs (e.g., 8 bytes), so we let prefixes be - // everything except the last byte. So there will be 2^8=256 - // keys per prefix. + // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are + // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will + // be 2 ^ ((8 - FLAGS_prefix_size * 8) combinations. 
if (!FLAGS_test_batches_snapshots) { Slice prefix = Slice(key.data(), FLAGS_prefix_size); read_opts.prefix = &prefix; Iterator* iter = db_->NewIterator(read_opts); - int count = 0; + int64_t count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { assert(iter->key().starts_with(prefix)); - count++; + ++count; } - assert(count <= 256); + assert(count <= (1 << ((8 - FLAGS_prefix_size) * 8))); if (iter->status().ok()) { thread->stats.AddPrefixes(1, count); } else { From 839c8ecfcd486f0db82ecb755a137ad95909966f Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 11 Mar 2014 18:00:46 -0700 Subject: [PATCH 07/10] Fix bad merge of D16791 and D16767 Summary: A bad Auto-Merge caused the log buffer to be flushed twice. Remove the unintended one. Test Plan: Should already be tested (the code looks the same as when I ran unit tests). Reviewers: haobo, igor Reviewed By: haobo CC: ljin, yhchiang, leveldb Differential Revision: https://reviews.facebook.net/D16821 --- db/db_test.cc | 33 +++++++++++-- db/transaction_log_impl.cc | 78 ++++++++++++++++++++----------- db/transaction_log_impl.h | 5 ++ include/rocksdb/transaction_log.h | 6 +++ tools/db_repl_stress.cc | 17 ++++++- 5 files changed, 107 insertions(+), 32 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 2a8cee4e7c..c50822d739 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4543,16 +4543,43 @@ TEST(DBTest, TransactionLogIterator) { { auto iter = OpenTransactionLogIter(0); ExpectRecords(3, iter); - } - Reopen(&options); - env_->SleepForMicroseconds(2 * 1000 * 1000);{ + assert(!iter->IsObsolete()); + iter->Next(); + assert(!iter->Valid()); + assert(!iter->IsObsolete()); + assert(iter->status().ok()); + + Reopen(&options); + env_->SleepForMicroseconds(2 * 1000 * 1000); Put("key4", DummyString(1024)); Put("key5", DummyString(1024)); Put("key6", DummyString(1024)); + + iter->Next(); + assert(!iter->Valid()); + assert(iter->IsObsolete()); + assert(iter->status().ok()); } { auto iter = OpenTransactionLogIter(0);
ExpectRecords(6, iter); + assert(!iter->IsObsolete()); + iter->Next(); + assert(!iter->Valid()); + assert(!iter->IsObsolete()); + assert(iter->status().ok()); + + Put("key7", DummyString(1024)); + iter->Next(); + assert(iter->Valid()); + assert(iter->status().ok()); + + dbfull()->Flush(FlushOptions()); + Put("key8", DummyString(1024)); + iter->Next(); + assert(!iter->Valid()); + assert(iter->IsObsolete()); + assert(iter->status().ok()); } } while (ChangeCompactOptions()); } diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 36b8932a5c..f039c3426a 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -21,6 +21,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( files_(std::move(files)), started_(false), isValid_(false), + is_obsolete_(false), currentFileIndex_(0), currentBatchSeq_(0), currentLastSeq_(0), @@ -69,14 +70,15 @@ bool TransactionLogIteratorImpl::Valid() { return started_ && isValid_; } -bool TransactionLogIteratorImpl::RestrictedRead( - Slice* record, - std::string* scratch) { - // Don't read if no more complete entries to read from logs - if (currentLastSeq_ >= dbimpl_->GetLatestSequenceNumber()) { - return false; +bool TransactionLogIteratorImpl::RestrictedRead(Slice* record, + std::string* scratch) { + bool ret = currentLogReader_->ReadRecord(record, scratch); + + if (!reporter_.last_status.ok()) { + currentStatus_ = reporter_.last_status; } - return currentLogReader_->ReadRecord(record, scratch); + + return ret; } void TransactionLogIteratorImpl::SeekToStartSequence( @@ -86,6 +88,7 @@ void TransactionLogIteratorImpl::SeekToStartSequence( Slice record; started_ = false; isValid_ = false; + is_obsolete_ = false; if (files_->size() <= startFileIndex) { return; } @@ -94,6 +97,18 @@ void TransactionLogIteratorImpl::SeekToStartSequence( currentStatus_ = s; return; } + auto latest_seq_num = dbimpl_->GetLatestSequenceNumber(); + if (startingSequenceNumber_ > latest_seq_num) { + if (strict) { + 
currentStatus_ = Status::Corruption("Gap in sequence number. Could not " + "seek to required sequence number"); + reporter_.Info(currentStatus_.ToString().c_str()); + } else { + // isValid_ is false; + return; + } + } + while (RestrictedRead(&record, &scratch)) { if (record.size() < 12) { reporter_.Corruption( @@ -123,11 +138,11 @@ void TransactionLogIteratorImpl::SeekToStartSequence( // only file. Otherwise log the error and let the iterator return next entry // If strict is set, we want to seek exactly till the start sequence and it // should have been present in the file we scanned above - if (strict) { + if (strict || files_->size() == 1) { currentStatus_ = Status::Corruption("Gap in sequence number. Could not " "seek to required sequence number"); reporter_.Info(currentStatus_.ToString().c_str()); - } else if (files_->size() != 1) { + } else { currentStatus_ = Status::Corruption("Start sequence was not found, " "skipping to the next available"); reporter_.Info(currentStatus_.ToString().c_str()); @@ -149,11 +164,30 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { // Runs every time until we can seek to the start sequence return SeekToStartSequence(); } - while(true) { + + is_obsolete_ = false; + auto latest_seq_num = dbimpl_->GetLatestSequenceNumber(); + if (currentLastSeq_ >= latest_seq_num) { + isValid_ = false; + return; + } + + bool first = true; + while (currentFileIndex_ < files_->size()) { + if (!first) { + Status status =OpenLogReader(files_->at(currentFileIndex_).get()); + if (!status.ok()) { + isValid_ = false; + currentStatus_ = status; + return; + } + } + first = false; assert(currentLogReader_); if (currentLogReader_->IsEOF()) { currentLogReader_->UnmarkEOF(); } + while (RestrictedRead(&record, &scratch)) { if (record.size() < 12) { reporter_.Corruption( @@ -171,26 +205,14 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { return; } } - // Open the next file - if (currentFileIndex_ < files_->size() - 1) { - 
++currentFileIndex_; - Status status =OpenLogReader(files_->at(currentFileIndex_).get()); - if (!status.ok()) { - isValid_ = false; - currentStatus_ = status; - return; - } - } else { - isValid_ = false; - if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) { - currentStatus_ = Status::OK(); - } else { - currentStatus_ = Status::Corruption("NO MORE DATA LEFT"); - } - return; - } + ++currentFileIndex_; } + + // Read all the files but cannot find next record expected. + // TODO(sdong): support to auto fetch new log files from DB and continue. + isValid_ = false; + is_obsolete_ = true; } bool TransactionLogIteratorImpl::IsBatchExpected( diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index 6454d89e76..4c85379b65 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -19,7 +19,9 @@ namespace rocksdb { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; + Status last_status; virtual void Corruption(size_t bytes, const Status& s) { + last_status = s; Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str()); } virtual void Info(const char* s) { @@ -74,6 +76,8 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { virtual bool Valid(); + virtual bool IsObsolete() override { return is_obsolete_; } + virtual void Next(); virtual Status status(); @@ -89,6 +93,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { std::unique_ptr files_; bool started_; bool isValid_; // not valid when it starts of. + bool is_obsolete_; Status currentStatus_; size_t currentFileIndex_; std::unique_ptr currentBatch_; diff --git a/include/rocksdb/transaction_log.h b/include/rocksdb/transaction_log.h index 30443bba55..1bb259b1fd 100644 --- a/include/rocksdb/transaction_log.h +++ b/include/rocksdb/transaction_log.h @@ -73,6 +73,12 @@ class TransactionLogIterator { // Can read data from a valid iterator. 
virtual bool Valid() = 0; + // IsObsolete() returns true if new log files were created. This usually + // means that the user needs to close the current iterator and create a new + // one to get the newest updates. It should happen only when mem tables are + // flushed. + virtual bool IsObsolete() = 0; + // Moves the iterator to the next WriteBatch. // REQUIRES: Valid() to be true. virtual void Next() = 0; diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index 27cb6d5abf..e336908ce4 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -67,8 +67,21 @@ static void ReplicationThreadBody(void* arg) { } } fprintf(stderr, "Refreshing iterator\n"); - for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) { + for (; !iter->IsObsolete(); iter->Next()) { + if (!iter->Valid()) { + if (t->stop.Acquire_Load() == nullptr) { + return; + } + // need to wait for new rows. + continue; + } + BatchResult res = iter->GetBatch(); + if (!iter->status().ok()) { + fprintf(stderr, "Corruption reported when reading seq no. b/w %ld", + static_cast(currentSeqNum)); + exit(1); + } if (res.sequence != currentSeqNum) { fprintf(stderr, "Missed a seq no. 
b/w %ld and %ld\n", @@ -76,6 +89,8 @@ static void ReplicationThreadBody(void* arg) { (long)res.sequence); exit(1); } + t->no_read++; + currentSeqNum++; } } } From 5ba028c179701843a3c3e1d0b10919c04c233912 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 12 Mar 2014 09:31:06 -0700 Subject: [PATCH 08/10] DBStress cleanup Summary: *) fixed the comment *) constant 1 was not casted to 64-bit, which (I think) might cause overflow if we shift it too much *) default prefix size to be 7, like it was before Test Plan: compiled Reviewers: ljin Reviewed By: ljin CC: leveldb Differential Revision: https://reviews.facebook.net/D16827 --- tools/db_crashtest.py | 2 +- tools/db_crashtest2.py | 2 +- tools/db_stress.cc | 8 +++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index e4774e5576..16603cfd60 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -94,7 +94,7 @@ def main(argv): --max_bytes_for_level_base=10485760 --filter_deletes=%s --memtablerep=prefix_hash - --prefix_size=2 + --prefix_size=7 """ % (ops_per_thread, threads, write_buf_size, diff --git a/tools/db_crashtest2.py b/tools/db_crashtest2.py index bb89491756..6a28a0ba40 100644 --- a/tools/db_crashtest2.py +++ b/tools/db_crashtest2.py @@ -108,7 +108,7 @@ def main(argv): --max_bytes_for_level_base=10485760 --filter_deletes=%s --memtablerep=prefix_hash - --prefix_size=2 + --prefix_size=7 %s """ % (random.randint(0, 1), threads, diff --git a/tools/db_stress.cc b/tools/db_stress.cc index ca75bacbd6..c3b17a0e8c 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -338,7 +338,7 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) { } return true; } -DEFINE_int32(prefix_size, 2, "Control the prefix size for HashSkipListRep"); +DEFINE_int32(prefix_size, 7, "Control the prefix size for HashSkipListRep"); static const bool FLAGS_prefix_size_dummy = google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); @@ 
-1099,7 +1099,8 @@ class StressTest { // OPERATION prefix scan // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will - // be 2 ^ ((8 - FLAGS_prefix_size * 8) combinations. + // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same + // prefix if (!FLAGS_test_batches_snapshots) { Slice prefix = Slice(key.data(), FLAGS_prefix_size); read_opts.prefix = &prefix; @@ -1109,7 +1110,8 @@ class StressTest { assert(iter->key().starts_with(prefix)); ++count; } - assert(count <= (1 << ((8 - FLAGS_prefix_size) * 8))); + assert(count <= + (static_cast(1) << ((8 - FLAGS_prefix_size) * 8))); if (iter->status().ok()) { thread->stats.AddPrefixes(1, count); } else { From 2b95dc1542a5fd0f9e3a8390d9124c852f50ff2d Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 12 Mar 2014 09:37:43 -0700 Subject: [PATCH 09/10] Revert "Fix bad merge of D16791 and D16767" This reverts commit 839c8ecfcd486f0db82ecb755a137ad95909966f. 
--- db/db_test.cc | 33 ++----------- db/transaction_log_impl.cc | 80 +++++++++++-------------------- db/transaction_log_impl.h | 5 -- include/rocksdb/transaction_log.h | 6 --- tools/db_repl_stress.cc | 17 +------ 5 files changed, 33 insertions(+), 108 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index c50822d739..2a8cee4e7c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4543,43 +4543,16 @@ TEST(DBTest, TransactionLogIterator) { { auto iter = OpenTransactionLogIter(0); ExpectRecords(3, iter); - assert(!iter->IsObsolete()); - iter->Next(); - assert(!iter->Valid()); - assert(!iter->IsObsolete()); - assert(iter->status().ok()); - - Reopen(&options); - env_->SleepForMicroseconds(2 * 1000 * 1000); + } + Reopen(&options); + env_->SleepForMicroseconds(2 * 1000 * 1000);{ Put("key4", DummyString(1024)); Put("key5", DummyString(1024)); Put("key6", DummyString(1024)); - - iter->Next(); - assert(!iter->Valid()); - assert(iter->IsObsolete()); - assert(iter->status().ok()); } { auto iter = OpenTransactionLogIter(0); ExpectRecords(6, iter); - assert(!iter->IsObsolete()); - iter->Next(); - assert(!iter->Valid()); - assert(!iter->IsObsolete()); - assert(iter->status().ok()); - - Put("key7", DummyString(1024)); - iter->Next(); - assert(iter->Valid()); - assert(iter->status().ok()); - - dbfull()->Flush(FlushOptions()); - Put("key8", DummyString(1024)); - iter->Next(); - assert(!iter->Valid()); - assert(iter->IsObsolete()); - assert(iter->status().ok()); } } while (ChangeCompactOptions()); } diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index f039c3426a..36b8932a5c 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -21,7 +21,6 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( files_(std::move(files)), started_(false), isValid_(false), - is_obsolete_(false), currentFileIndex_(0), currentBatchSeq_(0), currentLastSeq_(0), @@ -70,15 +69,14 @@ bool TransactionLogIteratorImpl::Valid() { return started_ && isValid_; } 
-bool TransactionLogIteratorImpl::RestrictedRead(Slice* record, - std::string* scratch) { - bool ret = currentLogReader_->ReadRecord(record, scratch); - - if (!reporter_.last_status.ok()) { - currentStatus_ = reporter_.last_status; +bool TransactionLogIteratorImpl::RestrictedRead( + Slice* record, + std::string* scratch) { + // Don't read if no more complete entries to read from logs + if (currentLastSeq_ >= dbimpl_->GetLatestSequenceNumber()) { + return false; } - - return ret; + return currentLogReader_->ReadRecord(record, scratch); } void TransactionLogIteratorImpl::SeekToStartSequence( @@ -88,7 +86,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence( Slice record; started_ = false; isValid_ = false; - is_obsolete_ = false; if (files_->size() <= startFileIndex) { return; } @@ -97,18 +94,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence( currentStatus_ = s; return; } - auto latest_seq_num = dbimpl_->GetLatestSequenceNumber(); - if (startingSequenceNumber_ > latest_seq_num) { - if (strict) { - currentStatus_ = Status::Corruption("Gap in sequence number. Could not " - "seek to required sequence number"); - reporter_.Info(currentStatus_.ToString().c_str()); - } else { - // isValid_ is false; - return; - } - } - while (RestrictedRead(&record, &scratch)) { if (record.size() < 12) { reporter_.Corruption( @@ -138,11 +123,11 @@ void TransactionLogIteratorImpl::SeekToStartSequence( // only file. Otherwise log the error and let the iterator return next entry // If strict is set, we want to seek exactly till the start sequence and it // should have been present in the file we scanned above - if (strict || files_->size() == 1) { + if (strict) { currentStatus_ = Status::Corruption("Gap in sequence number. 
Could not " "seek to required sequence number"); reporter_.Info(currentStatus_.ToString().c_str()); - } else { + } else if (files_->size() != 1) { currentStatus_ = Status::Corruption("Start sequence was not found, " "skipping to the next available"); reporter_.Info(currentStatus_.ToString().c_str()); @@ -164,30 +149,11 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { // Runs every time until we can seek to the start sequence return SeekToStartSequence(); } - - is_obsolete_ = false; - auto latest_seq_num = dbimpl_->GetLatestSequenceNumber(); - if (currentLastSeq_ >= latest_seq_num) { - isValid_ = false; - return; - } - - bool first = true; - while (currentFileIndex_ < files_->size()) { - if (!first) { - Status status =OpenLogReader(files_->at(currentFileIndex_).get()); - if (!status.ok()) { - isValid_ = false; - currentStatus_ = status; - return; - } - } - first = false; + while(true) { assert(currentLogReader_); if (currentLogReader_->IsEOF()) { currentLogReader_->UnmarkEOF(); } - while (RestrictedRead(&record, &scratch)) { if (record.size() < 12) { reporter_.Corruption( @@ -205,14 +171,26 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { return; } } - // Open the next file - ++currentFileIndex_; - } - // Read all the files but cannot find next record expected. - // TODO(sdong): support to auto fetch new log files from DB and continue. 
- isValid_ = false; - is_obsolete_ = true; + // Open the next file + if (currentFileIndex_ < files_->size() - 1) { + ++currentFileIndex_; + Status status =OpenLogReader(files_->at(currentFileIndex_).get()); + if (!status.ok()) { + isValid_ = false; + currentStatus_ = status; + return; + } + } else { + isValid_ = false; + if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) { + currentStatus_ = Status::OK(); + } else { + currentStatus_ = Status::Corruption("NO MORE DATA LEFT"); + } + return; + } + } } bool TransactionLogIteratorImpl::IsBatchExpected( diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index 4c85379b65..6454d89e76 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -19,9 +19,7 @@ namespace rocksdb { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; - Status last_status; virtual void Corruption(size_t bytes, const Status& s) { - last_status = s; Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str()); } virtual void Info(const char* s) { @@ -76,8 +74,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { virtual bool Valid(); - virtual bool IsObsolete() override { return is_obsolete_; } - virtual void Next(); virtual Status status(); @@ -93,7 +89,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { std::unique_ptr files_; bool started_; bool isValid_; // not valid when it starts of. - bool is_obsolete_; Status currentStatus_; size_t currentFileIndex_; std::unique_ptr currentBatch_; diff --git a/include/rocksdb/transaction_log.h b/include/rocksdb/transaction_log.h index 1bb259b1fd..30443bba55 100644 --- a/include/rocksdb/transaction_log.h +++ b/include/rocksdb/transaction_log.h @@ -73,12 +73,6 @@ class TransactionLogIterator { // Can read data from a valid iterator. virtual bool Valid() = 0; - // IsObsolete() returns true if new log files were created. 
This usually - // means that the user needs to close the current iterator and create a new - // one to get the newest updates. It should happen only when mem tables are - // flushed. - virtual bool IsObsolete() = 0; - // Moves the iterator to the next WriteBatch. // REQUIRES: Valid() to be true. virtual void Next() = 0; diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index e336908ce4..27cb6d5abf 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -67,21 +67,8 @@ static void ReplicationThreadBody(void* arg) { } } fprintf(stderr, "Refreshing iterator\n"); - for (; !iter->IsObsolete(); iter->Next()) { - if (!iter->Valid()) { - if (t->stop.Acquire_Load() == nullptr) { - return; - } - // need to wait for new rows. - continue; - } - + for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) { BatchResult res = iter->GetBatch(); - if (!iter->status().ok()) { - fprintf(stderr, "Corruption reported when reading seq no. b/w %ld", - static_cast(currentSeqNum)); - exit(1); - } if (res.sequence != currentSeqNum) { fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", @@ -89,8 +76,6 @@ static void ReplicationThreadBody(void* arg) { (long)res.sequence); exit(1); } - t->no_read++; - currentSeqNum++; } } } From 45ad75db80f31a44900ed5705a2e7527e2f6c02f Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 12 Mar 2014 09:38:59 -0700 Subject: [PATCH 10/10] Correct version of D16821 --- db/db_impl.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index fd33693534..b36067e1d7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1941,7 +1941,6 @@ void DBImpl::BackgroundCallFlush() { if (madeProgress || bg_schedule_needed_) { MaybeScheduleFlushOrCompaction(); } - log_buffer.FlushBufferToLog(); bg_cv_.SignalAll(); // IMPORTANT: there should be no code after calling SignalAll. This call may // signal the DB destructor that it's OK to proceed with destruction. 
In @@ -2026,7 +2025,6 @@ void DBImpl::BackgroundCallCompaction() { if (madeProgress || bg_schedule_needed_) { MaybeScheduleFlushOrCompaction(); } - log_buffer.FlushBufferToLog(); bg_cv_.SignalAll(); // IMPORTANT: there should be no code after calling SignalAll. This call may // signal the DB destructor that it's OK to proceed with destruction. In