dynamic inplace_update options

Summary:
Make inplace_update_support and inplace_update_num_locks dynamic.
inplace_callback becomes immutable
We are almost free of references to cfd->options() in db_impl

Test Plan: unit test

Reviewers: igor, yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D25293
This commit is contained in:
Lei Jin 2014-10-27 12:10:13 -07:00
parent bc3bc4bc2f
commit f1841985e4
14 changed files with 70 additions and 60 deletions

View file

@ -388,12 +388,13 @@ const EnvOptions* ColumnFamilyData::soptions() const {
void ColumnFamilyData::SetCurrent(Version* current) { current_ = current; }
void ColumnFamilyData::CreateNewMemtable(const MemTableOptions& moptions) {
void ColumnFamilyData::CreateNewMemtable(
const MutableCFOptions& mutable_cf_options) {
assert(current_ != nullptr);
if (mem_ != nullptr) {
delete mem_->Unref();
}
mem_ = new MemTable(internal_comparator_, ioptions_, moptions);
mem_ = new MemTable(internal_comparator_, ioptions_, mutable_cf_options);
mem_->Ref();
}

View file

@ -198,7 +198,7 @@ class ColumnFamilyData {
Version* dummy_versions() { return dummy_versions_; }
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
void SetCurrent(Version* current);
void CreateNewMemtable(const MemTableOptions& moptions);
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options);
TableCache* table_cache() const { return table_cache_.get(); }

View file

@ -1228,8 +1228,7 @@ Status DBImpl::Recover(
if (!s.ok()) {
// Clear memtables if recovery failed
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->CreateNewMemtable(MemTableOptions(
*cfd->GetLatestMutableCFOptions(), *cfd->options()));
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
}
}
}
@ -1360,8 +1359,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// file-systems cause the DB::Open() to fail.
return status;
}
cfd->CreateNewMemtable(MemTableOptions(
*cfd->GetLatestMutableCFOptions(), *cfd->options()));
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
}
}
}
@ -1398,8 +1396,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// Recovery failed
break;
}
cfd->CreateNewMemtable(MemTableOptions(
*cfd->GetLatestMutableCFOptions(), *cfd->options()));
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
}
// write MANIFEST with update
@ -2749,7 +2746,7 @@ Status DBImpl::ProcessKeyValueCompaction(
ColumnFamilyData* cfd = compact->compaction->column_family_data();
MergeHelper merge(
cfd->user_comparator(), cfd->ioptions()->merge_operator,
db_options_.info_log.get(), cfd->options()->min_partial_merge_operands,
db_options_.info_log.get(), cfd->ioptions()->min_partial_merge_operands,
false /* internal key corruption is expected */);
auto compaction_filter = cfd->ioptions()->compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
@ -4281,9 +4278,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
}
if (s.ok()) {
new_mem = new MemTable(cfd->internal_comparator(),
*cfd->ioptions(), MemTableOptions(mutable_cf_options,
*cfd->options()));
new_mem = new MemTable(cfd->internal_comparator(), *cfd->ioptions(),
mutable_cf_options);
new_superversion = new SuperVersion();
}
}

View file

@ -32,7 +32,8 @@
namespace rocksdb {
MemTableOptions::MemTableOptions(
const MutableCFOptions& mutable_cf_options, const Options& options)
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options)
: write_buffer_size(mutable_cf_options.write_buffer_size),
arena_block_size(mutable_cf_options.arena_block_size),
memtable_prefix_bloom_bits(mutable_cf_options.memtable_prefix_bloom_bits),
@ -40,21 +41,23 @@ MemTableOptions::MemTableOptions(
mutable_cf_options.memtable_prefix_bloom_probes),
memtable_prefix_bloom_huge_page_tlb_size(
mutable_cf_options.memtable_prefix_bloom_huge_page_tlb_size),
inplace_update_support(options.inplace_update_support),
inplace_update_num_locks(options.inplace_update_num_locks),
inplace_callback(options.inplace_callback),
inplace_update_support(ioptions.inplace_update_support),
inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
inplace_callback(ioptions.inplace_callback),
max_successive_merges(mutable_cf_options.max_successive_merges),
filter_deletes(mutable_cf_options.filter_deletes) {}
filter_deletes(mutable_cf_options.filter_deletes),
statistics(ioptions.statistics),
merge_operator(ioptions.merge_operator),
info_log(ioptions.info_log) {}
MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableCFOptions& ioptions,
const MemTableOptions& moptions)
const MutableCFOptions& mutable_cf_options)
: comparator_(cmp),
ioptions_(ioptions),
moptions_(moptions),
moptions_(ioptions, mutable_cf_options),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(moptions.arena_block_size)),
arena_(moptions.arena_block_size),
kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
arena_(moptions_.arena_block_size),
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log)),
num_entries_(0),
@ -63,20 +66,20 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
file_number_(0),
first_seqno_(0),
mem_next_logfile_number_(0),
locks_(moptions.inplace_update_support ? moptions.inplace_update_num_locks
: 0),
locks_(moptions_.inplace_update_support ?
moptions_.inplace_update_num_locks : 0),
prefix_extractor_(ioptions.prefix_extractor),
should_flush_(ShouldFlushNow()),
flush_scheduled_(false) {
// if should_flush_ == true without an entry inserted, something must have
// gone wrong already.
assert(!should_flush_);
if (prefix_extractor_ && moptions.memtable_prefix_bloom_bits > 0) {
if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(new DynamicBloom(
&arena_,
moptions.memtable_prefix_bloom_bits, ioptions.bloom_locality,
moptions.memtable_prefix_bloom_probes, nullptr,
moptions.memtable_prefix_bloom_huge_page_tlb_size,
moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality,
moptions_.memtable_prefix_bloom_probes, nullptr,
moptions_.memtable_prefix_bloom_huge_page_tlb_size,
ioptions.info_log));
}
}
@ -454,10 +457,10 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
saver.status = s;
saver.mem = this;
saver.merge_context = merge_context;
saver.merge_operator = ioptions_.merge_operator;
saver.logger = ioptions_.info_log;
saver.merge_operator = moptions_.merge_operator;
saver.logger = moptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;
saver.statistics = ioptions_.statistics;
saver.statistics = moptions_.statistics;
table_->Get(key, &saver, SaveValue);
}
@ -578,12 +581,12 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
memcpy(p, prev_buffer, new_prev_size);
}
}
RecordTick(ioptions_.statistics, NUMBER_KEYS_UPDATED);
RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
should_flush_ = ShouldFlushNow();
return true;
} else if (status == UpdateStatus::UPDATED) {
Add(seq, kTypeValue, key, Slice(str_value));
RecordTick(ioptions_.statistics, NUMBER_KEYS_WRITTEN);
RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
should_flush_ = ShouldFlushNow();
return true;
} else if (status == UpdateStatus::UPDATE_FAILED) {

View file

@ -32,8 +32,8 @@ class MergeContext;
struct MemTableOptions {
explicit MemTableOptions(
const MutableCFOptions& mutable_cf_options,
const Options& options);
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options);
size_t write_buffer_size;
size_t arena_block_size;
uint32_t memtable_prefix_bloom_bits;
@ -47,6 +47,9 @@ struct MemTableOptions {
std::string* merged_value);
size_t max_successive_merges;
bool filter_deletes;
Statistics* statistics;
MergeOperator* merge_operator;
Logger* info_log;
};
class MemTable {
@ -64,7 +67,7 @@ class MemTable {
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator,
const ImmutableCFOptions& ioptions,
const MemTableOptions& moptions);
const MutableCFOptions& mutable_cf_options);
~MemTable();
@ -199,7 +202,6 @@ class MemTable {
const Arena& TEST_GetArena() const { return arena_; }
const ImmutableCFOptions* GetImmutableOptions() const { return &ioptions_; }
const MemTableOptions* GetMemTableOptions() const { return &moptions_; }
private:
@ -211,7 +213,6 @@ class MemTable {
friend class MemTableList;
KeyComparator comparator_;
const ImmutableCFOptions& ioptions_;
const MemTableOptions moptions_;
int refs_;
const size_t kArenaBlockSize;

View file

@ -220,7 +220,7 @@ class Repairer {
Slice record;
WriteBatch batch;
MemTable* mem = new MemTable(icmp_, ioptions_,
MemTableOptions(MutableCFOptions(options_, ioptions_), options_));
MutableCFOptions(options_, ioptions_));
auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_);
mem->Ref();
int counter = 0;

View file

@ -2926,8 +2926,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
AppendVersion(new_cfd, v);
// GetLatestMutableCFOptions() is safe here without mutex since the
// cfd is not available to client
new_cfd->CreateNewMemtable(MemTableOptions(
*new_cfd->GetLatestMutableCFOptions(), *new_cfd->options()));
new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions());
new_cfd->SetLogNumber(edit->log_number_);
return new_cfd;
}

View file

@ -349,13 +349,12 @@ class MemTableInserter : public WriteBatch::Handler {
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
auto* ioptions = mem->GetImmutableOptions();
auto* moptions = mem->GetMemTableOptions();
if (!moptions->inplace_update_support) {
mem->Add(sequence_, kTypeValue, key, value);
} else if (moptions->inplace_callback == nullptr) {
mem->Update(sequence_, key, value);
RecordTick(ioptions->statistics, NUMBER_KEYS_UPDATED);
RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
} else {
if (mem->UpdateCallback(sequence_, key, value)) {
} else {
@ -382,11 +381,11 @@ class MemTableInserter : public WriteBatch::Handler {
if (status == UpdateStatus::UPDATED_INPLACE) {
// prev_value is updated in-place with final value.
mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
RecordTick(ioptions->statistics, NUMBER_KEYS_WRITTEN);
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
} else if (status == UpdateStatus::UPDATED) {
// merged_value contains the final value.
mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
RecordTick(ioptions->statistics, NUMBER_KEYS_WRITTEN);
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
}
}
}
@ -406,7 +405,6 @@ class MemTableInserter : public WriteBatch::Handler {
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
auto* ioptions = mem->GetImmutableOptions();
auto* moptions = mem->GetMemTableOptions();
bool perform_merge = false;
@ -441,16 +439,16 @@ class MemTableInserter : public WriteBatch::Handler {
Slice get_value_slice = Slice(get_value);
// 2) Apply this merge
auto merge_operator = ioptions->merge_operator;
auto merge_operator = moptions->merge_operator;
assert(merge_operator);
std::deque<std::string> operands;
operands.push_front(value.ToString());
std::string new_value;
if (!merge_operator->FullMerge(key, &get_value_slice, operands,
&new_value, ioptions->info_log)) {
&new_value, moptions->info_log)) {
// Failed to merge!
RecordTick(ioptions->statistics, NUMBER_MERGE_FAILURES);
RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES);
// Store the delta in memtable
perform_merge = false;
@ -477,7 +475,6 @@ class MemTableInserter : public WriteBatch::Handler {
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
auto* ioptions = mem->GetImmutableOptions();
auto* moptions = mem->GetMemTableOptions();
if (!dont_filter_deletes_ && moptions->filter_deletes) {
SnapshotImpl read_from_snapshot;
@ -490,7 +487,7 @@ class MemTableInserter : public WriteBatch::Handler {
cf_handle = db_->DefaultColumnFamily();
}
if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
RecordTick(ioptions->statistics, NUMBER_FILTERED_DELETES);
RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES);
return Status::OK();
}
}

View file

@ -29,7 +29,7 @@ static std::string PrintContents(WriteBatch* b) {
options.memtable_factory = factory;
ImmutableCFOptions ioptions(options);
MemTable* mem = new MemTable(cmp, ioptions,
MemTableOptions(MutableCFOptions(options, ioptions), options));
MutableCFOptions(options, ioptions));
mem->Ref();
std::string state;
ColumnFamilyMemTablesDefault cf_mems_default(mem, &options);

View file

@ -5,6 +5,7 @@
#pragma once
#include <string>
#include <vector>
#include "rocksdb/options.h"
@ -36,6 +37,13 @@ struct ImmutableCFOptions {
CompactionFilterFactoryV2* compaction_filter_factory_v2;
bool inplace_update_support;
UpdateStatus (*inplace_callback)(char* existing_value,
uint32_t* existing_value_size,
Slice delta_value,
std::string* merged_value);
Logger* info_log;
Statistics* statistics;

View file

@ -439,7 +439,7 @@ class MemTableConstructor: public Constructor {
options.memtable_factory = table_factory_;
ImmutableCFOptions ioptions(options);
memtable_ = new MemTable(internal_comparator_, ioptions,
MemTableOptions(MutableCFOptions(options, ioptions), options));
MutableCFOptions(options, ioptions));
memtable_->Ref();
}
~MemTableConstructor() {
@ -455,7 +455,7 @@ class MemTableConstructor: public Constructor {
options.memtable_factory = table_factory_;
ImmutableCFOptions mem_ioptions(options);
memtable_ = new MemTable(internal_comparator_, mem_ioptions,
MemTableOptions(MutableCFOptions(options, mem_ioptions), options));
MutableCFOptions(options, mem_ioptions));
memtable_->Ref();
int seq = 1;
for (KVMap::const_iterator it = data.begin();
@ -1879,7 +1879,7 @@ TEST(MemTableTest, Simple) {
options.memtable_factory = table_factory;
ImmutableCFOptions ioptions(options);
MemTable* memtable = new MemTable(cmp, ioptions,
MemTableOptions(MutableCFOptions(options, ioptions), options));
MutableCFOptions(options, ioptions));
memtable->Ref();
WriteBatch batch;
WriteBatchInternal::SetSequence(&batch, 100);

View file

@ -22,6 +22,7 @@ struct MutableCFOptions {
options.memtable_prefix_bloom_huge_page_tlb_size),
max_successive_merges(options.max_successive_merges),
filter_deletes(options.filter_deletes),
inplace_update_num_locks(options.inplace_update_num_locks),
disable_auto_compactions(options.disable_auto_compactions),
soft_rate_limit(options.soft_rate_limit),
hard_rate_limit(options.hard_rate_limit),
@ -53,6 +54,7 @@ struct MutableCFOptions {
memtable_prefix_bloom_huge_page_tlb_size(0),
max_successive_merges(0),
filter_deletes(false),
inplace_update_num_locks(0),
disable_auto_compactions(false),
soft_rate_limit(0),
hard_rate_limit(0),
@ -94,6 +96,7 @@ struct MutableCFOptions {
size_t memtable_prefix_bloom_huge_page_tlb_size;
size_t max_successive_merges;
bool filter_deletes;
size_t inplace_update_num_locks;
// Compaction related options
bool disable_auto_compactions;

View file

@ -42,6 +42,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
compaction_filter(options.compaction_filter),
compaction_filter_factory(options.compaction_filter_factory.get()),
compaction_filter_factory_v2(options.compaction_filter_factory_v2.get()),
inplace_update_support(options.inplace_update_support),
inplace_callback(options.inplace_callback),
info_log(options.info_log.get()),
statistics(options.statistics.get()),
env(options.env),

View file

@ -94,6 +94,8 @@ bool ParseMemtableOptions(const std::string& name, const std::string& value,
new_options->filter_deletes = ParseBoolean(name, value);
} else if (name == "max_write_buffer_number") {
new_options->max_write_buffer_number = ParseInt(value);
} else if (name == "inplace_update_num_locks") {
new_options->inplace_update_num_locks = ParseInt64(value);
} else {
return false;
}
@ -299,14 +301,12 @@ bool GetColumnFamilyOptionsFromMap(
} else if (o.first == "compaction_options_fifo") {
new_options->compaction_options_fifo.max_table_files_size
= ParseUint64(o.second);
} else if (o.first == "inplace_update_support") {
new_options->inplace_update_support = ParseBoolean(o.first, o.second);
} else if (o.first == "inplace_update_num_locks") {
new_options->inplace_update_num_locks = ParseInt64(o.second);
} else if (o.first == "bloom_locality") {
new_options->bloom_locality = ParseUint32(o.second);
} else if (o.first == "min_partial_merge_operands") {
new_options->min_partial_merge_operands = ParseUint32(o.second);
} else if (o.first == "inplace_update_support") {
new_options->inplace_update_support = ParseBoolean(o.first, o.second);
} else {
return false;
}