Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
This commit is contained in:
Igor Canadi 2014-03-05 12:25:05 -08:00
commit 0738ae6dc9
12 changed files with 564 additions and 272 deletions

View file

@ -13,6 +13,7 @@
* By default, checksums are verified on every read from database
* Added is_manual_compaction to CompactionFilter::Context
* Added "virtual void WaitForJoin() = 0" in class Env
* Removed BackupEngine::DeleteBackupsNewerThan() function
### New Features
* If we find one truncated record at the end of the MANIFEST or WAL files,

View file

@ -191,41 +191,72 @@ void Compaction::ResetNextCompactionIndex() {
input_version_->ResetNextCompactionIndex(level_);
}
static void InputSummary(std::vector<FileMetaData*>& files, char* output,
/*
for sizes >=10TB, print "XXTB"
for sizes >=10GB, print "XXGB"
etc.
*/
static void FileSizeSummary(unsigned long long sz, char* output, int len) {
const unsigned long long ull10 = 10;
if (sz >= ull10<<40) {
snprintf(output, len, "%lluTB", sz>>40);
} else if (sz >= ull10<<30) {
snprintf(output, len, "%lluGB", sz>>30);
} else if (sz >= ull10<<20) {
snprintf(output, len, "%lluMB", sz>>20);
} else if (sz >= ull10<<10) {
snprintf(output, len, "%lluKB", sz>>10);
} else {
snprintf(output, len, "%lluB", sz);
}
}
static int InputSummary(std::vector<FileMetaData*>& files, char* output,
int len) {
int write = 0;
for (unsigned int i = 0; i < files.size(); i++) {
int sz = len - write;
int ret = snprintf(output + write, sz, "%lu(%lu) ",
(unsigned long)files.at(i)->number,
(unsigned long)files.at(i)->file_size);
int ret;
char sztxt[16];
FileSizeSummary((unsigned long long)files.at(i)->file_size, sztxt, 16);
ret = snprintf(output + write, sz, "%lu(%s) ",
(unsigned long)files.at(i)->number,
sztxt);
if (ret < 0 || ret >= sz)
break;
write += ret;
}
return write;
}
void Compaction::Summary(char* output, int len) {
int write = snprintf(output, len,
"Base version %lu Base level %d, seek compaction:%d, inputs:",
"Base version %lu Base level %d, seek compaction:%d, inputs: [",
(unsigned long)input_version_->GetVersionNumber(),
level_,
seek_compaction_);
if (write < 0 || write > len) {
if (write < 0 || write >= len) {
return;
}
char level_low_summary[100];
InputSummary(inputs_[0], level_low_summary, sizeof(level_low_summary));
char level_up_summary[100];
if (inputs_[1].size()) {
InputSummary(inputs_[1], level_up_summary, sizeof(level_up_summary));
} else {
level_up_summary[0] = '\0';
write += InputSummary(inputs_[0], output+write, len-write);
if (write < 0 || write >= len) {
return;
}
snprintf(output + write, len - write, "[%s],[%s]",
level_low_summary, level_up_summary);
write += snprintf(output+write, len-write, "],[");
if (write < 0 || write >= len) {
return;
}
if (inputs_[1].size()) {
write += InputSummary(inputs_[1], output+write, len-write);
}
if (write < 0 || write >= len) {
return;
}
snprintf(output+write, len-write, "]");
}
} // namespace rocksdb

View file

@ -555,22 +555,27 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
version->files_[level].size(), version->LevelFileSummary(&tmp, 0));
// Check for size amplification first.
Compaction* c = PickCompactionUniversalSizeAmp(version, score);
if (c == nullptr) {
Compaction* c;
if ((c = PickCompactionUniversalSizeAmp(version, score)) != nullptr) {
Log(options_->info_log, "Universal: compacting for size amp\n");
} else {
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio = options_->compaction_options_universal.size_ratio;
c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX);
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
if (c == nullptr) {
if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX)) != nullptr) {
Log(options_->info_log, "Universal: compacting for size ratio\n");
} else {
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
unsigned int num_files = version->files_[level].size() -
options_->level0_file_num_compaction_trigger;
c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files);
if ((c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files)) != nullptr) {
Log(options_->info_log, "Universal: compacting for file num\n");
}
}
}
if (c == nullptr) {
@ -675,14 +680,32 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
if (f->being_compacted) {
break;
}
// pick files if the total candidate file size (increased by the
// Pick files if the total/last candidate file size (increased by the
// specified ratio) is still larger than the next candidate file.
// candidate_size is the total size of files picked so far with the
// default kCompactionStopStyleTotalSize; with
// kCompactionStopStyleSimilarSize, it's simply the size of the last
// picked file.
uint64_t sz = (candidate_size * (100L + ratio)) /100;
if (sz < f->file_size) {
break;
}
if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) {
// Similar-size stopping rule: also check the last picked file isn't
// far larger than the next candidate file.
sz = (f->file_size * (100L + ratio)) / 100;
if (sz < candidate_size) {
// If the small file we've encountered begins a run of similar-size
// files, we'll pick them up on a future iteration of the outer
// loop. If it's some lonely straggler, it'll eventually get picked
// by the last-resort read amp strategy which disregards size ratios.
break;
}
candidate_size = f->file_size;
} else { // default kCompactionStopStyleTotalSize
candidate_size += f->file_size;
}
candidate_count++;
candidate_size += f->file_size;
}
// Found a series of consecutive files that need compaction.

View file

@ -7,6 +7,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <cstddef>
#include <sys/types.h>
#include <stdio.h>
@ -187,6 +189,11 @@ DEFINE_int32(max_background_compactions,
"The maximum number of concurrent background compactions"
" that can occur in parallel.");
DEFINE_int32(max_background_flushes,
rocksdb::Options().max_background_flushes,
"The maximum number of concurrent background flushes"
" that can occur in parallel.");
static rocksdb::CompactionStyle FLAGS_compaction_style_e;
DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
"style of compaction: level-based vs universal");
@ -223,6 +230,8 @@ DEFINE_int32(open_files, rocksdb::Options().max_open_files,
DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
" use default settings.");
DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. "
"Negative means no bloom filter.");
DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing"
" database. If you set this flag and also specify a benchmark that"
@ -487,11 +496,15 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) {
}
DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
"plain table");
DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated "
"per prefix, 0 means no special handling of the prefix, "
"i.e. use the prefix comes with the generated random number.");
enum RepFactory {
kSkipList,
kPrefixHash,
kVectorRep
kVectorRep,
kHashLinkedList
};
enum RepFactory StringToRepFactory(const char* ctype) {
assert(ctype);
@ -502,12 +515,15 @@ enum RepFactory StringToRepFactory(const char* ctype) {
return kPrefixHash;
else if (!strcasecmp(ctype, "vector"))
return kVectorRep;
else if (!strcasecmp(ctype, "hash_linkedlist"))
return kHashLinkedList;
fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
return kSkipList;
}
static enum RepFactory FLAGS_rep_factory;
DEFINE_string(memtablerep, "skip_list", "");
DEFINE_int64(hash_bucket_count, 1024 * 1024, "hash bucket count");
DEFINE_bool(use_plain_table, false, "if use plain table "
"instead of block-based table format");
@ -593,9 +609,9 @@ class Stats {
double start_;
double finish_;
double seconds_;
long long done_;
long long last_report_done_;
long long next_report_;
int64_t done_;
int64_t last_report_done_;
int64_t next_report_;
int64_t bytes_;
double last_op_finish_;
double last_report_finish_;
@ -672,12 +688,12 @@ class Stats {
else if (next_report_ < 100000) next_report_ += 10000;
else if (next_report_ < 500000) next_report_ += 50000;
else next_report_ += 100000;
fprintf(stderr, "... finished %lld ops%30s\r", done_, "");
fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, "");
fflush(stderr);
} else {
double now = FLAGS_env->NowMicros();
fprintf(stderr,
"%s ... thread %d: (%lld,%lld) ops and "
"%s ... thread %d: (%" PRIu64 ",%" PRIu64 ") ops and "
"(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(),
id_,
@ -773,7 +789,7 @@ struct ThreadState {
class Duration {
public:
Duration(int max_seconds, long long max_ops) {
Duration(int max_seconds, int64_t max_ops) {
max_seconds_ = max_seconds;
max_ops_= max_ops;
ops_ = 0;
@ -799,8 +815,8 @@ class Duration {
private:
int max_seconds_;
long long max_ops_;
long long ops_;
int64_t max_ops_;
int64_t ops_;
double start_at_;
};
@ -811,24 +827,27 @@ class Benchmark {
const FilterPolicy* filter_policy_;
const SliceTransform* prefix_extractor_;
DB* db_;
long long num_;
int64_t num_;
int value_size_;
int key_size_;
int prefix_size_;
int64_t keys_per_prefix_;
int entries_per_batch_;
WriteOptions write_options_;
long long reads_;
long long writes_;
long long readwrites_;
long long merge_keys_;
int64_t reads_;
int64_t writes_;
int64_t readwrites_;
int64_t merge_keys_;
int heap_counter_;
char keyFormat_[100]; // will contain the format of key. e.g "%016d"
void PrintHeader() {
PrintEnvironment();
fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size);
fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
FLAGS_value_size,
static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
fprintf(stdout, "Entries: %lld\n", num_);
fprintf(stdout, "Entries: %" PRIu64 "\n", num_);
fprintf(stdout, "Prefix: %d bytes\n", FLAGS_prefix_size);
fprintf(stdout, "Keys per prefix: %" PRIu64 "\n", keys_per_prefix_);
fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
/ 1048576.0));
@ -856,7 +875,7 @@ class Benchmark {
case rocksdb::kLZ4HCCompression:
fprintf(stdout, "Compression: lz4hc\n");
break;
}
}
switch (FLAGS_rep_factory) {
case kPrefixHash:
@ -868,6 +887,9 @@ class Benchmark {
case kVectorRep:
fprintf(stdout, "Memtablerep: vector\n");
break;
case kHashLinkedList:
fprintf(stdout, "Memtablerep: hash_linkedlist\n");
break;
}
fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);
@ -1000,12 +1022,13 @@ class Benchmark {
filter_policy_(FLAGS_bloom_bits >= 0
? NewBloomFilterPolicy(FLAGS_bloom_bits)
: nullptr),
prefix_extractor_(NewFixedPrefixTransform(FLAGS_use_plain_table ?
FLAGS_prefix_size : FLAGS_key_size-1)),
prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
db_(nullptr),
num_(FLAGS_num),
value_size_(FLAGS_value_size),
key_size_(FLAGS_key_size),
prefix_size_(FLAGS_prefix_size),
keys_per_prefix_(FLAGS_keys_per_prefix),
entries_per_batch_(1),
reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
@ -1014,6 +1037,11 @@ class Benchmark {
),
merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
heap_counter_(0) {
if (FLAGS_prefix_size > FLAGS_key_size) {
fprintf(stderr, "prefix size is larger than key size");
exit(1);
}
std::vector<std::string> files;
FLAGS_env->GetChildren(FLAGS_db, &files);
for (unsigned int i = 0; i < files.size(); i++) {
@ -1032,17 +1060,55 @@ class Benchmark {
delete prefix_extractor_;
}
//this function will construct string format for key. e.g "%016lld"
void ConstructStrFormatForKey(char* str, int keySize) {
str[0] = '%';
str[1] = '0';
sprintf(str+2, "%dlld%s", keySize, "%s");
}
// Generate key according to the given specification and random number.
// The resulting key will have the following format (if keys_per_prefix_
// is positive), extra trailing bytes are either cut off or paddd with '0'.
// The prefix value is derived from key value.
// ----------------------------
// | prefix 00000 | key 00000 |
// ----------------------------
// If keys_per_prefix_ is 0, the key is simply a binary representation of
// random number followed by trailing '0's
// ----------------------------
// | key 00000 |
// ----------------------------
std::string GenerateKeyFromInt(uint64_t v, int64_t num_keys) {
std::string key;
key.resize(key_size_);
char* start = &(key[0]);
char* pos = start;
if (keys_per_prefix_ > 0) {
int64_t num_prefix = num_keys / keys_per_prefix_;
int64_t prefix = v % num_prefix;
int bytes_to_fill = std::min(prefix_size_, 8);
if (port::kLittleEndian) {
for (int i = 0; i < bytes_to_fill; ++i) {
pos[i] = (prefix >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
}
} else {
memcpy(pos, static_cast<void*>(&prefix), bytes_to_fill);
}
if (prefix_size_ > 8) {
// fill the rest with 0s
memset(pos + 8, '0', prefix_size_ - 8);
}
pos += prefix_size_;
}
unique_ptr<char []> GenerateKeyFromInt(long long v, const char* suffix = "") {
unique_ptr<char []> keyInStr(new char[kMaxKeySize + 1]);
snprintf(keyInStr.get(), kMaxKeySize + 1, keyFormat_, v, suffix);
return keyInStr;
int bytes_to_fill = std::min(key_size_ - static_cast<int>(pos - start), 8);
if (port::kLittleEndian) {
for (int i = 0; i < bytes_to_fill; ++i) {
pos[i] = (v >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
}
} else {
memcpy(pos, static_cast<void*>(&v), bytes_to_fill);
}
pos += bytes_to_fill;
if (key_size_ > pos - start) {
memset(pos, '0', key_size_ - (pos - start));
}
return key;
}
void Run() {
@ -1066,7 +1132,6 @@ class Benchmark {
writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
value_size_ = FLAGS_value_size;
key_size_ = FLAGS_key_size;
ConstructStrFormatForKey(keyFormat_, key_size_);
entries_per_batch_ = 1;
write_options_ = WriteOptions();
if (FLAGS_sync) {
@ -1469,12 +1534,14 @@ class Benchmark {
options.min_write_buffer_number_to_merge =
FLAGS_min_write_buffer_number_to_merge;
options.max_background_compactions = FLAGS_max_background_compactions;
options.max_background_flushes = FLAGS_max_background_flushes;
options.compaction_style = FLAGS_compaction_style_e;
options.block_size = FLAGS_block_size;
options.filter_policy = filter_policy_;
options.prefix_extractor =
(FLAGS_use_plain_table || FLAGS_use_prefix_blooms) ? prefix_extractor_
: nullptr;
options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits;
options.max_open_files = FLAGS_open_files;
options.statistics = dbstats;
options.env = FLAGS_env;
@ -1488,19 +1555,26 @@ class Benchmark {
options.max_bytes_for_level_multiplier =
FLAGS_max_bytes_for_level_multiplier;
options.filter_deletes = FLAGS_filter_deletes;
if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) {
fprintf(stderr, "prefix_size should be non-zero iff memtablerep "
"== prefix_hash\n");
if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash ||
FLAGS_rep_factory == kHashLinkedList)) {
fprintf(stderr, "prefix_size should be non-zero if PrefixHash or "
"HashLinkedList memtablerep is used\n");
exit(1);
}
switch (FLAGS_rep_factory) {
case kPrefixHash:
options.memtable_factory.reset(NewHashSkipListRepFactory(
NewFixedPrefixTransform(FLAGS_prefix_size)));
prefix_extractor_,
FLAGS_hash_bucket_count));
break;
case kSkipList:
// no need to do anything
break;
case kHashLinkedList:
options.memtable_factory.reset(NewHashLinkListRepFactory(
prefix_extractor_,
FLAGS_hash_bucket_count));
break;
case kVectorRep:
options.memtable_factory.reset(
new VectorRepFactory
@ -1508,7 +1582,8 @@ class Benchmark {
break;
}
if (FLAGS_use_plain_table) {
if (FLAGS_rep_factory != kPrefixHash) {
if (FLAGS_rep_factory != kPrefixHash &&
FLAGS_rep_factory != kHashLinkedList) {
fprintf(stderr, "Waring: plain table is used with skipList\n");
}
if (!FLAGS_mmap_read && !FLAGS_mmap_write) {
@ -1688,7 +1763,7 @@ class Benchmark {
void DoWrite(ThreadState* thread, WriteMode write_mode) {
const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
const int num_ops = writes_ == 0 ? num_ : writes_ ;
const int64_t num_ops = writes_ == 0 ? num_ : writes_;
Duration duration(test_duration, num_ops);
unique_ptr<BitSet> bit_set;
@ -1698,7 +1773,7 @@ class Benchmark {
if (num_ != FLAGS_num) {
char msg[100];
snprintf(msg, sizeof(msg), "(%lld ops)", num_);
snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_);
thread->stats.AddMessage(msg);
}
@ -1710,7 +1785,7 @@ class Benchmark {
while (!duration.Done(entries_per_batch_)) {
batch.Clear();
for (int j = 0; j < entries_per_batch_; j++) {
long long k = 0;
int64_t k = 0;
switch(write_mode) {
case SEQUENTIAL:
k = i +j;
@ -1720,7 +1795,7 @@ class Benchmark {
break;
case UNIQUE_RANDOM:
{
const long long t = thread->rand.Next() % FLAGS_num;
const int64_t t = thread->rand.Next() % FLAGS_num;
if (!bit_set->test(t)) {
// best case
k = t;
@ -1748,9 +1823,9 @@ class Benchmark {
break;
}
};
unique_ptr<char []> key = GenerateKeyFromInt(k);
batch.Put(key.get(), gen.Generate(value_size_));
bytes += value_size_ + strlen(key.get());
std::string key = GenerateKeyFromInt(k, FLAGS_num);
batch.Put(key, gen.Generate(value_size_));
bytes += value_size_ + key.size();
thread->stats.FinishedSingleOp(db_);
}
s = db_->Write(write_options_, &batch);
@ -1765,7 +1840,7 @@ class Benchmark {
void ReadSequential(ThreadState* thread) {
Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
long long i = 0;
int64_t i = 0;
int64_t bytes = 0;
for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
bytes += iter->key().size() + iter->value().size();
@ -1778,7 +1853,7 @@ class Benchmark {
void ReadReverse(ThreadState* thread) {
Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
long long i = 0;
int64_t i = 0;
int64_t bytes = 0;
for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
bytes += iter->key().size() + iter->value().size();
@ -1792,20 +1867,20 @@ class Benchmark {
// Calls MultiGet over a list of keys from a random distribution.
// Returns the total number of keys found.
long MultiGetRandom(ReadOptions& options, int num_keys,
Random64& rand, long long range, const char* suffix) {
Random64* rand, int64_t range, const char* suffix) {
assert(num_keys > 0);
std::vector<Slice> keys(num_keys);
std::vector<std::string> values(num_keys);
std::vector<unique_ptr<char []> > gen_keys(num_keys);
std::vector<std::string> gen_keys(num_keys);
int i;
long long k;
int64_t k;
// Fill the keys vector
for(i=0; i<num_keys; ++i) {
k = rand.Next() % range;
gen_keys[i] = GenerateKeyFromInt(k,suffix);
keys[i] = gen_keys[i].get();
k = rand->Next() % range;
gen_keys[i] = GenerateKeyFromInt(k, range) + suffix;
keys[i] = gen_keys[i];
}
if (FLAGS_use_snapshot) {
@ -1841,7 +1916,7 @@ class Benchmark {
ReadOptions options(FLAGS_verify_checksum, true);
Duration duration(FLAGS_duration, reads_);
long long found = 0;
int64_t found = 0;
if (FLAGS_use_multiget) { // MultiGet
const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group
@ -1850,7 +1925,8 @@ class Benchmark {
// Recalculate number of keys per group, and call MultiGet until done
long num_keys;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, "");
found +=
MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, "");
thread->stats.FinishedSingleOp(db_);
keys_left -= num_keys;
}
@ -1858,11 +1934,11 @@ class Benchmark {
options.tailing = true;
Iterator* iter = db_->NewIterator(options);
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % FLAGS_num;
unique_ptr<char[]> key = GenerateKeyFromInt(k);
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
iter->Seek(key.get());
if (iter->Valid() && iter->key().compare(Slice(key.get())) == 0) {
iter->Seek(key);
if (iter->Valid() && iter->key().compare(Slice(key)) == 0) {
++found;
}
@ -1870,33 +1946,34 @@ class Benchmark {
}
delete iter;
} else { // Regular case. Do one "get" at a time Get
options.tailing = true;
options.prefix_seek = (FLAGS_prefix_size == 0);
Iterator* iter = db_->NewIterator(options);
std::string value;
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}
if (FLAGS_read_range < 2) {
if (db_->Get(options, key.get(), &value).ok()) {
if (db_->Get(options, key, &value).ok()) {
found++;
}
} else {
Slice skey(key.get());
int count = 1;
if (FLAGS_get_approx) {
unique_ptr<char []> key2 =
GenerateKeyFromInt(k + (int) FLAGS_read_range);
Slice skey2(key2.get());
Range range(skey, skey2);
std::string key2 =
GenerateKeyFromInt(k + static_cast<int>(FLAGS_read_range),
FLAGS_num + FLAGS_read_range);
Range range(key, key2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
}
for (iter->Seek(skey);
for (iter->Seek(key);
iter->Valid() && count <= FLAGS_read_range;
++count, iter->Next()) {
found++;
@ -1915,8 +1992,14 @@ class Benchmark {
}
char msg[100];
snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_);
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
found, reads_);
thread->stats.AddMessage(msg);
if (FLAGS_perf_level > 0) {
thread->stats.AddMessage(perf_context.ToString());
}
}
void PrefixScanRandom(ThreadState* thread) {
@ -1928,13 +2011,13 @@ class Benchmark {
ReadOptions options(FLAGS_verify_checksum, true);
Duration duration(FLAGS_duration, reads_);
long long found = 0;
int64_t found = 0;
while (!duration.Done(1)) {
std::string value;
const int k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
Slice skey(key.get());
std::string key = GenerateKeyFromInt(k, FLAGS_num);
Slice skey(key);
Slice prefix = prefix_extractor_->Transform(skey);
options.prefix = FLAGS_use_prefix_api ? &prefix : nullptr;
@ -1950,7 +2033,8 @@ class Benchmark {
}
char msg[100];
snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_);
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
found, reads_);
thread->stats.AddMessage(msg);
}
@ -1968,7 +2052,8 @@ class Benchmark {
long num_keys;
long found;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
found = MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, ".");
found =
MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, ".");
// We should not find any key since the key we try to get has a
// different suffix
@ -1983,9 +2068,9 @@ class Benchmark {
std::string value;
Status s;
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k, ".");
s = db_->Get(options, key.get(), &value);
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num) + ".";
s = db_->Get(options, key, &value);
assert(!s.ok() && s.IsNotFound());
thread->stats.FinishedSingleOp(db_);
}
@ -1995,26 +2080,26 @@ class Benchmark {
void ReadHot(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
const long long range = (FLAGS_num + 99) / 100;
long long found = 0;
const int64_t range = (FLAGS_num + 99) / 100;
int64_t found = 0;
if (FLAGS_use_multiget) {
const long long kpg = FLAGS_keys_per_multiget; // keys per multiget group
long long keys_left = reads_;
const int64_t kpg = FLAGS_keys_per_multiget; // keys per multiget group
int64_t keys_left = reads_;
// Recalculate number of keys per group, and call MultiGet until done
long num_keys;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
found += MultiGetRandom(options, num_keys, thread->rand, range, "");
found += MultiGetRandom(options, num_keys, &thread->rand, range, "");
thread->stats.FinishedSingleOp(db_);
keys_left -= num_keys;
}
} else {
std::string value;
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % range;
unique_ptr<char []> key = GenerateKeyFromInt(k);
if (db_->Get(options, key.get(), &value).ok()){
const int64_t k = thread->rand.Next() % range;
std::string key = GenerateKeyFromInt(k, range);
if (db_->Get(options, key, &value).ok()) {
++found;
}
thread->stats.FinishedSingleOp(db_);
@ -2022,7 +2107,8 @@ class Benchmark {
}
char msg[100];
snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_);
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
found, reads_);
thread->stats.AddMessage(msg);
}
@ -2040,18 +2126,19 @@ class Benchmark {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
std::string value;
long long found = 0;
int64_t found = 0;
while (!duration.Done(1)) {
Iterator* iter = db_->NewIterator(options);
const long long k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
iter->Seek(key.get());
if (iter->Valid() && iter->key() == key.get()) found++;
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
iter->Seek(key);
if (iter->Valid() && iter->key() == Slice(key)) found++;
delete iter;
thread->stats.FinishedSingleOp(db_);
}
char msg[100];
snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, num_);
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
found, num_);
thread->stats.AddMessage(msg);
}
@ -2063,9 +2150,9 @@ class Benchmark {
while (!duration.Done(entries_per_batch_)) {
batch.Clear();
for (int j = 0; j < entries_per_batch_; j++) {
const long long k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
unique_ptr<char []> key = GenerateKeyFromInt(k);
batch.Delete(key.get());
const int64_t k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
std::string key = GenerateKeyFromInt(k, FLAGS_num);
batch.Delete(key);
thread->stats.FinishedSingleOp(db_);
}
s = db_->Write(write_options_, &batch);
@ -2113,10 +2200,9 @@ class Benchmark {
}
}
const long long k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
Status s = db_->Put(write_options_, key.get(),
gen.Generate(value_size_));
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -2228,18 +2314,18 @@ class Benchmark {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
std::string value;
long long found = 0;
int64_t found = 0;
int get_weight = 0;
int put_weight = 0;
int delete_weight = 0;
long long gets_done = 0;
long long puts_done = 0;
long long deletes_done = 0;
int64_t gets_done = 0;
int64_t puts_done = 0;
int64_t deletes_done = 0;
// the number of iterations is the larger of read_ or write_
for (long long i = 0; i < readwrites_; i++) {
const long long k = thread->rand.Next() % (FLAGS_numdistinct);
unique_ptr<char []> key = GenerateKeyFromInt(k);
for (int64_t i = 0; i < readwrites_; i++) {
const int64_t k = thread->rand.Next() % (FLAGS_numdistinct);
std::string key = GenerateKeyFromInt(k, FLAGS_numdistinct);
if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
// one batch completed, reinitialize for next batch
get_weight = FLAGS_readwritepercent;
@ -2248,7 +2334,7 @@ class Benchmark {
}
if (get_weight > 0) {
// do all the gets first
Status s = GetMany(options, key.get(), &value);
Status s = GetMany(options, key, &value);
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
// we continue after error rather than exiting so that we can
@ -2261,8 +2347,7 @@ class Benchmark {
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
Status s = PutMany(write_options_, key.get(),
gen.Generate(value_size_));
Status s = PutMany(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
exit(1);
@ -2270,7 +2355,7 @@ class Benchmark {
put_weight--;
puts_done++;
} else if (delete_weight > 0) {
Status s = DeleteMany(write_options_, key.get());
Status s = DeleteMany(write_options_, key);
if (!s.ok()) {
fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
exit(1);
@ -2283,7 +2368,8 @@ class Benchmark {
}
char msg[100];
snprintf(msg, sizeof(msg),
"( get:%lld put:%lld del:%lld total:%lld found:%lld)",
"( get:%" PRIu64 " put:%" PRIu64 " del:%" PRIu64 " total:%" \
PRIu64 " found:%" PRIu64 ")",
gets_done, puts_done, deletes_done, readwrites_, found);
thread->stats.AddMessage(msg);
}
@ -2300,17 +2386,17 @@ class Benchmark {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
std::string value;
long long found = 0;
int64_t found = 0;
int get_weight = 0;
int put_weight = 0;
long long reads_done = 0;
long long writes_done = 0;
int64_t reads_done = 0;
int64_t writes_done = 0;
Duration duration(FLAGS_duration, readwrites_);
// the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
if (get_weight == 0 && put_weight == 0) {
// one batch completed, reinitialize for next batch
get_weight = FLAGS_readwritepercent;
@ -2323,17 +2409,14 @@ class Benchmark {
}
if (FLAGS_get_approx) {
char key2[100];
snprintf(key2, sizeof(key2), "%016lld", k + 1);
Slice skey2(key2);
Slice skey(key2);
Range range(skey, skey2);
std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1);
Range range(key, key2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
}
// do all the gets first
Status s = db_->Get(options, key.get(), &value);
Status s = db_->Get(options, key, &value);
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "get error: %s\n", s.ToString().c_str());
// we continue after error rather than exiting so that we can
@ -2352,8 +2435,7 @@ class Benchmark {
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
Status s = db_->Put(write_options_, key.get(),
gen.Generate(value_size_));
Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -2364,8 +2446,8 @@ class Benchmark {
thread->stats.FinishedSingleOp(db_);
}
char msg[100];
snprintf(msg, sizeof(msg),
"( reads:%lld writes:%lld total:%lld found:%lld)",
snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
" total:%" PRIu64 " found:%" PRIu64 ")",
reads_done, writes_done, readwrites_, found);
thread->stats.AddMessage(msg);
}
@ -2388,10 +2470,10 @@ class Benchmark {
long num_keys; // number of keys to read in current group
long num_put_keys; // number of keys to put in current group
long found = 0;
long reads_done = 0;
long writes_done = 0;
long multigets_done = 0;
int64_t found = 0;
int64_t reads_done = 0;
int64_t writes_done = 0;
int64_t multigets_done = 0;
// the number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
@ -2415,18 +2497,18 @@ class Benchmark {
assert(num_keys + num_put_keys <= keys_left);
// Apply the MultiGet operations
found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, "");
found += MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, "");
++multigets_done;
reads_done+=num_keys;
thread->stats.FinishedSingleOp(db_);
// Now do the puts
int i;
long long k;
int64_t k;
for(i=0; i<num_put_keys; ++i) {
k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
Status s = db_->Put(write_options_, key.get(),
std::string key = GenerateKeyFromInt(k, FLAGS_num);
Status s = db_->Put(write_options_, key,
gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
@ -2440,7 +2522,8 @@ class Benchmark {
}
char msg[100];
snprintf(msg, sizeof(msg),
"( reads:%ld writes:%ld total:%lld multiget_ops:%ld found:%ld)",
"( reads:%" PRIu64 " writes:%" PRIu64 " total:%" PRIu64 \
" multiget_ops:%" PRIu64 " found:%" PRIu64 ")",
reads_done, writes_done, readwrites_, multigets_done, found);
thread->stats.AddMessage(msg);
}
@ -2451,29 +2534,26 @@ class Benchmark {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
std::string value;
long long found = 0;
int64_t found = 0;
Duration duration(FLAGS_duration, readwrites_);
// the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}
if (FLAGS_get_approx) {
char key2[100];
snprintf(key2, sizeof(key2), "%016lld", k + 1);
Slice skey2(key2);
Slice skey(key2);
Range range(skey, skey2);
std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1);
Range range(key, key2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
}
if (db_->Get(options, key.get(), &value).ok()) {
if (db_->Get(options, key, &value).ok()) {
found++;
}
@ -2481,7 +2561,7 @@ class Benchmark {
db_->ReleaseSnapshot(options.snapshot);
}
Status s = db_->Put(write_options_, key.get(), gen.Generate(value_size_));
Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -2490,7 +2570,7 @@ class Benchmark {
}
char msg[100];
snprintf(msg, sizeof(msg),
"( updates:%lld found:%lld)", readwrites_, found);
"( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
thread->stats.AddMessage(msg);
}
@ -2501,30 +2581,27 @@ class Benchmark {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
std::string value;
long found = 0;
int64_t found = 0;
// The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}
if (FLAGS_get_approx) {
char key2[100];
snprintf(key2, sizeof(key2), "%016lld", k + 1);
Slice skey2(key2);
Slice skey(key2);
Range range(skey, skey2);
std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1);
Range range(key, key2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
}
// Get the existing value
if (db_->Get(options, key.get(), &value).ok()) {
if (db_->Get(options, key, &value).ok()) {
found++;
} else {
// If not existing, then just assume an empty string of data
@ -2544,7 +2621,7 @@ class Benchmark {
value.append(operand.data(), operand.size());
// Write back to the database
Status s = db_->Put(write_options_, key.get(), value);
Status s = db_->Put(write_options_, key, value);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -2552,7 +2629,8 @@ class Benchmark {
thread->stats.FinishedSingleOp(db_);
}
char msg[100];
snprintf(msg, sizeof(msg), "( updates:%lld found:%ld)", readwrites_, found);
snprintf(msg, sizeof(msg), "( updates:%" PRIu64 " found:%" PRIu64 ")",
readwrites_, found);
thread->stats.AddMessage(msg);
}
@ -2572,11 +2650,10 @@ class Benchmark {
// The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % merge_keys_;
unique_ptr<char []> key = GenerateKeyFromInt(k);
const int64_t k = thread->rand.Next() % merge_keys_;
std::string key = GenerateKeyFromInt(k, merge_keys_);
Status s = db_->Merge(write_options_, key.get(),
gen.Generate(value_size_));
Status s = db_->Merge(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
@ -2587,7 +2664,7 @@ class Benchmark {
// Print some statistics
char msg[100];
snprintf(msg, sizeof(msg), "( updates:%lld)", readwrites_);
snprintf(msg, sizeof(msg), "( updates:%" PRIu64 ")", readwrites_);
thread->stats.AddMessage(msg);
}
@ -2602,23 +2679,22 @@ class Benchmark {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
std::string value;
long long num_hits = 0;
long long num_gets = 0;
long long num_merges = 0;
int64_t num_hits = 0;
int64_t num_gets = 0;
int64_t num_merges = 0;
size_t max_length = 0;
// the number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % merge_keys_;
unique_ptr<char []> key = GenerateKeyFromInt(k);
const int64_t k = thread->rand.Next() % merge_keys_;
std::string key = GenerateKeyFromInt(k, merge_keys_);
bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
if (do_merge) {
Status s = db_->Merge(write_options_, key.get(),
gen.Generate(value_size_));
Status s = db_->Merge(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
@ -2627,7 +2703,7 @@ class Benchmark {
num_merges++;
} else {
Status s = db_->Get(options, key.get(), &value);
Status s = db_->Get(options, key, &value);
if (value.length() > max_length)
max_length = value.length();
@ -2647,7 +2723,8 @@ class Benchmark {
}
char msg[100];
snprintf(msg, sizeof(msg),
"(reads:%lld merges:%lld total:%lld hits:%lld maxlength:%zu)",
"(reads:%" PRIu64 " merges:%" PRIu64 " total:%" PRIu64 " hits:%" \
PRIu64 " maxlength:%zu)",
num_gets, num_merges, readwrites_, num_hits, max_length);
thread->stats.AddMessage(msg);
}

View file

@ -2330,7 +2330,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
compact->compaction->level(), compact->compaction->num_input_files(1),
compact->compaction->output_level(), compact->compaction->score(),
options_.max_background_compactions - bg_compaction_scheduled_);
char scratch[256];
char scratch[2345];
compact->compaction->Summary(scratch, sizeof(scratch));
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
@ -2727,10 +2727,11 @@ struct IterState {
static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
DBImpl::DeletionState deletion_state;
bool need_cleanup = state->super_version->Unref();
if (need_cleanup) {
DBImpl::DeletionState deletion_state;
state->mu->Lock();
state->super_version->Cleanup();
state->db->FindObsoleteFiles(deletion_state, false, true);

View file

@ -1470,6 +1470,23 @@ TEST(DBTest, FilterDeletes) {
} while (ChangeCompactOptions());
}
TEST(DBTest, IterSeekBeforePrev) {
ASSERT_OK(Put("a", "b"));
ASSERT_OK(Put("c", "d"));
dbfull()->Flush(FlushOptions());
ASSERT_OK(Put("0", "f"));
ASSERT_OK(Put("1", "h"));
dbfull()->Flush(FlushOptions());
ASSERT_OK(Put("2", "j"));
auto iter = db_->NewIterator(ReadOptions());
iter->Seek(Slice("c"));
iter->Prev();
iter->Seek(Slice("a"));
iter->Prev();
delete iter;
}
TEST(DBTest, IterEmpty) {
do {
CreateAndReopenWithCF({"pikachu"});
@ -2552,6 +2569,89 @@ TEST(DBTest, UniversalCompactionOptions) {
}
}
TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100<<10; //100KB
// trigger compaction if there are >= 4 files
options.level0_file_num_compaction_trigger = 4;
options.compaction_options_universal.size_ratio = 10;
options.compaction_options_universal.stop_style = kCompactionStopStyleSimilarSize;
options.num_levels=1;
Reopen(&options);
Random rnd(301);
int key_idx = 0;
// Stage 1:
// Generate a set of files at level 0, but don't trigger level-0
// compaction.
for (int num = 0;
num < options.level0_file_num_compaction_trigger-1;
num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0), num + 1);
}
// Generate one more file at level-0, which should trigger level-0
// compaction.
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompact();
// Suppose each file flushed from mem table has size 1. Now we compact
// (level0_file_num_compaction_trigger+1)=4 files and should have a big
// file of size 4.
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
// Stage 2:
// Now we have one file at level 0, with size 4. We also have some data in
// mem table. Let's continue generating new files at level 0, but don't
// trigger level-0 compaction.
// First, clean up memtable before inserting new data. This will generate
// a level-0 file, with size around 0.4 (according to previously written
// data amount).
dbfull()->Flush(FlushOptions());
for (int num = 0;
num < options.level0_file_num_compaction_trigger-3;
num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0), num + 3);
}
// Generate one more file at level-0, which should trigger level-0
// compaction.
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompact();
// Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
// After compaction, we should have 3 files, with size 4, 0.4, 2.
ASSERT_EQ(NumTableFilesAtLevel(0), 3);
// Stage 3:
// Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one
// more file at level-0, which should trigger level-0 compaction.
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompact();
// Level-0 compaction is triggered, but no file will be picked up.
ASSERT_EQ(NumTableFilesAtLevel(0), 4);
}
#if defined(SNAPPY) && defined(ZLIB) && defined(BZIP2)
TEST(DBTest, CompressedCache) {
int num_iter = 80;

View file

@ -7,6 +7,7 @@
#define STORAGE_ROCKSDB_INCLUDE_PERF_CONTEXT_H
#include <stdint.h>
#include <string>
namespace rocksdb {
@ -26,6 +27,8 @@ struct PerfContext {
void Reset(); // reset all performance counters to zero
std::string ToString() const;
uint64_t user_key_comparison_count; // total number of user key comparisons
uint64_t block_cache_hit_count; // total number of block cache hits
uint64_t block_read_count; // total number of block reads (with IO)

View file

@ -58,14 +58,13 @@ struct BackupableDBOptions {
explicit BackupableDBOptions(const std::string& _backup_dir,
Env* _backup_env = nullptr,
bool _share_table_files = true,
Logger* _info_log = nullptr,
bool _sync = true,
bool _destroy_old_data = false) :
backup_dir(_backup_dir),
backup_env(_backup_env),
info_log(_info_log),
sync(_sync),
destroy_old_data(_destroy_old_data) { }
Logger* _info_log = nullptr, bool _sync = true,
bool _destroy_old_data = false)
: backup_dir(_backup_dir),
backup_env(_backup_env),
info_log(_info_log),
sync(_sync),
destroy_old_data(_destroy_old_data) {}
};
typedef uint32_t BackupID;
@ -99,8 +98,6 @@ class BackupEngine {
const std::string& wal_dir) = 0;
virtual Status RestoreDBFromLatestBackup(const std::string& db_dir,
const std::string& wal_dir) = 0;
virtual void DeleteBackupsNewerThan(uint64_t sequence_number) = 0;
};
// Stack your DB with BackupableDB to be able to backup the DB
@ -138,32 +135,33 @@ class BackupableDB : public StackableDB {
// Use this class to access information about backups and restore from them
class RestoreBackupableDB {
public:
RestoreBackupableDB(Env* db_env, const BackupableDBOptions& options);
~RestoreBackupableDB();
public:
RestoreBackupableDB(Env* db_env, const BackupableDBOptions& options);
~RestoreBackupableDB();
// Returns info about backups in backup_info
void GetBackupInfo(std::vector<BackupInfo>* backup_info);
// Returns info about backups in backup_info
void GetBackupInfo(std::vector<BackupInfo>* backup_info);
// restore from backup with backup_id
// IMPORTANT -- if options_.share_table_files == true and you restore DB
// from some backup that is not the latest, and you start creating new
// backups from the new DB, all the backups that were newer than the
// backup you restored from will be deleted
//
// Example: Let's say you have backups 1, 2, 3, 4, 5 and you restore 3.
// If you try creating a new backup now, old backups 4 and 5 will be deleted
// and new backup with ID 4 will be created.
Status RestoreDBFromBackup(BackupID backup_id, const std::string& db_dir,
const std::string& wal_dir);
// restore from backup with backup_id
// IMPORTANT -- if options_.share_table_files == true and you restore DB
// from some backup that is not the latest, and you start creating new
// backups from the new DB, they will probably fail
//
// Example: Let's say you have backups 1, 2, 3, 4, 5 and you restore 3.
// If you add new data to the DB and try creating a new backup now, the
// database will diverge from backups 4 and 5 and the new backup will fail.
// If you want to create new backup, you will first have to delete backups 4
// and 5.
Status RestoreDBFromBackup(BackupID backup_id, const std::string& db_dir,
const std::string& wal_dir);
// restore from the latest backup
Status RestoreDBFromLatestBackup(const std::string& db_dir,
const std::string& wal_dir);
// deletes old backups, keeping latest num_backups_to_keep alive
Status PurgeOldBackups(uint32_t num_backups_to_keep);
// deletes a specific backup
Status DeleteBackup(BackupID backup_id);
// restore from the latest backup
Status RestoreDBFromLatestBackup(const std::string& db_dir,
const std::string& wal_dir);
// deletes old backups, keeping latest num_backups_to_keep alive
Status PurgeOldBackups(uint32_t num_backups_to_keep);
// deletes a specific backup
Status DeleteBackup(BackupID backup_id);
private:
BackupEngine* backup_engine_;

View file

@ -82,7 +82,6 @@ void FilterBlockBuilder::AddKey(const Slice& key) {
Slice prefix = prefix_extractor_->Transform(user_key);
InternalKey internal_prefix_tmp(prefix, 0, kTypeValue);
Slice internal_prefix = internal_prefix_tmp.Encode();
assert(comparator_->Compare(internal_prefix, key) <= 0);
start_.push_back(entries_.size());
entries_.append(internal_prefix.data(), internal_prefix.size());
}

View file

@ -3,6 +3,8 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include <sstream>
#include "util/perf_context_imp.h"
namespace rocksdb {
@ -38,6 +40,35 @@ void PerfContext::Reset() {
write_memtable_time = 0;
}
#define OUTPUT(counter) #counter << " = " << counter << ", "
std::string PerfContext::ToString() const {
std::ostringstream ss;
ss << OUTPUT(user_key_comparison_count)
<< OUTPUT(block_cache_hit_count)
<< OUTPUT(block_read_count)
<< OUTPUT(block_read_byte)
<< OUTPUT(block_read_time)
<< OUTPUT(block_checksum_time)
<< OUTPUT(block_decompress_time)
<< OUTPUT(internal_key_skipped_count)
<< OUTPUT(internal_delete_skipped_count)
<< OUTPUT(write_wal_time)
<< OUTPUT(get_snapshot_time)
<< OUTPUT(get_from_memtable_time)
<< OUTPUT(get_from_memtable_count)
<< OUTPUT(get_post_process_time)
<< OUTPUT(get_from_output_files_time)
<< OUTPUT(seek_child_seek_time)
<< OUTPUT(seek_child_seek_count)
<< OUTPUT(seek_min_heap_time)
<< OUTPUT(seek_internal_seek_time)
<< OUTPUT(find_next_user_entry_time)
<< OUTPUT(write_pre_and_post_process_time)
<< OUTPUT(write_memtable_time);
return ss.str();
}
__thread PerfContext perf_context;
}

View file

@ -46,8 +46,6 @@ class BackupEngineImpl : public BackupEngine {
return RestoreDBFromBackup(latest_backup_id_, db_dir, wal_dir);
}
void DeleteBackupsNewerThan(uint64_t sequence_number);
private:
struct FileInfo {
FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
@ -185,6 +183,12 @@ class BackupEngineImpl : public BackupEngine {
Env* db_env_;
Env* backup_env_;
// directories
unique_ptr<Directory> backup_directory_;
unique_ptr<Directory> shared_directory_;
unique_ptr<Directory> meta_directory_;
unique_ptr<Directory> private_directory_;
static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL; // 5MB
};
@ -203,11 +207,17 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
// create all the dirs we need
backup_env_->CreateDirIfMissing(GetAbsolutePath());
backup_env_->NewDirectory(GetAbsolutePath(), &backup_directory_);
if (options_.share_table_files) {
backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel()));
backup_env_->NewDirectory(GetAbsolutePath(GetSharedFileRel()),
&shared_directory_);
}
backup_env_->CreateDirIfMissing(GetAbsolutePath(GetPrivateDirRel()));
backup_env_->NewDirectory(GetAbsolutePath(GetPrivateDirRel()),
&private_directory_);
backup_env_->CreateDirIfMissing(GetBackupMetaDir());
backup_env_->NewDirectory(GetBackupMetaDir(), &meta_directory_);
std::vector<std::string> backup_meta_files;
backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
@ -279,26 +289,6 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
BackupEngineImpl::~BackupEngineImpl() { LogFlush(options_.info_log); }
void BackupEngineImpl::DeleteBackupsNewerThan(uint64_t sequence_number) {
for (auto backup : backups_) {
if (backup.second.GetSequenceNumber() > sequence_number) {
Log(options_.info_log,
"Deleting backup %u because sequence number (%" PRIu64
") is newer than %" PRIu64 "",
backup.first, backup.second.GetSequenceNumber(), sequence_number);
backup.second.Delete();
obsolete_backups_.push_back(backup.first);
}
}
for (auto ob : obsolete_backups_) {
backups_.erase(backups_.find(ob));
}
auto itr = backups_.end();
latest_backup_id_ = (itr == backups_.begin()) ? 0 : (--itr)->first;
PutLatestBackupFileContents(latest_backup_id_); // Ignore errors
GarbageCollection(false);
}
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
Status s;
std::vector<std::string> live_files;
@ -347,9 +337,8 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
return Status::Corruption("Can't parse file name. This is very bad");
}
// we should only get sst, manifest and current files here
assert(type == kTableFile ||
type == kDescriptorFile ||
type == kCurrentFile);
assert(type == kTableFile || type == kDescriptorFile ||
type == kCurrentFile);
// rules:
// * if it's kTableFile, than it's shared
@ -393,6 +382,28 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
// install the newly created backup meta! (atomic)
s = PutLatestBackupFileContents(new_backup_id);
}
if (s.ok() && options_.sync) {
unique_ptr<Directory> backup_private_directory;
backup_env_->NewDirectory(
GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
&backup_private_directory);
if (backup_private_directory != nullptr) {
backup_private_directory->Fsync();
}
if (private_directory_ != nullptr) {
private_directory_->Fsync();
}
if (meta_directory_ != nullptr) {
meta_directory_->Fsync();
}
if (shared_directory_ != nullptr) {
shared_directory_->Fsync();
}
if (backup_directory_ != nullptr) {
backup_directory_->Fsync();
}
}
if (!s.ok()) {
// clean all the files we might have created
Log(options_.info_log, "Backup failed -- %s", s.ToString().c_str());
@ -590,6 +601,7 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
unique_ptr<SequentialFile> src_file;
EnvOptions env_options;
env_options.use_mmap_writes = false;
env_options.use_os_buffer = false;
if (size != nullptr) {
*size = 0;
}
@ -705,6 +717,7 @@ Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
EnvOptions env_options;
env_options.use_mmap_writes = false;
env_options.use_os_buffer = false;
std::unique_ptr<SequentialFile> src_file;
Status s = src_env->NewSequentialFile(src, &src_file, env_options);
@ -892,6 +905,9 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
uint64_t size;
s = env_->GetFileSize(backup_dir + "/" + filename, &size);
if (!s.ok()) {
return s;
}
if (line.empty()) {
return Status::Corruption("File checksum is missing");
@ -912,6 +928,11 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
files.emplace_back(filename, size, checksum_value);
}
if (s.ok() && data.size() > 0) {
// file has to be read completely. if not, we count it as corruption
s = Status::Corruption("Tailing data in backup meta file");
}
if (s.ok()) {
for (const auto& file_info : files) {
s = AddFile(file_info);
@ -967,11 +988,7 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
: StackableDB(db),
backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {
if (options.share_table_files) {
backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber());
}
}
backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {}
BackupableDB::~BackupableDB() {
delete backup_engine_;

View file

@ -715,27 +715,38 @@ TEST(BackupableDBTest, OnlineIntegrationTest) {
CloseRestoreDB();
}
TEST(BackupableDBTest, DeleteNewerBackups) {
TEST(BackupableDBTest, FailOverwritingBackups) {
options_.write_buffer_size = 1024 * 1024 * 1024; // 1GB
// create backups 1, 2, 3, 4, 5
OpenBackupableDB(true);
for (int i = 0; i < 5; ++i) {
FillDB(db_.get(), 100 * i, 100 * (i + 1));
ASSERT_OK(db_->CreateNewBackup(!!(i % 2)));
ASSERT_OK(db_->CreateNewBackup(true));
CloseBackupableDB();
OpenBackupableDB(false);
}
CloseBackupableDB();
// backup 3 is fine
AssertBackupConsistency(3, 0, 300, 500);
// this should delete backups 4 and 5
OpenBackupableDB();
CloseBackupableDB();
// backups 4 and 5 don't exist
// restore 3
OpenRestoreDB();
Status s = restore_db_->RestoreDBFromBackup(4, dbname_, dbname_);
ASSERT_TRUE(s.IsNotFound());
s = restore_db_->RestoreDBFromBackup(5, dbname_, dbname_);
ASSERT_TRUE(s.IsNotFound());
ASSERT_OK(restore_db_->RestoreDBFromBackup(3, dbname_, dbname_));
CloseRestoreDB();
OpenBackupableDB(false);
FillDB(db_.get(), 0, 300);
Status s = db_->CreateNewBackup(true);
// the new backup fails because new table files
// clash with old table files from backups 4 and 5
// (since write_buffer_size is huge, we can be sure that
// each backup will generate only one sst file and that
// a file generated by a new backup is the same as
// sst file generated by backup 4)
ASSERT_TRUE(s.IsCorruption());
ASSERT_OK(db_->DeleteBackup(4));
ASSERT_OK(db_->DeleteBackup(5));
// now, the backup can succeed
ASSERT_OK(db_->CreateNewBackup(true));
CloseBackupableDB();
}
TEST(BackupableDBTest, NoShareTableFiles) {