diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index bf8a77f215..722316c6e1 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -307,170 +307,25 @@ Status CompactionJob::Run() {
   TEST_SYNC_POINT("CompactionJob::Run():Start");
   log_buffer_->FlushBufferToLog();
   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
   auto* compaction = compact_->compaction;
-  // Let's check if anything will get logged. Don't prepare all the info if
-  // we're not logging
-  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
-    Compaction::InputLevelSummaryBuffer inputs_summary;
-    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
-        "[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
-        job_id_, compaction->InputLevelSummary(&inputs_summary),
-        compaction->score());
-    char scratch[2345];
-    compact_->compaction->Summary(scratch, sizeof(scratch));
-    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
-        "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
-    // build event logger report
-    auto stream = event_logger_->Log();
-    stream << "job" << job_id_ << "event"
-           << "compaction_started";
-    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
-      stream << ("files_L" + ToString(compaction->level(i)));
-      stream.StartArray();
-      for (auto f : *compaction->inputs(i)) {
-        stream << f->fd.GetNumber();
-      }
-      stream.EndArray();
-    }
-    stream << "score" << compaction->score() << "input_data_size"
-           << compaction->CalculateTotalInputSize();
-  }
+  LogCompaction(cfd, compaction);
 
   const uint64_t start_micros = env_->NowMicros();
   std::unique_ptr<Iterator> input(
      versions_->MakeInputIterator(compact_->compaction));
   input->SeekToFirst();
-  Status status;
-  ParsedInternalKey ikey;
   std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2 =
      compact_->compaction->CreateCompactionFilterV2();
   auto compaction_filter_v2 = compaction_filter_from_factory_v2.get();
 
+  Status status;
   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
   if (!compaction_filter_v2) {
     status = ProcessKeyValueCompaction(&imm_micros, input.get(), false);
   } else {
-    // temp_backup_input always point to the start of the current buffer
-    // temp_backup_input = backup_input;
-    // iterate through input,
-    // 1) buffer ineligible keys and value keys into 2 separate buffers;
-    // 2) send value_buffer to compaction filter and alternate the values;
-    // 3) merge value_buffer with ineligible_value_buffer;
-    // 4) run the modified "compaction" using the old for loop.
-    bool prefix_initialized = false;
-    shared_ptr<Iterator> backup_input(
-        versions_->MakeInputIterator(compact_->compaction));
-    backup_input->SeekToFirst();
-    uint64_t total_filter_time = 0;
-    while (backup_input->Valid() &&
-           !shutting_down_->load(std::memory_order_acquire) &&
-           !cfd->IsDropped()) {
-      // FLUSH preempts compaction
-      // TODO(icanadi) this currently only checks if flush is necessary on
-      // compacting column family. we should also check if flush is necessary
-      // on other column families, too
-
-      imm_micros += yield_callback_();
-
-      Slice key = backup_input->key();
-      Slice value = backup_input->value();
-
-      if (!ParseInternalKey(key, &ikey)) {
-        // log error
-        Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
-            "[%s] [JOB %d] Failed to parse key: %s", cfd->GetName().c_str(),
-            job_id_, key.ToString().c_str());
-        continue;
-      } else {
-        const SliceTransform* transformer =
-            cfd->ioptions()->compaction_filter_factory_v2->GetPrefixExtractor();
-        const auto key_prefix = transformer->Transform(ikey.user_key);
-        if (!prefix_initialized) {
-          compact_->cur_prefix_ = key_prefix.ToString();
-          prefix_initialized = true;
-        }
-        // If the prefix remains the same, keep buffering
-        if (key_prefix.compare(Slice(compact_->cur_prefix_)) == 0) {
-          // Apply the compaction filter V2 to all the kv pairs sharing
-          // the same prefix
-          if (ikey.type == kTypeValue &&
-              (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
-            // Buffer all keys sharing the same prefix for CompactionFilterV2
-            // Iterate through keys to check prefix
-            compact_->BufferKeyValueSlices(key, value);
-          } else {
-            // buffer ineligible keys
-            compact_->BufferOtherKeyValueSlices(key, value);
-          }
-          backup_input->Next();
-          continue;
-          // finish changing values for eligible keys
-        } else {
-          // Now prefix changes, this batch is done.
-          // Call compaction filter on the buffered values to change the value
-          if (compact_->key_str_buf_.size() > 0) {
-            uint64_t time = 0;
-            CallCompactionFilterV2(compaction_filter_v2, &time);
-            total_filter_time += time;
-          }
-          compact_->cur_prefix_ = key_prefix.ToString();
-        }
-      }
-
-      // Merge this batch of data (values + ineligible keys)
-      compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
-
-      // Done buffering for the current prefix. Spit it out to disk
-      // Now just iterate through all the kv-pairs
-      status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
-
-      if (!status.ok()) {
-        break;
-      }
-
-      // After writing the kv-pairs, we can safely remove the reference
-      // to the string buffer and clean them up
-      compact_->CleanupBatchBuffer();
-      compact_->CleanupMergedBuffer();
-      // Buffer the key that triggers the mismatch in prefix
-      if (ikey.type == kTypeValue &&
-          (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
-        compact_->BufferKeyValueSlices(key, value);
-      } else {
-        compact_->BufferOtherKeyValueSlices(key, value);
-      }
-      backup_input->Next();
-      if (!backup_input->Valid()) {
-        // If this is the single last value, we need to merge it.
-        if (compact_->key_str_buf_.size() > 0) {
-          uint64_t time = 0;
-          CallCompactionFilterV2(compaction_filter_v2, &time);
-          total_filter_time += time;
-        }
-        compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
-
-        status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
-        if (!status.ok()) {
-          break;
-        }
-
-        compact_->CleanupBatchBuffer();
-        compact_->CleanupMergedBuffer();
-      }
-    }  // done processing all prefix batches
-    // finish the last batch
-    if (status.ok()) {
-      if (compact_->key_str_buf_.size() > 0) {
-        uint64_t time = 0;
-        CallCompactionFilterV2(compaction_filter_v2, &time);
-        total_filter_time += time;
-      }
-      compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
-      status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
-    }
-    RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
+    status = ProcessPrefixBatches(cfd, &imm_micros, input.get(),
+                                  compaction_filter_v2);
   }  // checking for compaction filter v2
 
   if (status.ok() &&
@@ -492,24 +347,7 @@ Status CompactionJob::Run() {
   compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros;
   MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
-
-  size_t num_output_files = compact_->outputs.size();
-  if (compact_->builder != nullptr) {
-    // An error occurred so ignore the last output.
-    assert(num_output_files > 0);
-    --num_output_files;
-  }
-  compaction_stats_.num_output_files = static_cast<int>(num_output_files);
-
-  UpdateCompactionInputStats();
-
-  for (size_t i = 0; i < num_output_files; i++) {
-    compaction_stats_.bytes_written += compact_->outputs[i].file_size;
-  }
-  if (compact_->num_input_records > compact_->num_output_records) {
-    compaction_stats_.num_dropped_records +=
-        compact_->num_input_records - compact_->num_output_records;
-  }
+  UpdateCompactionStats();
 
   RecordCompactionIOStats();
 
@@ -579,6 +417,135 @@ void CompactionJob::Install(Status* status,
   CleanupCompaction(*status);
 }
 
+Status CompactionJob::ProcessPrefixBatches(
+    ColumnFamilyData* cfd,
+    int64_t* imm_micros,
+    Iterator* input,
+    CompactionFilterV2* compaction_filter_v2) {
+  // temp_backup_input always points to the start of the current buffer
+  // temp_backup_input = backup_input;
+  // iterate through input,
+  // 1) buffer ineligible keys and value keys into 2 separate buffers;
+  // 2) send value_buffer to compaction filter and alternate the values;
+  // 3) merge value_buffer with ineligible_value_buffer;
+  // 4) run the modified "compaction" using the old for loop.
+  ParsedInternalKey ikey;
+  Status status;
+  bool prefix_initialized = false;
+  shared_ptr<Iterator> backup_input(
+      versions_->MakeInputIterator(compact_->compaction));
+  backup_input->SeekToFirst();
+  uint64_t total_filter_time = 0;
+  while (backup_input->Valid() &&
+         !shutting_down_->load(std::memory_order_acquire) &&
+         !cfd->IsDropped()) {
+    // FLUSH preempts compaction
+    // TODO(icanadi) this currently only checks if flush is necessary on
+    // compacting column family. we should also check if flush is necessary on
+    // other column families, too
+
+    (*imm_micros) += yield_callback_();
+
+    Slice key = backup_input->key();
+    Slice value = backup_input->value();
+
+    if (!ParseInternalKey(key, &ikey)) {
+      // log error
+      Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
+          "[%s] [JOB %d] Failed to parse key: %s", cfd->GetName().c_str(),
+          job_id_, key.ToString().c_str());
+      continue;
+    } else {
+      const SliceTransform* transformer =
+          cfd->ioptions()->compaction_filter_factory_v2->GetPrefixExtractor();
+      const auto key_prefix = transformer->Transform(ikey.user_key);
+      if (!prefix_initialized) {
+        compact_->cur_prefix_ = key_prefix.ToString();
+        prefix_initialized = true;
+      }
+      // If the prefix remains the same, keep buffering
+      if (key_prefix.compare(Slice(compact_->cur_prefix_)) == 0) {
+        // Apply the compaction filter V2 to all the kv pairs sharing
+        // the same prefix
+        if (ikey.type == kTypeValue &&
+            (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
+          // Buffer all keys sharing the same prefix for CompactionFilterV2
+          // Iterate through keys to check prefix
+          compact_->BufferKeyValueSlices(key, value);
+        } else {
+          // buffer ineligible keys
+          compact_->BufferOtherKeyValueSlices(key, value);
+        }
+        backup_input->Next();
+        continue;
+        // finish changing values for eligible keys
+      } else {
+        // Now prefix changes, this batch is done.
+        // Call compaction filter on the buffered values to change the value
+        if (compact_->key_str_buf_.size() > 0) {
+          uint64_t time = 0;
+          CallCompactionFilterV2(compaction_filter_v2, &time);
+          total_filter_time += time;
+        }
+        compact_->cur_prefix_ = key_prefix.ToString();
+      }
+    }
+
+    // Merge this batch of data (values + ineligible keys)
+    compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
+
+    // Done buffering for the current prefix. Spit it out to disk
+    // Now just iterate through all the kv-pairs
+    status = ProcessKeyValueCompaction(imm_micros, input, true);
+
+    if (!status.ok()) {
+      break;
+    }
+
+    // After writing the kv-pairs, we can safely remove the reference
+    // to the string buffer and clean them up
+    compact_->CleanupBatchBuffer();
+    compact_->CleanupMergedBuffer();
+    // Buffer the key that triggers the mismatch in prefix
+    if (ikey.type == kTypeValue &&
+        (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
+      compact_->BufferKeyValueSlices(key, value);
+    } else {
+      compact_->BufferOtherKeyValueSlices(key, value);
+    }
+    backup_input->Next();
+    if (!backup_input->Valid()) {
+      // If this is the single last value, we need to merge it.
+      if (compact_->key_str_buf_.size() > 0) {
+        uint64_t time = 0;
+        CallCompactionFilterV2(compaction_filter_v2, &time);
+        total_filter_time += time;
+      }
+      compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
+
+      status = ProcessKeyValueCompaction(imm_micros, input, true);
+      if (!status.ok()) {
+        break;
+      }
+
+      compact_->CleanupBatchBuffer();
+      compact_->CleanupMergedBuffer();
+    }
+  }  // done processing all prefix batches
+  // finish the last batch
+  if (status.ok()) {
+    if (compact_->key_str_buf_.size() > 0) {
+      uint64_t time = 0;
+      CallCompactionFilterV2(compaction_filter_v2, &time);
+      total_filter_time += time;
+    }
+    compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
+    status = ProcessKeyValueCompaction(imm_micros, input, true);
+  }
+  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
+  return status;
+}
+
 Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
                                                 Iterator* input,
                                                 bool is_compaction_v2) {
@@ -627,8 +594,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
     }
     // FLUSH preempts compaction
    // TODO(icanadi) this currently only checks if flush is necessary on
-    // compacting column family. we should also check if flush is necessary on
-    // other column families, too
+    // compacting column family. we should also check if flush is necessary
+    // on other column families, too
     (*imm_micros) += yield_callback_();
 
     Slice key;
@@ -1244,7 +1211,15 @@ void CopyPrefix(
 
 #endif  // !ROCKSDB_LITE
 
-void CompactionJob::UpdateCompactionInputStats() {
+void CompactionJob::UpdateCompactionStats() {
+  size_t num_output_files = compact_->outputs.size();
+  if (compact_->builder != nullptr) {
+    // An error occurred so ignore the last output.
+    assert(num_output_files > 0);
+    --num_output_files;
+  }
+  compaction_stats_.num_output_files = static_cast<int>(num_output_files);
+
   Compaction* compaction = compact_->compaction;
   compaction_stats_.num_input_files_in_non_output_levels = 0;
   compaction_stats_.num_input_files_in_output_level = 0;
@@ -1264,6 +1239,14 @@
           input_level);
     }
   }
+
+  for (size_t i = 0; i < num_output_files; i++) {
+    compaction_stats_.bytes_written += compact_->outputs[i].file_size;
+  }
+  if (compact_->num_input_records > compact_->num_output_records) {
+    compaction_stats_.num_dropped_records +=
+        compact_->num_input_records - compact_->num_output_records;
+  }
 }
 
 void CompactionJob::UpdateCompactionInputStatsHelper(
@@ -1318,4 +1301,35 @@ void CompactionJob::UpdateCompactionJobStats(
 #endif  // !ROCKSDB_LITE
 }
 
+void CompactionJob::LogCompaction(
+    ColumnFamilyData* cfd, Compaction* compaction) {
+  // Let's check if anything will get logged. Don't prepare all the info if
+  // we're not logging
+  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
+    Compaction::InputLevelSummaryBuffer inputs_summary;
+    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
+        "[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
+        job_id_, compaction->InputLevelSummary(&inputs_summary),
+        compaction->score());
+    char scratch[2345];
+    compaction->Summary(scratch, sizeof(scratch));
+    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
+        "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
+    // build event logger report
+    auto stream = event_logger_->Log();
+    stream << "job" << job_id_ << "event"
+           << "compaction_started";
+    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
+      stream << ("files_L" + ToString(compaction->level(i)));
+      stream.StartArray();
+      for (auto f : *compaction->inputs(i)) {
+        stream << f->fd.GetNumber();
+      }
+      stream.EndArray();
+    }
+    stream << "score" << compaction->score() << "input_data_size"
+           << compaction->CalculateTotalInputSize();
+  }
+}
+
 }  // namespace rocksdb
diff --git a/db/compaction_job.h b/db/compaction_job.h
index fdf13e8b2d..7de753f666 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -83,6 +83,12 @@ class CompactionJob {
   // update the thread status for starting a compaction.
   void ReportStartedCompaction(Compaction* compaction);
   void AllocateCompactionOutputFileNumbers();
+  // Processes batches of keys with the same prefix. This is used for
+  // CompactionFilterV2.
+  Status ProcessPrefixBatches(ColumnFamilyData* cfd,
+                              int64_t* imm_micros,
+                              Iterator* input,
+                              CompactionFilterV2* compaction_filter_v2);
   // Call compaction filter if is_compaction_v2 is not true. Then iterate
   // through input and compact the kv-pairs
   Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
@@ -105,10 +111,12 @@ class CompactionJob {
                                    int64_t* key_drop_newer_entry,
                                    int64_t* key_drop_obsolete);
 
-  void UpdateCompactionInputStats();
+  void UpdateCompactionStats();
   void UpdateCompactionInputStatsHelper(
       int* num_files, uint64_t* bytes_read, int input_level);
 
+  void LogCompaction(ColumnFamilyData* cfd, Compaction* compaction);
+
   int job_id_;
 
   // CompactionJob state