From 35e5689e113febb2289f4138c8d6a6d678b06b1f Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Thu, 31 Jan 2019 11:53:29 -0800 Subject: [PATCH] Take snapshots once for all cf flushes (#4934) Summary: FlushMemTablesToOutputFiles calls FlushMemTableToOutputFile for each column family. This patch moves the take-snapshot logic out of FlushMemTableToOutputFile and into FlushMemTablesToOutputFiles, so that the snapshot context is taken once for all the flushes. This also addresses a deadlock issue caused by resetting the managed snapshot of job_snapshot in the 2nd call to FlushMemTableToOutputFile. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4934 Differential Revision: D13900747 Pulled By: maysamyabandeh fbshipit-source-id: f3cd650c5fff24cf95c1aaf8a10c149d42bf042c --- db/compaction_job.cc | 5 +++-- db/db_impl.h | 12 +++++++----- db/db_impl_compaction_flush.cc | 22 +++++++++++++--------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 58f34d165e..426ab04320 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1135,8 +1135,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { sub_compact->compaction_job_stats.file_prepare_write_nanos += IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos; sub_compact->compaction_job_stats.cpu_micros -= - (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos - + IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) / 1000; + (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos + + IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) / + 1000; if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) { SetPerfLevel(prev_perf_level); } diff --git a/db/db_impl.h b/db/db_impl.h index 4b663cf238..db6e2ed605 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -923,11 +923,13 @@ class DBImpl : public DB { // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. 
Then // installs a new super version for the column family. - Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, - const MutableCFOptions& mutable_cf_options, - bool* madeProgress, JobContext* job_context, - SuperVersionContext* superversion_context, - LogBuffer* log_buffer); + Status FlushMemTableToOutputFile( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + bool* madeProgress, JobContext* job_context, + SuperVersionContext* superversion_context, + std::vector<SequenceNumber>& snapshot_seqs, + SequenceNumber earliest_write_conflict_snapshot, + SnapshotChecker* snapshot_checker, LogBuffer* log_buffer); // Argument required by background flush thread. struct BGFlushArg { diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index d0feb9c40d..40295b8401 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -132,16 +132,14 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { Status DBImpl::FlushMemTableToOutputFile( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, bool* made_progress, JobContext* job_context, - SuperVersionContext* superversion_context, LogBuffer* log_buffer) { + SuperVersionContext* superversion_context, + std::vector<SequenceNumber>& snapshot_seqs, + SequenceNumber earliest_write_conflict_snapshot, + SnapshotChecker* snapshot_checker, LogBuffer* log_buffer) { mutex_.AssertHeld(); assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); - std::vector<SequenceNumber> snapshot_seqs; - SequenceNumber earliest_write_conflict_snapshot; - SnapshotChecker* snapshot_checker; - GetSnapshotContext(job_context, &snapshot_seqs, - &earliest_write_conflict_snapshot, &snapshot_checker); FlushJob flush_job( dbname_, cfd, immutable_db_options_, mutable_cf_options, @@ -239,14 +237,20 @@ Status DBImpl::FlushMemTablesToOutputFiles( return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress, job_context, log_buffer); } + std::vector<SequenceNumber> snapshot_seqs; + SequenceNumber 
earliest_write_conflict_snapshot; + SnapshotChecker* snapshot_checker; + GetSnapshotContext(job_context, &snapshot_seqs, + &earliest_write_conflict_snapshot, &snapshot_checker); Status status; for (auto& arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); SuperVersionContext* superversion_context = arg.superversion_context_; - Status s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, - job_context, superversion_context, - log_buffer); + Status s = FlushMemTableToOutputFile( + cfd, mutable_cf_options, made_progress, job_context, + superversion_context, snapshot_seqs, earliest_write_conflict_snapshot, + snapshot_checker, log_buffer); if (!s.ok()) { status = s; if (!s.IsShutdownInProgress()) {