diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 7c6c8b1410..db34c737f5 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -327,7 +327,7 @@ Status CompactionJob::Run() { } else { status = ProcessPrefixBatches(cfd, &imm_micros, input.get(), compaction_filter_v2); - } // checking for compaction filter v2 + } if (status.ok() && (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) { @@ -335,7 +335,7 @@ Status CompactionJob::Run() { "Database shutdown or Column family drop during compaction"); } if (status.ok() && compact_->builder != nullptr) { - status = FinishCompactionOutputFile(input.get()); + status = FinishCompactionOutputFile(input->status()); } if (status.ok()) { status = input->status(); @@ -629,7 +629,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, if (compact_->compaction->ShouldStopBefore(key) && compact_->builder != nullptr) { - status = FinishCompactionOutputFile(input); + status = FinishCompactionOutputFile(input->status()); if (!status.ok()) { break; } @@ -772,85 +772,31 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, // We may write a single key (e.g.: for Put/Delete or successful merge). // Or we may instead have to write a sequence/list of keys. // We have to write a sequence iff we have an unsuccessful merge - bool has_merge_list = current_entry_is_merging && !merge.IsSuccess(); - const std::deque* keys = nullptr; - const std::deque* values = nullptr; - std::deque::const_reverse_iterator key_iter; - std::deque::const_reverse_iterator value_iter; - if (has_merge_list) { - keys = &merge.keys(); - values = &merge.values(); - key_iter = keys->rbegin(); // The back (*rbegin()) is the first key - value_iter = values->rbegin(); + if (current_entry_is_merging && !merge.IsSuccess()) { + const auto& keys = merge.keys(); + const auto& values = merge.values(); + std::deque::const_reverse_iterator key_iter = + keys.rbegin(); // The back (*rbegin()) is the first key + std::deque::const_reverse_iterator value_iter = + values.rbegin(); key = Slice(*key_iter); value = Slice(*value_iter); - } - // If we have a list of keys to write, traverse the list. - // If we have a single key to write, simply write that key. - while (true) { - // Invariant: key,value,ikey will always be the next entry to write - Slice newkey(key.data(), key.size()); - std::string kstr; - - // Zeroing out the sequence number leads to better compression. - // If this is the bottommost level (no files in lower levels) - // and the earliest snapshot is larger than this seqno - // then we can squash the seqno to zero. - if (bottommost_level_ && ikey.sequence < earliest_snapshot_ && - ikey.type != kTypeMerge) { - assert(ikey.type != kTypeDeletion); - // make a copy because updating in place would cause problems - // with the priority queue that is managing the input key iterator - kstr.assign(key.data(), key.size()); - UpdateInternalKey(&kstr, (uint64_t)0, ikey.type); - newkey = Slice(kstr); - } - - assert((key.clear(), 1)); // we do not need 'key' anymore - - // Open output file if necessary - if (compact_->builder == nullptr) { - status = OpenCompactionOutputFile(); + // We have a list of keys to write, traverse the list. + while (true) { + status = WriteKeyValue(key, value, ikey, input->status()); if (!status.ok()) { break; } - } - SequenceNumber seqno = GetInternalKeySeqno(newkey); - if (compact_->builder->NumEntries() == 0) { - compact_->current_output()->smallest.DecodeFrom(newkey); - compact_->current_output()->smallest_seqno = seqno; - } else { - compact_->current_output()->smallest_seqno = - std::min(compact_->current_output()->smallest_seqno, seqno); - } - compact_->current_output()->largest.DecodeFrom(newkey); - compact_->builder->Add(newkey, value); - compact_->num_output_records++; - compact_->current_output()->largest_seqno = - std::max(compact_->current_output()->largest_seqno, seqno); - - // Close output file if it is big enough - if (compact_->builder->FileSize() >= - compact_->compaction->max_output_file_size()) { - status = FinishCompactionOutputFile(input); - if (!status.ok()) { - break; - } - } - - // If we have a list of entries, move to next element - // If we only had one entry, then break the loop. - if (has_merge_list) { ++key_iter; ++value_iter; // If at end of list - if (key_iter == keys->rend() || value_iter == values->rend()) { + if (key_iter == keys.rend() || value_iter == values.rend()) { // Sanity Check: if one ends, then both end - assert(key_iter == keys->rend() && value_iter == values->rend()); + assert(key_iter == keys.rend() && value_iter == values.rend()); break; } @@ -858,12 +804,11 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, key = Slice(*key_iter); value = Slice(*value_iter); ParseInternalKey(key, &ikey); - - } else { - // Only had one item to begin with (Put/Delete) - break; } - } // while (true) + } else { + // There is only one item to be written out + status = WriteKeyValue(key, value, ikey, input->status()); + } } // if (!drop) // MergeUntil has moved input to the next entry @@ -878,6 +823,57 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, return status; } +Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, + const ParsedInternalKey& ikey, const Status& input_status) { + Slice newkey(key.data(), key.size()); + std::string kstr; + + // Zeroing out the sequence number leads to better compression. + // If this is the bottommost level (no files in lower levels) + // and the earliest snapshot is larger than this seqno + // then we can squash the seqno to zero. + if (bottommost_level_ && ikey.sequence < earliest_snapshot_ && + ikey.type != kTypeMerge) { + assert(ikey.type != kTypeDeletion); + // make a copy because updating in place would cause problems + // with the priority queue that is managing the input key iterator + kstr.assign(key.data(), key.size()); + UpdateInternalKey(&kstr, (uint64_t)0, ikey.type); + newkey = Slice(kstr); + } + + // Open output file if necessary + if (compact_->builder == nullptr) { + Status status = OpenCompactionOutputFile(); + if (!status.ok()) { + return status; + } + } + + SequenceNumber seqno = GetInternalKeySeqno(newkey); + if (compact_->builder->NumEntries() == 0) { + compact_->current_output()->smallest.DecodeFrom(newkey); + compact_->current_output()->smallest_seqno = seqno; + } else { + compact_->current_output()->smallest_seqno = + std::min(compact_->current_output()->smallest_seqno, seqno); + } + compact_->current_output()->largest.DecodeFrom(newkey); + compact_->builder->Add(newkey, value); + compact_->num_output_records++; + compact_->current_output()->largest_seqno = + std::max(compact_->current_output()->largest_seqno, seqno); + + // Close output file if it is big enough + Status status; + if (compact_->builder->FileSize() >= + compact_->compaction->max_output_file_size()) { + status = FinishCompactionOutputFile(input_status); + } + + return status; +} + void CompactionJob::RecordDroppedKeys( int64_t* key_drop_user, int64_t* key_drop_newer_entry, @@ -967,7 +963,7 @@ void CompactionJob::CallCompactionFilterV2( } // for } -Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { +Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_SYNC_FILE); assert(compact_ != nullptr); @@ -980,7 +976,7 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { TableProperties table_properties; // Check for iterator errors - Status s = input->status(); + Status s = input_status; const uint64_t current_entries = compact_->builder->NumEntries(); compact_->current_output()->need_compaction = compact_->builder->NeedCompact(); diff --git a/db/compaction_job.h b/db/compaction_job.h index 7de753f666..ef535c3bf3 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -93,10 +93,14 @@ class CompactionJob { // through input and compact the kv-pairs Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input, bool is_compaction_v2); + + Status WriteKeyValue(const Slice& key, const Slice& value, + const ParsedInternalKey& ikey, + const Status& input_status); // Call compaction_filter_v2->Filter() on kv-pairs in compact void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2, uint64_t* time); - Status FinishCompactionOutputFile(Iterator* input); + Status FinishCompactionOutputFile(const Status& input_status); Status InstallCompactionResults(InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options); SequenceNumber findEarliestVisibleSnapshot(