Partial cleanup of CompactionJob

Summary:
Logging, dealing with key prefix batches and updating stats
moved from CompactionJob::Run() into separate functions.

Test Plan: make all && make check

Reviewers: sdong, rven, yhchiang, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D41919
This commit is contained in:
Andres Notzli 2015-07-14 00:09:20 -07:00
parent 1879d9370c
commit ab137af4ba
2 changed files with 193 additions and 171 deletions

View file

@ -307,170 +307,25 @@ Status CompactionJob::Run() {
TEST_SYNC_POINT("CompactionJob::Run():Start");
log_buffer_->FlushBufferToLog();
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
auto* compaction = compact_->compaction;
// Let's check if anything will get logged. Don't prepare all the info if
// we're not logging
if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
Compaction::InputLevelSummaryBuffer inputs_summary;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
job_id_, compaction->InputLevelSummary(&inputs_summary),
compaction->score());
char scratch[2345];
compact_->compaction->Summary(scratch, sizeof(scratch));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
// build event logger report
auto stream = event_logger_->Log();
stream << "job" << job_id_ << "event"
<< "compaction_started";
for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
stream << ("files_L" + ToString(compaction->level(i)));
stream.StartArray();
for (auto f : *compaction->inputs(i)) {
stream << f->fd.GetNumber();
}
stream.EndArray();
}
stream << "score" << compaction->score() << "input_data_size"
<< compaction->CalculateTotalInputSize();
}
LogCompaction(cfd, compaction);
const uint64_t start_micros = env_->NowMicros();
std::unique_ptr<Iterator> input(
versions_->MakeInputIterator(compact_->compaction));
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2 =
compact_->compaction->CreateCompactionFilterV2();
auto compaction_filter_v2 = compaction_filter_from_factory_v2.get();
Status status;
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
if (!compaction_filter_v2) {
status = ProcessKeyValueCompaction(&imm_micros, input.get(), false);
} else {
// temp_backup_input always point to the start of the current buffer
// temp_backup_input = backup_input;
// iterate through input,
// 1) buffer ineligible keys and value keys into 2 separate buffers;
// 2) send value_buffer to compaction filter and alternate the values;
// 3) merge value_buffer with ineligible_value_buffer;
// 4) run the modified "compaction" using the old for loop.
bool prefix_initialized = false;
shared_ptr<Iterator> backup_input(
versions_->MakeInputIterator(compact_->compaction));
backup_input->SeekToFirst();
uint64_t total_filter_time = 0;
while (backup_input->Valid() &&
!shutting_down_->load(std::memory_order_acquire) &&
!cfd->IsDropped()) {
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary
// on other column families, too
imm_micros += yield_callback_();
Slice key = backup_input->key();
Slice value = backup_input->value();
if (!ParseInternalKey(key, &ikey)) {
// log error
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Failed to parse key: %s", cfd->GetName().c_str(),
job_id_, key.ToString().c_str());
continue;
} else {
const SliceTransform* transformer =
cfd->ioptions()->compaction_filter_factory_v2->GetPrefixExtractor();
const auto key_prefix = transformer->Transform(ikey.user_key);
if (!prefix_initialized) {
compact_->cur_prefix_ = key_prefix.ToString();
prefix_initialized = true;
}
// If the prefix remains the same, keep buffering
if (key_prefix.compare(Slice(compact_->cur_prefix_)) == 0) {
// Apply the compaction filter V2 to all the kv pairs sharing
// the same prefix
if (ikey.type == kTypeValue &&
(visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
// Buffer all keys sharing the same prefix for CompactionFilterV2
// Iterate through keys to check prefix
compact_->BufferKeyValueSlices(key, value);
} else {
// buffer ineligible keys
compact_->BufferOtherKeyValueSlices(key, value);
}
backup_input->Next();
continue;
// finish changing values for eligible keys
} else {
// Now prefix changes, this batch is done.
// Call compaction filter on the buffered values to change the value
if (compact_->key_str_buf_.size() > 0) {
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->cur_prefix_ = key_prefix.ToString();
}
}
// Merge this batch of data (values + ineligible keys)
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
// Done buffering for the current prefix. Spit it out to disk
// Now just iterate through all the kv-pairs
status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
if (!status.ok()) {
break;
}
// After writing the kv-pairs, we can safely remove the reference
// to the string buffer and clean them up
compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer();
// Buffer the key that triggers the mismatch in prefix
if (ikey.type == kTypeValue &&
(visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
compact_->BufferKeyValueSlices(key, value);
} else {
compact_->BufferOtherKeyValueSlices(key, value);
}
backup_input->Next();
if (!backup_input->Valid()) {
// If this is the single last value, we need to merge it.
if (compact_->key_str_buf_.size() > 0) {
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
if (!status.ok()) {
break;
}
compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer();
}
} // done processing all prefix batches
// finish the last batch
if (status.ok()) {
if (compact_->key_str_buf_.size() > 0) {
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
}
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
status = ProcessPrefixBatches(cfd, &imm_micros, input.get(),
compaction_filter_v2);
} // checking for compaction filter v2
if (status.ok() &&
@ -492,24 +347,7 @@ Status CompactionJob::Run() {
compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros;
MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
size_t num_output_files = compact_->outputs.size();
if (compact_->builder != nullptr) {
// An error occurred so ignore the last output.
assert(num_output_files > 0);
--num_output_files;
}
compaction_stats_.num_output_files = static_cast<int>(num_output_files);
UpdateCompactionInputStats();
for (size_t i = 0; i < num_output_files; i++) {
compaction_stats_.bytes_written += compact_->outputs[i].file_size;
}
if (compact_->num_input_records > compact_->num_output_records) {
compaction_stats_.num_dropped_records +=
compact_->num_input_records - compact_->num_output_records;
}
UpdateCompactionStats();
RecordCompactionIOStats();
@ -579,6 +417,135 @@ void CompactionJob::Install(Status* status,
CleanupCompaction(*status);
}
Status CompactionJob::ProcessPrefixBatches(
ColumnFamilyData* cfd,
int64_t* imm_micros,
Iterator* input,
CompactionFilterV2* compaction_filter_v2) {
// temp_backup_input always point to the start of the current buffer
// temp_backup_input = backup_input;
// iterate through input,
// 1) buffer ineligible keys and value keys into 2 separate buffers;
// 2) send value_buffer to compaction filter and alternate the values;
// 3) merge value_buffer with ineligible_value_buffer;
// 4) run the modified "compaction" using the old for loop.
ParsedInternalKey ikey;
Status status;
bool prefix_initialized = false;
shared_ptr<Iterator> backup_input(
versions_->MakeInputIterator(compact_->compaction));
backup_input->SeekToFirst();
uint64_t total_filter_time = 0;
while (backup_input->Valid() &&
!shutting_down_->load(std::memory_order_acquire) &&
!cfd->IsDropped()) {
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
imm_micros += yield_callback_();
Slice key = backup_input->key();
Slice value = backup_input->value();
if (!ParseInternalKey(key, &ikey)) {
// log error
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Failed to parse key: %s", cfd->GetName().c_str(),
job_id_, key.ToString().c_str());
continue;
} else {
const SliceTransform* transformer =
cfd->ioptions()->compaction_filter_factory_v2->GetPrefixExtractor();
const auto key_prefix = transformer->Transform(ikey.user_key);
if (!prefix_initialized) {
compact_->cur_prefix_ = key_prefix.ToString();
prefix_initialized = true;
}
// If the prefix remains the same, keep buffering
if (key_prefix.compare(Slice(compact_->cur_prefix_)) == 0) {
// Apply the compaction filter V2 to all the kv pairs sharing
// the same prefix
if (ikey.type == kTypeValue &&
(visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
// Buffer all keys sharing the same prefix for CompactionFilterV2
// Iterate through keys to check prefix
compact_->BufferKeyValueSlices(key, value);
} else {
// buffer ineligible keys
compact_->BufferOtherKeyValueSlices(key, value);
}
backup_input->Next();
continue;
// finish changing values for eligible keys
} else {
// Now prefix changes, this batch is done.
// Call compaction filter on the buffered values to change the value
if (compact_->key_str_buf_.size() > 0) {
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->cur_prefix_ = key_prefix.ToString();
}
}
// Merge this batch of data (values + ineligible keys)
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
// Done buffering for the current prefix. Spit it out to disk
// Now just iterate through all the kv-pairs
status = ProcessKeyValueCompaction(imm_micros, input, true);
if (!status.ok()) {
break;
}
// After writing the kv-pairs, we can safely remove the reference
// to the string buffer and clean them up
compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer();
// Buffer the key that triggers the mismatch in prefix
if (ikey.type == kTypeValue &&
(visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
compact_->BufferKeyValueSlices(key, value);
} else {
compact_->BufferOtherKeyValueSlices(key, value);
}
backup_input->Next();
if (!backup_input->Valid()) {
// If this is the single last value, we need to merge it.
if (compact_->key_str_buf_.size() > 0) {
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(imm_micros, input, true);
if (!status.ok()) {
break;
}
compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer();
}
} // done processing all prefix batches
// finish the last batch
if (status.ok()) {
if (compact_->key_str_buf_.size() > 0) {
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(imm_micros, input, true);
}
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
return status;
}
Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
Iterator* input,
bool is_compaction_v2) {
@ -627,8 +594,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
}
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
// compacting column family. we should also check if flush is necessary
// on other column families, too
(*imm_micros) += yield_callback_();
Slice key;
@ -1244,7 +1211,15 @@ void CopyPrefix(
#endif // !ROCKSDB_LITE
void CompactionJob::UpdateCompactionInputStats() {
void CompactionJob::UpdateCompactionStats() {
size_t num_output_files = compact_->outputs.size();
if (compact_->builder != nullptr) {
// An error occurred so ignore the last output.
assert(num_output_files > 0);
--num_output_files;
}
compaction_stats_.num_output_files = static_cast<int>(num_output_files);
Compaction* compaction = compact_->compaction;
compaction_stats_.num_input_files_in_non_output_levels = 0;
compaction_stats_.num_input_files_in_output_level = 0;
@ -1264,6 +1239,14 @@ void CompactionJob::UpdateCompactionInputStats() {
input_level);
}
}
for (size_t i = 0; i < num_output_files; i++) {
compaction_stats_.bytes_written += compact_->outputs[i].file_size;
}
if (compact_->num_input_records > compact_->num_output_records) {
compaction_stats_.num_dropped_records +=
compact_->num_input_records - compact_->num_output_records;
}
}
void CompactionJob::UpdateCompactionInputStatsHelper(
@ -1318,4 +1301,35 @@ void CompactionJob::UpdateCompactionJobStats(
#endif // !ROCKSDB_LITE
}
void CompactionJob::LogCompaction(
ColumnFamilyData* cfd, Compaction* compaction) {
// Let's check if anything will get logged. Don't prepare all the info if
// we're not logging
if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
Compaction::InputLevelSummaryBuffer inputs_summary;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
job_id_, compaction->InputLevelSummary(&inputs_summary),
compaction->score());
char scratch[2345];
compaction->Summary(scratch, sizeof(scratch));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
// build event logger report
auto stream = event_logger_->Log();
stream << "job" << job_id_ << "event"
<< "compaction_started";
for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
stream << ("files_L" + ToString(compaction->level(i)));
stream.StartArray();
for (auto f : *compaction->inputs(i)) {
stream << f->fd.GetNumber();
}
stream.EndArray();
}
stream << "score" << compaction->score() << "input_data_size"
<< compaction->CalculateTotalInputSize();
}
}
} // namespace rocksdb

View file

@ -83,6 +83,12 @@ class CompactionJob {
// update the thread status for starting a compaction.
void ReportStartedCompaction(Compaction* compaction);
void AllocateCompactionOutputFileNumbers();
// Processes batches of keys with the same prefixes. This is used for
// CompactionFilterV2.
Status ProcessPrefixBatches(ColumnFamilyData* cfd,
int64_t* imm_micros,
Iterator* input,
CompactionFilterV2* compaction_filter_v2);
// Call compaction filter if is_compaction_v2 is not true. Then iterate
// through input and compact the kv-pairs
Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
@ -105,10 +111,12 @@ class CompactionJob {
int64_t* key_drop_newer_entry,
int64_t* key_drop_obsolete);
void UpdateCompactionInputStats();
void UpdateCompactionStats();
void UpdateCompactionInputStatsHelper(
int* num_files, uint64_t* bytes_read, int input_level);
void LogCompaction(ColumnFamilyData* cfd, Compaction* compaction);
int job_id_;
// CompactionJob state