commit 6c5a4d646a
Author: Dhruba Borthakur
Date:   2012-11-14 21:39:52 -08:00

Merge branch 'master' into performance

Conflicts:
	db/db_impl.h

20 changed files with 543 additions and 105 deletions

View File

@ -9,4 +9,4 @@
This makes CRC computation much faster, but
binaries won't run on CPUs that don't support it.
* Latest release is 1.5.3.fb
* Latest release is 1.5.5.fb

View File

@ -128,7 +128,7 @@ if test "$USE_SCRIBE"; then
DIRS="$DIRS scribe "
fi
set -f # temporarily disable globbing so that our patterns aren't expanded
set -f # temporarily disable globbing so that our patterns arent expanded
PRUNE_TEST="-name *test*.cc -prune"
PRUNE_BENCH="-name *_bench.cc -prune"
PORTABLE_FILES=`find $DIRS $PRUNE_TEST -o $PRUNE_BENCH -o -name '*.cc' -print | sort | tr "\n" " "`

View File

@ -158,7 +158,7 @@ static int FLAGS_target_file_size_base = 2 * 1048576;
static int FLAGS_target_file_size_multiplier = 1;
// Max bytes for level-1
static int FLAGS_max_bytes_for_level_base = 10 * 1048576;
static uint64_t FLAGS_max_bytes_for_level_base = 10 * 1048576;
// A multiplier to compute max bytes for level-N
static int FLAGS_max_bytes_for_level_multiplier = 10;
@ -191,7 +191,7 @@ static enum leveldb::CompressionType FLAGS_compression_type =
// Allows compression for levels 0 and 1 to be disabled when
// other levels are compressed
static unsigned int FLAGS_min_level_to_compress = -1;
static int FLAGS_min_level_to_compress = -1;
static int FLAGS_table_cache_numshardbits = 4;
@ -211,6 +211,13 @@ static int FLAGS_stats_per_interval = 0;
// less than or equal to this value.
static double FLAGS_rate_limit = 0;
// Control maximum bytes of overlaps in grandparent (i.e., level+2) before we
// stop building a single file in a level->level+1 compaction.
static int FLAGS_max_grandparent_overlap_factor;
// Run read only benchmarks.
static bool FLAGS_read_only = false;
extern bool useOsBuffer;
extern bool useFsReadAhead;
extern bool useMmapRead;
@ -582,10 +589,20 @@ class Benchmark {
void PrintStatistics() {
if (FLAGS_statistics) {
fprintf(stdout, "File opened:%ld closed:%ld errors:%ld\n",
dbstats->getNumFileOpens(),
dbstats->getNumFileCloses(),
dbstats->getNumFileErrors());
fprintf(stdout, "File opened:%ld closed:%ld errors:%ld\n"
"Block Cache Hit Count:%ld Block Cache Miss Count:%ld\n"
"Bloom Filter Useful: %ld \n"
"Compaction key_drop_newer_entry: %ld key_drop_obsolete: %ld "
"Compaction key_drop_user: %ld",
dbstats->getNumFileOpens(),
dbstats->getNumFileCloses(),
dbstats->getNumFileErrors(),
dbstats->getTickerCount(BLOCK_CACHE_HIT),
dbstats->getTickerCount(BLOCK_CACHE_MISS),
dbstats->getTickerCount(BLOOM_FILTER_USEFUL),
dbstats->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
dbstats->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE),
dbstats->getTickerCount(COMPACTION_KEY_DROP_USER));
}
}
@ -942,7 +959,7 @@ class Benchmark {
if (FLAGS_min_level_to_compress >= 0) {
assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
options.compression_per_level = new CompressionType[FLAGS_num_levels];
for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) {
for (int i = 0; i < FLAGS_min_level_to_compress; i++) {
options.compression_per_level[i] = kNoCompression;
}
for (unsigned int i = FLAGS_min_level_to_compress;
@ -955,7 +972,14 @@ class Benchmark {
FLAGS_delete_obsolete_files_period_micros;
options.rate_limit = FLAGS_rate_limit;
options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
Status s = DB::Open(options, FLAGS_db, &db_);
options.max_grandparent_overlap_factor =
FLAGS_max_grandparent_overlap_factor;
Status s;
if(FLAGS_read_only) {
s = DB::OpenForReadOnly(options, FLAGS_db, &db_);
} else {
s = DB::Open(options, FLAGS_db, &db_);
}
if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
exit(1);
@ -1349,8 +1373,8 @@ int main(int argc, char** argv) {
&n, &junk) == 1) {
FLAGS_target_file_size_multiplier = n;
} else if (
sscanf(argv[i], "--max_bytes_for_level_base=%d%c", &n, &junk) == 1) {
FLAGS_max_bytes_for_level_base = n;
sscanf(argv[i], "--max_bytes_for_level_base=%ld%c", &l, &junk) == 1) {
FLAGS_max_bytes_for_level_base = l;
} else if (sscanf(argv[i], "--max_bytes_for_level_multiplier=%d%c",
&n, &junk) == 1) {
FLAGS_max_bytes_for_level_multiplier = n;
@ -1394,6 +1418,12 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--rate_limit=%lf%c", &d, &junk) == 1 &&
d > 1.0) {
FLAGS_rate_limit = d;
} else if (sscanf(argv[i], "--readonly=%d%c", &n, &junk) == 1 &&
(n == 0 || n ==1 )) {
FLAGS_read_only = n;
} else if (sscanf(argv[i], "--max_grandparent_overlap_factor=%d%c",
&n, &junk) == 1) {
FLAGS_max_grandparent_overlap_factor = n;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);

View File

@ -24,6 +24,7 @@
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/statistics.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
@ -169,13 +170,13 @@ Options SanitizeOptions(const std::string& dbname,
DBImpl::DBImpl(const Options& options, const std::string& dbname)
: env_(options.env),
dbname_(dbname),
internal_comparator_(options.comparator),
internal_filter_policy_(options.filter_policy),
options_(SanitizeOptions(
dbname, &internal_comparator_, &internal_filter_policy_, options)),
internal_filter_policy_(options.filter_policy),
owns_info_log_(options_.info_log != options.info_log),
owns_cache_(options_.block_cache != options.block_cache),
dbname_(dbname),
db_lock_(NULL),
shutting_down_(NULL),
bg_cv_(&mutex_),
@ -231,7 +232,7 @@ DBImpl::~DBImpl() {
if (flush_on_destroy_) {
FlushMemTable(FlushOptions());
}
mutex_.Lock();
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-NULL value is ok
while (bg_compaction_scheduled_ || bg_logstats_scheduled_) {
bg_cv_.Wait();
@ -430,7 +431,8 @@ void DBImpl::DeleteObsoleteFiles() {
EvictObsoleteFiles(deletion_state);
}
Status DBImpl::Recover(VersionEdit* edit) {
Status DBImpl::Recover(VersionEdit* edit, bool no_log_recory,
bool error_if_log_file_exist) {
mutex_.AssertHeld();
// Ignore error from CreateDir since the creation of the DB is
@ -489,6 +491,16 @@ Status DBImpl::Recover(VersionEdit* edit) {
}
}
if (logs.size() > 0 && error_if_log_file_exist) {
return Status::Corruption(""
"The db was opened in readonly mode with error_if_log_file_exist"
"flag but a log file already exists");
}
if (no_log_recory) {
return s;
}
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
@ -1310,6 +1322,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by a newer entry for the same user key
drop = true; // (A)
RecordTick(options_.statistics, COMPACTION_KEY_DROP_NEWER_ENTRY);
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
@ -1321,6 +1334,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
RecordTick(options_.statistics, COMPACTION_KEY_DROP_OBSOLETE);
} else if (options_.CompactionFilter != NULL &&
ikey.type != kTypeDeletion &&
ikey.sequence < compact->smallest_snapshot) {
@ -1328,9 +1342,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// it. If this key is not visible via any snapshot and the
// return value of the compaction filter is true, then
// drop this key from the output.
drop = options_.CompactionFilter(compact->compaction->level(),
drop = options_.CompactionFilter(options_.compaction_filter_args,
compact->compaction->level(),
ikey.user_key, value, &compaction_filter_value);
if (drop) {
RecordTick(options_.statistics, COMPACTION_KEY_DROP_USER);
}
// If the application wants to change the value, then do so here.
if (compaction_filter_value != NULL) {
value = *compaction_filter_value;

View File

@ -78,6 +78,18 @@ class DBImpl : public DB {
// file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes();
protected:
Env* const env_;
const std::string dbname_;
VersionSet* versions_;
const InternalKeyComparator internal_comparator_;
const Options options_; // options_.comparator == &internal_comparator_
const Comparator* user_comparator() const {
return internal_comparator_.user_comparator();
}
private:
friend class DB;
struct CompactionState;
@ -92,7 +104,9 @@ class DBImpl : public DB {
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit);
Status Recover(VersionEdit* edit,
bool no_log_recory = false,
bool error_if_log_file_exist = false);
void MaybeIgnoreError(Status* s) const;
@ -156,13 +170,9 @@ class DBImpl : public DB {
void EvictObsoleteFiles(DeletionState& deletion_state);
// Constant after construction
Env* const env_;
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
const Options options_; // options_.comparator == &internal_comparator_
bool owns_info_log_;
bool owns_cache_;
const std::string dbname_;
// table_cache_ provides its own synchronization
TableCache* table_cache_;
@ -209,8 +219,6 @@ class DBImpl : public DB {
};
ManualCompaction* manual_compaction_;
VersionSet* versions_;
// Have we encountered a background error in paranoid mode?
Status bg_error_;
@ -290,10 +298,6 @@ class DBImpl : public DB {
DBImpl(const DBImpl&);
void operator=(const DBImpl&);
const Comparator* user_comparator() const {
return internal_comparator_.user_comparator();
}
// dump the delayed_writes_ to the log file and reset counter.
void DelayLoggingAndReset();
};

db/db_impl_readonly.cc (new file, 92 lines)
View File

@ -0,0 +1,92 @@
// Copyright (c) 2012 Facebook. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "db/db_impl_readonly.h"
#include "db/db_impl.h"
#include <algorithm>
#include <set>
#include <string>
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include <algorithm>
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/build_version.h"
namespace leveldb {
DBImplReadOnly::DBImplReadOnly(const Options& options,
const std::string& dbname)
: DBImpl(options, dbname) {
Log(options_.info_log, "Opening the db in read only mode");
}
DBImplReadOnly::~DBImplReadOnly() {
}
// Implementations of the DB interface
Status DBImplReadOnly::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
Status s;
Version* current = versions_->current();
SequenceNumber snapshot = versions_->LastSequence();
LookupKey lkey(key, snapshot);
Version::GetStats stats;
s = current->Get(options, lkey, value, &stats);
return s;
}
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options) {
std::vector<Iterator*> list;
versions_->current()->AddIterators(options, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
return NewDBIterator(
&dbname_, env_, user_comparator(), internal_iter,
reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_);
}
Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DB** dbptr, bool no_log_recory, bool error_if_log_file_exist) {
*dbptr = NULL;
DBImplReadOnly* impl = new DBImplReadOnly(options, dbname);
impl->mutex_.Lock();
VersionEdit edit(impl->NumberLevels());
Status s = impl->Recover(&edit, no_log_recory, error_if_log_file_exist);
if (s.ok() && !no_log_recory) {
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
impl->mutex_.Unlock();
if (s.ok()) {
*dbptr = impl;
} else {
delete impl;
}
return s;
}
}

db/db_impl_readonly.h (new file, 72 lines)
View File

@ -0,0 +1,72 @@
// Copyright (c) 2012 Facebook. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_READONLY_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_READONLY_H_
#include "db/db_impl.h"
#include <deque>
#include <set>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "port/port.h"
#include "util/stats_logger.h"
#ifdef USE_SCRIBE
#include "scribe/scribe_logger.h"
#endif
namespace leveldb {
class DBImplReadOnly : public DBImpl {
public:
DBImplReadOnly(const Options& options, const std::string& dbname);
virtual ~DBImplReadOnly();
// Implementations of the DB interface
virtual Status Get(const ReadOptions& options,
const Slice& key,
std::string* value);
virtual Iterator* NewIterator(const ReadOptions&);
virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Delete(const WriteOptions&, const Slice& key) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Write(const WriteOptions& options, WriteBatch* updates) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual void CompactRange(const Slice* begin, const Slice* end) {
}
virtual Status DisableFileDeletions() {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status EnableFileDeletions() {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Flush(const FlushOptions& options) {
return Status::NotSupported("Not supported operation in read only mode.");
}
private:
friend class DB;
// No copying allowed
DBImplReadOnly(const DBImplReadOnly&);
void operator=(const DBImplReadOnly&);
};
}
#endif

View File

@ -2,16 +2,18 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cassert>
#include <stdlib.h>
#include <vector>
#include "leveldb/statistics.h"
#include "port/port.h"
#include "util/mutexlock.h"
namespace leveldb {
class DBStatistics: public Statistics {
public:
DBStatistics() { }
DBStatistics() : allTickers_(TICKER_ENUM_MAX) { }
void incNumFileOpens() {
MutexLock l(&mu_);
@ -28,8 +30,19 @@ class DBStatistics: public Statistics {
numFileErrors_++;
}
long getTickerCount(Tickers tickerType) {
assert(tickerType < TICKER_ENUM_MAX);
return allTickers_[tickerType].getCount();
}
void recordTick(Tickers tickerType) {
assert(tickerType < TICKER_ENUM_MAX);
allTickers_[tickerType].recordTick();
}
private:
port::Mutex mu_;
port::Mutex mu_;
std::vector<Ticker> allTickers_;
};
}

View File

@ -1198,18 +1198,21 @@ TEST(DBTest, RepeatedWritesToSameKey) {
// kvs during the compaction process.
static int cfilter_count;
static std::string NEW_VALUE = "NewValue";
static bool keep_filter(int level, const Slice& key,
static bool keep_filter(void* arg, int level, const Slice& key,
const Slice& value, Slice** new_value) {
assert(arg == NULL);
cfilter_count++;
return false;
}
static bool delete_filter(int level, const Slice& key,
static bool delete_filter(void*argv, int level, const Slice& key,
const Slice& value, Slice** new_value) {
assert(arg == NULL);
cfilter_count++;
return true;
}
static bool change_filter(int level, const Slice& key,
static bool change_filter(void*argv, int level, const Slice& key,
const Slice& value, Slice** new_value) {
assert(argv == (void*)100);
assert(new_value != NULL);
*new_value = new Slice(NEW_VALUE);
return false;
@ -1320,6 +1323,7 @@ TEST(DBTest, CompactionFilterWithValueChange) {
Options options = CurrentOptions();
options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.compaction_filter_args = (void *)100;
options.CompactionFilter = change_filter;
Reopen(&options);

View File

@ -22,6 +22,10 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
Version* current_version = current_;
int current_levels = NumberLevels();
if (current_levels <= new_levels) {
return Status::OK();
}
// Make sure there are files on only one level from
// (new_levels-1) to (current_levels-1)
int first_nonempty_level = -1;

View File

@ -33,7 +33,9 @@ THRIFT_INCLUDE+=" -I./thrift -I./thrift/gen-cpp -I./thrift/lib/cpp"
THRIFT_LIBS=" -L $TOOLCHAIN_LIB_BASE/boost/boost-1.48.0/bef9365/lib"
# use Intel SSE support for checksum calculations
export USE_SSE=" -msse -msse4.2 "
if test -z "$USE_SSE"; then
export USE_SSE=" -msse -msse4.2 "
fi
CC="$TOOLCHAIN_EXECUTABLES/gcc/gcc-4.7.1-glibc-2.14.1/bin/gcc"
CXX="$TOOLCHAIN_EXECUTABLES/gcc/gcc-4.7.1-glibc-2.14.1/bin/g++ $JINCLUDE $SNAPPY_INCLUDE $THRIFT_INCLUDE"

View File

@ -54,6 +54,14 @@ class DB {
const std::string& name,
DB** dbptr);
// Open the database in read-only mode. All DB interfaces
// that modify data, like Put/Delete, will return an error.
// If the db is opened in read-only mode, no compactions
// will happen.
static Status OpenForReadOnly(const Options& options,
const std::string& name, DB** dbptr,
bool no_log_recory = true, bool error_if_log_file_exist = false);
DB() { }
virtual ~DB();
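
As a usage sketch (not part of this commit): the snippet below opens a
database read-only and shows a write being rejected. With the default
no_log_recory = true, updates still sitting in the write-ahead log are not
recovered and therefore not visible. The path and keys are illustrative.

#include <cassert>
#include <string>
#include "leveldb/db.h"

int main() {
  leveldb::DB* db = NULL;
  leveldb::Options options;
  leveldb::Status s =
      leveldb::DB::OpenForReadOnly(options, "/tmp/testdb", &db);
  if (s.ok()) {
    std::string value;
    s = db->Get(leveldb::ReadOptions(), "some-key", &value);  // reads work
    s = db->Put(leveldb::WriteOptions(), "k", "v");  // returns NotSupported
    assert(!s.ok());
    delete db;
  }
  return 0;
}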

View File

@ -228,7 +228,7 @@ struct Options {
// by default 'max_bytes_for_level_base' is 10MB.
int max_bytes_for_level_base;
uint64_t max_bytes_for_level_base;
// by default 'max_bytes_for_level_multiplier' is 10.
int max_bytes_for_level_multiplier;
@ -324,7 +324,11 @@ struct Options {
// should allocate memory for the Slice object that is used to
// return the new value and the leveldb framework will
// free up that memory.
bool (*CompactionFilter)(int level, const Slice& key,
// The compaction_filter_args, if specified here, are passed
// back to the invocation of the CompactionFilter.
void* compaction_filter_args;
bool (*CompactionFilter)(void* compaction_filter_args,
int level, const Slice& key,
const Slice& existing_value, Slice** new_value);
};
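
A hedged sketch of the new filter signature (the filter and the prefix
argument are invented for illustration): compaction_filter_args is handed
back verbatim as the first parameter of every CompactionFilter call.

#include "leveldb/options.h"
#include "leveldb/slice.h"

// Illustrative filter: drop any entry whose value starts with a
// caller-supplied prefix passed through compaction_filter_args.
static bool DropByValuePrefix(void* args, int level,
                              const leveldb::Slice& key,
                              const leveldb::Slice& existing_value,
                              leveldb::Slice** new_value) {
  const leveldb::Slice prefix(static_cast<const char*>(args));
  return existing_value.starts_with(prefix);  // true == drop the key
}

void ConfigureOptions(leveldb::Options* options) {
  static char prefix[] = "tmp-";    // hypothetical filter argument
  options->compaction_filter_args = prefix;
  options->CompactionFilter = DropByValuePrefix;
  // max_bytes_for_level_base is now a uint64_t, so values beyond the
  // old int limit (~2GB) are legal.
  options->max_bytes_for_level_base = 10ULL * 1024 * 1024 * 1024;
}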

View File

@ -7,9 +7,51 @@
namespace leveldb {
/**
* Keep adding tickers here.
* Any ticker should have a value less than TICKER_ENUM_MAX.
* Add a new ticker by assigning it the current value of TICKER_ENUM_MAX
* and incrementing TICKER_ENUM_MAX.
*/
enum Tickers {
BLOCK_CACHE_MISS = 0,
BLOCK_CACHE_HIT = 1,
BLOOM_FILTER_USEFUL = 2, // no. of times bloom filter has avoided file reads.
/**
* COMPACTION_KEY_DROP_* count the reasons for key drop during compaction
* There are 3 reasons currently.
*/
COMPACTION_KEY_DROP_NEWER_ENTRY = 3, // key was written with a newer value.
COMPACTION_KEY_DROP_OBSOLETE = 4, // The key is obsolete.
COMPACTION_KEY_DROP_USER = 5, // user compaction function has dropped the key.
TICKER_ENUM_MAX = 6,
};
/**
* A dumb ticker which keeps incrementing throughout its lifetime.
* Not thread safe; locking is currently managed by the external leveldb lock.
*/
class Ticker {
public:
Ticker() : count_(0) { }
inline void recordTick() {
count_++;
}
inline uint64_t getCount() {
return count_;
}
private:
uint64_t count_;
};
// Analyze the performance of a db
class Statistics {
public:
public:
// Create a Statistics object with default values for all fields.
Statistics() : numFileOpens_(0), numFileCloses_(0),
numFileErrors_(0) {}
@ -23,12 +65,21 @@ class Statistics {
virtual long getNumFileErrors() { return numFileErrors_;}
virtual ~Statistics() {}
virtual long getTickerCount(Tickers tickerType) = 0;
virtual void recordTick(Tickers tickerType) = 0;
protected:
long numFileOpens_;
long numFileCloses_;
long numFileErrors_;
};
// Ease of Use functions
inline void RecordTick(Statistics* const statistics, Tickers ticker) {
if (statistics != NULL) {
statistics->recordTick(ticker);
}
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_STATISTICS_H_
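
A small sketch of how the pieces compose (both functions are hypothetical):
RecordTick() is null-safe, so instrumentation sites never need to check
whether the user supplied a statistics object; consumers read counters back
through getTickerCount().

#include "leveldb/statistics.h"

// Hypothetical instrumentation site.
void OnBlockLookup(leveldb::Statistics* stats, bool cache_hit) {
  // Safe even when stats is NULL; RecordTick checks for that.
  leveldb::RecordTick(stats, cache_hit ? leveldb::BLOCK_CACHE_HIT
                                       : leveldb::BLOCK_CACHE_MISS);
}

// Hypothetical reporting site; stats must be non-NULL here.
long CacheHitCount(leveldb::Statistics* stats) {
  return stats->getTickerCount(leveldb::BLOCK_CACHE_HIT);
}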

View File

@ -9,6 +9,7 @@
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/options.h"
#include "leveldb/statistics.h"
#include "table/block.h"
#include "table/filter_block.h"
#include "table/format.h"
@ -157,6 +158,7 @@ Iterator* Table::BlockReader(void* arg,
bool* didIO) {
Table* table = reinterpret_cast<Table*>(arg);
Cache* block_cache = table->rep_->options.block_cache;
Statistics* const statistics = table->rep_->options.statistics;
Block* block = NULL;
Cache::Handle* cache_handle = NULL;
@ -176,6 +178,8 @@ Iterator* Table::BlockReader(void* arg,
cache_handle = block_cache->Lookup(key);
if (cache_handle != NULL) {
block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
RecordTick(statistics, BLOCK_CACHE_HIT);
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
@ -188,6 +192,8 @@ Iterator* Table::BlockReader(void* arg,
if (didIO != NULL) {
*didIO = true; // we did some io from storage
}
RecordTick(statistics, BLOCK_CACHE_MISS);
}
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
@ -237,6 +243,7 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
RecordTick(rep_->options.statistics, BLOOM_FILTER_USEFUL);
} else {
bool didIO = false;
Iterator* block_iter = BlockReader(this, options, iiter->value(),

View File

@ -62,6 +62,9 @@ static long FLAGS_cache_size = 2 * KB * KB * KB;
// Number of bytes in a block.
static int FLAGS_block_size = 4 * KB;
// Number of times database reopens
static int FLAGS_reopen = 10;
// Maximum number of files to keep open at the same time (use default if == 0)
static int FLAGS_open_files = 0;
@ -97,7 +100,7 @@ static int FLAGS_target_file_size_base = 64 * KB;
static int FLAGS_target_file_size_multiplier = 1;
// Max bytes for level-0
static int FLAGS_max_bytes_for_level_base = 256 * KB;
static uint64_t FLAGS_max_bytes_for_level_base = 256 * KB;
// A multiplier to compute max bytes for level-N
static int FLAGS_max_bytes_for_level_multiplier = 2;
@ -108,8 +111,11 @@ static int FLAGS_level0_stop_writes_trigger = 12;
// Number of files in level-0 that will slow down writes.
static int FLAGS_level0_slowdown_writes_trigger = 8;
// Ratio of reads to writes (expressed as a percentage)
static unsigned int FLAGS_readwritepercent = 10;
// Ratio of reads to total workload (expressed as a percentage)
static unsigned int FLAGS_readpercent = 10;
// Ratio of deletes to total workload (expressed as a percentage)
static unsigned int FLAGS_delpercent = 30;
// Option to disable compaction triggered by reads.
static int FLAGS_disable_seek_compaction = false;
@ -148,6 +154,7 @@ class Stats {
double seconds_;
long done_;
long writes_;
long deletes_;
int next_report_;
size_t bytes_;
double last_op_finish_;
@ -161,6 +168,7 @@ class Stats {
hist_.Clear();
done_ = 0;
writes_ = 0;
deletes_ = 0;
bytes_ = 0;
seconds_ = 0;
start_ = FLAGS_env->NowMicros();
@ -172,6 +180,7 @@ class Stats {
hist_.Merge(other.hist_);
done_ += other.done_;
writes_ += other.writes_;
deletes_ += other.deletes_;
bytes_ += other.bytes_;
seconds_ += other.seconds_;
if (other.start_ < start_) start_ = other.start_;
@ -214,6 +223,10 @@ class Stats {
bytes_ += n;
}
void AddOneDelete() {
deletes_ ++;
}
void Report(const char* name) {
std::string extra;
if (bytes_ < 1 || done_ < 1) {
@ -231,6 +244,7 @@ class Stats {
seconds_ * 1e6 / done_, (long)throughput);
fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n",
"", bytes_mb, rate, (100*writes_)/done_, done_);
fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_);
if (FLAGS_histogram) {
fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
@ -252,6 +266,7 @@ class SharedState {
num_threads_(FLAGS_threads),
num_initialized_(0),
num_populated_(0),
vote_reopen_(0),
num_done_(0),
start_(false),
start_verify_(false),
@ -297,7 +312,7 @@ class SharedState {
num_initialized_++;
}
void IncPopulated() {
void IncOperated() {
num_populated_++;
}
@ -305,11 +320,15 @@ class SharedState {
num_done_++;
}
void IncVotedReopen() {
vote_reopen_ = (vote_reopen_ + 1) % num_threads_;
}
bool AllInitialized() const {
return num_initialized_ >= num_threads_;
}
bool AllPopulated() const {
bool AllOperated() const {
return num_populated_ >= num_threads_;
}
@ -317,6 +336,10 @@ class SharedState {
return num_done_ >= num_threads_;
}
bool AllVotedReopen() {
return (vote_reopen_ == 0);
}
void SetStart() {
start_ = true;
}
@ -345,6 +368,10 @@ class SharedState {
return values_[key];
}
void Delete(long key) const {
values_[key] = SENTINEL;
}
uint32_t GetSeed() const {
return seed_;
}
@ -358,6 +385,7 @@ class SharedState {
const int num_threads_;
long num_initialized_;
long num_populated_;
long vote_reopen_;
long num_done_;
bool start_;
bool start_verify_;
@ -420,8 +448,8 @@ class StressTest {
FLAGS_env->StartThread(ThreadBody, threads[i]);
}
// Each thread goes through the following states:
// initializing -> wait for others to init -> populate
// wait for others to populate -> verify -> done
// initializing -> wait for others to init -> read/populate/depopulate
// wait for others to operate -> verify -> done
{
MutexLock l(shared.GetMutex());
@ -429,10 +457,11 @@ class StressTest {
shared.GetCondVar()->Wait();
}
fprintf(stdout, "Starting to populate db\n");
fprintf(stdout, "Starting database operations\n");
shared.SetStart();
shared.GetCondVar()->SignalAll();
while (!shared.AllPopulated()) {
while (!shared.AllOperated()) {
shared.GetCondVar()->Wait();
}
@ -453,7 +482,7 @@ class StressTest {
delete threads[i];
threads[i] = NULL;
}
fprintf(stdout, "Verification successfull\n");
fprintf(stdout, "Verification successful\n");
PrintStatistics();
}
@ -473,13 +502,12 @@ class StressTest {
shared->GetCondVar()->Wait();
}
}
thread->shared->GetStressTest()->PopulateDb(thread);
thread->shared->GetStressTest()->OperateDb(thread);
{
MutexLock l(shared->GetMutex());
shared->IncPopulated();
if (shared->AllPopulated()) {
shared->IncOperated();
if (shared->AllOperated()) {
shared->GetCondVar()->SignalAll();
}
while (!shared->VerifyStarted()) {
@ -499,10 +527,10 @@ class StressTest {
}
void PopulateDb(ThreadState* thread) {
void OperateDb(ThreadState* thread) {
ReadOptions read_opts(FLAGS_verify_checksum, true);
WriteOptions write_opts;
char value[100], prev_value[100];
char value[100];
long max_key = thread->shared->GetMaxKey();
std::string from_db;
if (FLAGS_sync) {
@ -511,21 +539,44 @@ class StressTest {
write_opts.disableWAL = FLAGS_disable_wal;
thread->stats.Start();
for (long i=0; i < FLAGS_ops_per_thread; i++) {
for (long i = 0; i < FLAGS_ops_per_thread; i++) {
if(i != 0 && (i % (FLAGS_ops_per_thread / (FLAGS_reopen + 1))) == 0) {
{
MutexLock l(thread->shared->GetMutex());
thread->shared->IncVotedReopen();
if (thread->shared->AllVotedReopen()) {
thread->shared->GetStressTest()->Reopen();
thread->shared->GetCondVar()->SignalAll();
}
else {
thread->shared->GetCondVar()->Wait();
}
}
}
long rand_key = thread->rand.Next() % max_key;
Slice key((char*)&rand_key, sizeof(rand_key));
if (FLAGS_readwritepercent > thread->rand.Uniform(100)) {
// introduce some read load.
//Read:10%;Delete:30%;Write:60%
unsigned int probability_operation = thread->rand.Uniform(100);
if (probability_operation < FLAGS_readpercent) {
// read load
db_->Get(read_opts, key, &from_db);
} else if (probability_operation < FLAGS_delpercent + FLAGS_readpercent) {
//introduce delete load
{
MutexLock l(thread->shared->GetMutexForKey(rand_key));
thread->shared->Delete(rand_key);
db_->Delete(write_opts, key);
}
thread->stats.AddOneDelete();
} else {
// write load
uint32_t value_base = thread->rand.Next();
size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz);
{
MutexLock l(thread->shared->GetMutexForKey(rand_key));
if (FLAGS_verify_before_write) {
VerifyValue(rand_key, read_opts, *(thread->shared), prev_value,
sizeof(prev_value), &from_db, true);
VerifyValue(rand_key, read_opts, *(thread->shared), &from_db, true);
}
thread->shared->Put(rand_key, value_base);
db_->Put(write_opts, key, v);
@ -540,12 +591,11 @@ class StressTest {
void VerifyDb(const SharedState &shared, long start) const {
ReadOptions options(FLAGS_verify_checksum, true);
char value[100];
long max_key = shared.GetMaxKey();
long step = shared.GetNumThreads();
for (long i = start; i < max_key; i+= step) {
std::string from_db;
VerifyValue(i, options, shared, value, sizeof(value), &from_db);
VerifyValue(i, options, shared, &from_db, true);
if (from_db.length()) {
PrintKeyValue(i, from_db.data(), from_db.length());
}
@ -553,15 +603,16 @@ class StressTest {
}
void VerificationAbort(std::string msg, long key) const {
fprintf(stderr, "Verification failed for key %ld: %s\n",
fprintf(stderr, "Verification failed for key %ld: %s\n",
key, msg.c_str());
exit(1);
}
void VerifyValue(long key, const ReadOptions &opts, const SharedState &shared,
char *value, size_t value_sz,
std::string *value_from_db, bool strict=false) const {
Slice k((char*)&key, sizeof(key));
char value[100];
size_t value_sz = 0;
uint32_t value_base = shared.Get(key);
if (value_base == SharedState::SENTINEL && !strict) {
return;
@ -609,8 +660,10 @@ class StressTest {
kMajorVersion, kMinorVersion);
fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
fprintf(stdout, "Ops per thread : %d\n", FLAGS_ops_per_thread);
fprintf(stdout, "Read percentage : %d\n", FLAGS_readwritepercent);
fprintf(stdout, "Read percentage : %d\n", FLAGS_readpercent);
fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent);
fprintf(stdout, "Max key : %ld\n", FLAGS_max_key);
fprintf(stdout, "Num times DB reopens: %d\n", FLAGS_reopen);
fprintf(stdout, "Num keys per lock : %d\n",
1 << FLAGS_log2_keys_per_lock);
@ -668,6 +721,11 @@ class StressTest {
}
}
void Reopen() {
delete db_;
Open();
}
void PrintStatistics() {
if (dbstats) {
fprintf(stdout, "File opened:%ld closed:%ld errors:%ld\n",
@ -733,6 +791,8 @@ int main(int argc, char** argv) {
FLAGS_cache_size = l;
} else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
FLAGS_block_size = n;
} else if (sscanf(argv[i], "--reopen=%d%c", &n, &junk) == 1 && n >= 0) {
FLAGS_reopen = n;
} else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
FLAGS_bloom_bits = n;
} else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
@ -759,9 +819,12 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--sync=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_sync = n;
} else if (sscanf(argv[i], "--readwritepercent=%d%c", &n, &junk) == 1 &&
(n > 0 || n < 100)) {
FLAGS_readwritepercent = n;
} else if (sscanf(argv[i], "--readpercent=%d%c", &n, &junk) == 1 &&
(n >= 0 && n <= 100)) {
FLAGS_readpercent = n;
} else if (sscanf(argv[i], "--delpercent=%d%c", &n, &junk) == 1 &&
(n >= 0 && n <= 100)) {
FLAGS_delpercent = n;
} else if (sscanf(argv[i], "--disable_data_sync=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_disable_data_sync = n;
@ -780,8 +843,8 @@ int main(int argc, char** argv) {
&n, &junk) == 1) {
FLAGS_target_file_size_multiplier = n;
} else if (
sscanf(argv[i], "--max_bytes_for_level_base=%d%c", &n, &junk) == 1) {
FLAGS_max_bytes_for_level_base = n;
sscanf(argv[i], "--max_bytes_for_level_base=%ld%c", &l, &junk) == 1) {
FLAGS_max_bytes_for_level_base = l;
} else if (sscanf(argv[i], "--max_bytes_for_level_multiplier=%d%c",
&n, &junk) == 1) {
FLAGS_max_bytes_for_level_multiplier = n;
@ -816,6 +879,11 @@ int main(int argc, char** argv) {
}
}
if ((FLAGS_readpercent + FLAGS_delpercent) > 100) {
fprintf(stderr, "Error: Read + Delete percents > 100!\n");
exit(1);
}
// Choose a location for the test database if none given with --db=<path>
if (FLAGS_db == NULL) {
leveldb::Env::Default()->GetTestDirectory(&default_db_path);
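
The operation mix introduced above in OperateDb() can be summarized with a
small helper (sketch only; the enum and function are invented): a single
Uniform(100) draw is bucketed into read, delete, and write ranges, which is
why main() now rejects FLAGS_readpercent + FLAGS_delpercent > 100.

enum OperationKind { kRead, kDelete, kWrite };

// draw is uniform in [0, 100); read_pct + del_pct must be <= 100.
static OperationKind PickOperation(unsigned int draw,
                                   unsigned int read_pct,
                                   unsigned int del_pct) {
  if (draw < read_pct) return kRead;              // e.g. [0, 10)
  if (draw < read_pct + del_pct) return kDelete;  // e.g. [10, 40)
  return kWrite;                                  // the remaining mass
}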

View File

@ -118,21 +118,16 @@ TEST(ReduceLevelTest, Top_Level) {
ASSERT_EQ(FilesOnLevel(0), 1);
CloseDB();
// The CompactRange(NULL, NULL) call in ReduceLevels
// will push this file to level-1
ASSERT_TRUE(ReduceLevels(4));
ASSERT_OK(OpenDB(true, 4, 0));
ASSERT_EQ(FilesOnLevel(1), 1);
CloseDB();
ASSERT_TRUE(ReduceLevels(3));
ASSERT_OK(OpenDB(true, 3, 0));
ASSERT_EQ(FilesOnLevel(1), 1);
CloseDB();
ASSERT_TRUE(ReduceLevels(2));
ASSERT_OK(OpenDB(true, 2, 0));
ASSERT_EQ(FilesOnLevel(1), 1);
CloseDB();
}

View File

@ -182,18 +182,39 @@ void DBDumper::DoCommand() {
const char* ReduceDBLevels::NEW_LEVLES_ARG = "--new_levels=";
const char* ReduceDBLevels::PRINT_OLD_LEVELS_ARG = "--print_old_levels";
const char* ReduceDBLevels::COMPRESSION_TYPE_ARG = "--compression=";
const char* ReduceDBLevels::FILE_SIZE_ARG = "--file_size=";
ReduceDBLevels::ReduceDBLevels(std::string& db_name,
std::vector<std::string>& args)
: LDBCommand(db_name, args),
old_levels_(1 << 16),
new_levels_(-1),
print_old_levels_(false) {
file_size_ = leveldb::Options().target_file_size_base;
compression_ = leveldb::Options().compression;
for (unsigned int i = 0; i < args.size(); i++) {
std::string& arg = args.at(i);
if (arg.find(NEW_LEVLES_ARG) == 0) {
new_levels_ = atoi(arg.substr(strlen(NEW_LEVLES_ARG)).c_str());
} else if (arg.find(PRINT_OLD_LEVELS_ARG) == 0) {
print_old_levels_ = true;
} else if (arg.find(COMPRESSION_TYPE_ARG) == 0) {
const char* type = arg.substr(strlen(COMPRESSION_TYPE_ARG)).c_str();
if (!strcasecmp(type, "none"))
compression_ = leveldb::kNoCompression;
else if (!strcasecmp(type, "snappy"))
compression_ = leveldb::kSnappyCompression;
else if (!strcasecmp(type, "zlib"))
compression_ = leveldb::kZlibCompression;
else if (!strcasecmp(type, "bzip2"))
compression_ = leveldb::kBZip2Compression;
else
exec_state_ = LDBCommandExecuteResult::FAILED(
"Invalid compression arg : " + arg);
} else if (arg.find(FILE_SIZE_ARG) == 0) {
file_size_ = atoi(arg.substr(strlen(FILE_SIZE_ARG)).c_str());
} else {
exec_state_ = LDBCommandExecuteResult::FAILED(
"Unknown argument." + arg);
@ -223,15 +244,45 @@ void ReduceDBLevels::Help(std::string& msg) {
LDBCommand::Help(msg);
msg.append("[--new_levels=New number of levels] ");
msg.append("[--print_old_levels] ");
msg.append("[--compression=none|snappy|zlib|bzip2] ");
msg.append("[--file_size= per-file size] ");
}
leveldb::Options ReduceDBLevels::PrepareOptionsForOpenDB() {
leveldb::Options opt = LDBCommand::PrepareOptionsForOpenDB();
// Set to a big value to make sure we can open the db
opt.num_levels = 1 << 16;
opt.num_levels = old_levels_;
// Disable size compaction
opt.max_bytes_for_level_base = 1UL << 50;
opt.max_bytes_for_level_multiplier = 1;
opt.max_mem_compaction_level = 0;
return opt;
}
Status ReduceDBLevels::GetOldNumOfLevels(leveldb::Options& opt, int* levels) {
TableCache* tc = new TableCache(db_path_, &opt, 10);
const InternalKeyComparator* cmp = new InternalKeyComparator(
opt.comparator);
VersionSet* versions = new VersionSet(db_path_, &opt,
tc, cmp);
// We rely on VersionSet::Recover to tell us the internal data structures
// in the db, and Recover() should never make any change
// (like LogAndApply) to the manifest file.
Status st = versions->Recover();
if (!st.ok()) {
return st;
}
int max = -1;
for (int i = 0; i < versions->NumberLevels(); i++) {
if (versions->NumLevelFiles(i)) {
max = i;
}
}
*levels = max + 1;
delete versions;
return st;
}
void ReduceDBLevels::DoCommand() {
if (new_levels_ <= 1) {
exec_state_ = LDBCommandExecuteResult::FAILED(
@ -240,34 +291,27 @@ void ReduceDBLevels::DoCommand() {
}
leveldb::Status st;
leveldb::Options opt = PrepareOptionsForOpenDB();
if (print_old_levels_) {
TableCache* tc = new TableCache(db_path_, &opt, 10);
const InternalKeyComparator* cmp = new InternalKeyComparator(
opt.comparator);
VersionSet* versions = new VersionSet(db_path_, &opt,
tc, cmp);
// We rely the VersionSet::Recover to tell us the internal data structures
// in the db. And the Recover() should never do any change
// (like LogAndApply) to the manifest file.
st = versions->Recover();
int max = -1;
for(int i = 0; i<versions->NumberLevels(); i++) {
if (versions->NumLevelFiles(i)) {
max = i;
}
}
fprintf(stdout, "The old number of levels in use is %d\n", max + 1);
delete versions;
if (!st.ok()) {
exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
return;
}
int old_level_num = -1;
st = GetOldNumOfLevels(opt, &old_level_num);
if (!st.ok()) {
exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
return;
}
if (print_old_levels_) {
fprintf(stdout, "The old number of levels in use is %d\n", old_level_num);
}
if (old_level_num <= new_levels_) {
return;
}
old_levels_ = old_level_num;
OpenDB();
// Compact the whole DB to put all files to the highest level.
fprintf(stdout, "Compacting the db...\n");
db_->CompactRange(NULL, NULL);
CloseDB();

View File

@ -103,6 +103,10 @@ public:
return opt;
}
virtual bool NoDBOpen() {
return false;
}
virtual ~LDBCommand() {
if (db_ != NULL) {
delete db_;
@ -121,14 +125,18 @@ public:
return;
}
if (db_ == NULL) {
if (db_ == NULL && !NoDBOpen()) {
OpenDB();
}
DoCommand();
if (exec_state_.IsNotStarted()) {
exec_state_ = LDBCommandExecuteResult::SUCCEED("");
}
CloseDB ();
if (db_ != NULL) {
CloseDB ();
}
}
virtual void DoCommand() = 0;
@ -230,17 +238,28 @@ public:
virtual leveldb::Options PrepareOptionsForOpenDB();
virtual void DoCommand();
static void Help(std::string& msg);
virtual bool NoDBOpen() {
return true;
}
static void Help(std::string& msg);
static std::vector<std::string> PrepareArgs(int new_levels,
bool print_old_level = false);
private:
int old_levels_;
int new_levels_;
bool print_old_levels_;
int file_size_;
enum leveldb::CompressionType compression_;
static const char* NEW_LEVLES_ARG;
static const char* PRINT_OLD_LEVELS_ARG;
static const char* COMPRESSION_TYPE_ARG;
static const char* FILE_SIZE_ARG;
Status GetOldNumOfLevels(leveldb::Options& opt, int* levels);
};
}
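
For illustration, a hypothetical command built on the new NoDBOpen() hook
(assuming the declarations in this header): returning true tells Run() not
to open the database up front, leaving the command free to open it itself,
as ReduceDBLevels does after probing the old level count.

#include <string>
#include <vector>

// Hypothetical subclass; not part of this change.
class ManifestProbeCommand : public LDBCommand {
public:
  ManifestProbeCommand(std::string& db_name, std::vector<std::string>& args)
      : LDBCommand(db_name, args) { }

  // Opt out of the automatic OpenDB() in Run().
  virtual bool NoDBOpen() {
    return true;
  }

  virtual void DoCommand() {
    // Inspect on-disk state first, then open only if needed:
    // OpenDB();
  }
};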

View File

@ -47,9 +47,10 @@ Options::Options()
delete_obsolete_files_period_micros(0),
max_background_compactions(1),
max_log_file_size(0),
rate_limit(0.0),
rate_limit(0.0),
no_block_cache(false),
table_cache_numshardbits(4),
compaction_filter_args(NULL),
CompactionFilter(NULL) {
}
@ -107,7 +108,7 @@ Options::Dump(
target_file_size_base);
Log(log," Options.target_file_size_multiplier: %d",
target_file_size_multiplier);
Log(log," Options.max_bytes_for_level_base: %d",
Log(log," Options.max_bytes_for_level_base: %ld",
max_bytes_for_level_base);
Log(log," Options.max_bytes_for_level_multiplier: %d",
max_bytes_for_level_multiplier);
@ -129,6 +130,8 @@ Options::Dump(
max_background_compactions);
Log(log," Options.rate_limit: %.2f",
rate_limit);
Log(log," Options.compaction_filter_args: %p",
compaction_filter_args);
Log(log," Options.CompactionFilter: %p",
CompactionFilter);
} // Options::Dump