Take snapshots once for all cf flushes (#4934)

Summary:
FlushMemTablesToOutputFiles calls FlushMemTableToOutputFile for each column family. The patch moves the take-snapshot logic to outside FlushMemTableToOutputFile so that it does it once for all the flushes. This also addresses a deadlock issue for resetting the managed snapshot of job_snapshot in the 2nd call to FlushMemTableToOutputFile.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4934

Differential Revision: D13900747

Pulled By: maysamyabandeh

fbshipit-source-id: f3cd650c5fff24cf95c1aaf8a10c149d42bf042c
This commit is contained in:
Maysam Yabandeh 2019-01-31 11:53:29 -08:00 committed by Facebook Github Bot
parent 32a6dd9a41
commit 35e5689e11
3 changed files with 23 additions and 16 deletions

View file

@ -1135,8 +1135,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->compaction_job_stats.file_prepare_write_nanos += sub_compact->compaction_job_stats.file_prepare_write_nanos +=
IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos; IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
sub_compact->compaction_job_stats.cpu_micros -= sub_compact->compaction_job_stats.cpu_micros -=
(IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
+ IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) / 1000; IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
1000;
if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) { if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
SetPerfLevel(prev_perf_level); SetPerfLevel(prev_perf_level);
} }

View file

@ -923,11 +923,13 @@ class DBImpl : public DB {
// Flush the in-memory write buffer to storage. Switches to a new // Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful. Then // log-file/memtable and writes a new descriptor iff successful. Then
// installs a new super version for the column family. // installs a new super version for the column family.
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, Status FlushMemTableToOutputFile(
const MutableCFOptions& mutable_cf_options, ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* madeProgress, JobContext* job_context, bool* madeProgress, JobContext* job_context,
SuperVersionContext* superversion_context, SuperVersionContext* superversion_context,
LogBuffer* log_buffer); std::vector<SequenceNumber>& snapshot_seqs,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, LogBuffer* log_buffer);
// Argument required by background flush thread. // Argument required by background flush thread.
struct BGFlushArg { struct BGFlushArg {

View file

@ -132,16 +132,14 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
Status DBImpl::FlushMemTableToOutputFile( Status DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* made_progress, JobContext* job_context, bool* made_progress, JobContext* job_context,
SuperVersionContext* superversion_context, LogBuffer* log_buffer) { SuperVersionContext* superversion_context,
std::vector<SequenceNumber>& snapshot_seqs,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, LogBuffer* log_buffer) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending()); assert(cfd->imm()->IsFlushPending());
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot;
SnapshotChecker* snapshot_checker;
GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
FlushJob flush_job( FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options, dbname_, cfd, immutable_db_options_, mutable_cf_options,
@ -239,14 +237,20 @@ Status DBImpl::FlushMemTablesToOutputFiles(
return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress, return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer); job_context, log_buffer);
} }
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot;
SnapshotChecker* snapshot_checker;
GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
Status status; Status status;
for (auto& arg : bg_flush_args) { for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_; ColumnFamilyData* cfd = arg.cfd_;
MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
SuperVersionContext* superversion_context = arg.superversion_context_; SuperVersionContext* superversion_context = arg.superversion_context_;
Status s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, Status s = FlushMemTableToOutputFile(
job_context, superversion_context, cfd, mutable_cf_options, made_progress, job_context,
log_buffer); superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, log_buffer);
if (!s.ok()) { if (!s.ok()) {
status = s; status = s;
if (!s.IsShutdownInProgress()) { if (!s.IsShutdownInProgress()) {