Merge branch 'master' into performance

Conflicts:
	db/db_bench.cc
	util/options.cc
This commit is contained in:
Dhruba Borthakur 2012-10-29 14:18:00 -07:00
commit 53e04311b1
15 changed files with 396 additions and 44 deletions

View file

@ -120,12 +120,12 @@ esac
# prune take effect. # prune take effect.
DIRS="util db table" DIRS="util db table"
if test "$USE_THRIFT"; then if test "$USE_THRIFT"; then
DIRS+=" thrift/server_utils.cpp thrift/gen-cpp " DIRS="$DIRS thrift/server_utils.cpp thrift/gen-cpp "
THRIFTSERVER=leveldb_server THRIFTSERVER=leveldb_server
fi fi
if test "$USE_SCRIBE"; then if test "$USE_SCRIBE"; then
DIRS+=" scribe " DIRS="$DIRS scribe "
fi fi
set -f # temporarily disable globbing so that our patterns aren't expanded set -f # temporarily disable globbing so that our patterns aren't expanded
@ -204,24 +204,24 @@ if test "$USE_HDFS"; then
echo "JAVA_HOME has to be set for HDFS usage." echo "JAVA_HOME has to be set for HDFS usage."
exit 1 exit 1
fi fi
HDFS_CCFLAGS+=" -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS" HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS"
HDFS_LDFLAGS+=" -Wl,--no-whole-archive hdfs/libhdfs.a -L$JAVA_HOME/jre/lib/amd64" HDFS_LDFLAGS="$HDFS_LDFLAGS -Wl,--no-whole-archive hdfs/libhdfs.a -L$JAVA_HOME/jre/lib/amd64"
HDFS_LDFLAGS+=" -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib" HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib"
HDFS_LDFLAGS+=" -ldl -lverify -ljava -ljvm" HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm"
COMMON_FLAGS+=$HDFS_CCFLAGS COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS"
PLATFORM_LDFLAGS+=$HDFS_LDFLAGS PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS $HDFS_LDFLAGS"
fi fi
# shall we build thrift server or scribe logger # shall we build thrift server or scribe logger
if test "$USE_THRIFT" || test "$USE_SCRIBE" ; then if test "$USE_THRIFT" || test "$USE_SCRIBE" ; then
THRIFT_CCFLAGS=" -I./thrift -I./thrift/gen-cpp -I./thrift/lib/cpp -I/usr/include -std=gnu++0x" THRIFT_CCFLAGS=" -I./thrift -I./thrift/gen-cpp -I./thrift/lib/cpp -I/usr/include -std=gnu++0x"
THRIFT_LDFLAGS=" -lexample -lserver -lthrift_base -ltransport -lthrift_exception -lutil -L./thrift/libs " THRIFT_LDFLAGS=" -lexample -lserver -lthrift_base -ltransport -lthrift_exception -lutil -L./thrift/libs "
COMMON_FLAGS+=$THRIFT_CCFLAGS COMMON_FLAGS="$COMMON_FLAGS $THRIFT_CCFLAGS"
PLATFORM_LDFLAGS+=$THRIFT_LDFLAGS PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS $THRIFT_LDFLAGS"
fi fi
# if Intel SSE instruction set is supported, set USE_SSE=" -msse -msse4.2 " # if Intel SSE instruction set is supported, set USE_SSE=" -msse -msse4.2 "
COMMON_FLAGS+=$USE_SSE COMMON_FLAGS="$COMMON_FLAGS $USE_SSE"
PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS" PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS"
PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS" PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS"

View file

@ -32,7 +32,7 @@ Status BuildTable(const std::string& dbname,
return s; return s;
} }
TableBuilder* builder = new TableBuilder(options, file); TableBuilder* builder = new TableBuilder(options, file, 0);
meta->smallest.DecodeFrom(iter->key()); meta->smallest.DecodeFrom(iter->key());
for (; iter->Valid(); iter->Next()) { for (; iter->Valid(); iter->Next()) {
Slice key = iter->key(); Slice key = iter->key();

View file

@ -189,6 +189,10 @@ static uint64_t FLAGS_delete_obsolete_files_period_micros = 0;
static enum leveldb::CompressionType FLAGS_compression_type = static enum leveldb::CompressionType FLAGS_compression_type =
leveldb::kSnappyCompression; leveldb::kSnappyCompression;
// Allows compression for levels 0 and 1 to be disabled when
// other levels are compressed
static int FLAGS_min_level_to_compress = -1;
// posix or hdfs environment // posix or hdfs environment
static leveldb::Env* FLAGS_env = leveldb::Env::Default(); static leveldb::Env* FLAGS_env = leveldb::Env::Default();
@ -196,6 +200,15 @@ static leveldb::Env* FLAGS_env = leveldb::Env::Default();
// than zero. When 0 the interval grows over time. // than zero. When 0 the interval grows over time.
static int FLAGS_stats_interval = 0; static int FLAGS_stats_interval = 0;
// Reports additional stats per interval when this is greater
// than 0.
static int FLAGS_stats_per_interval = 0;
// When not equal to 0 this makes threads sleep at each stats
// reporting interval until the compaction score for all levels is
// less than or equal to this value.
static double FLAGS_rate_limit = 0;
extern bool useOsBuffer; extern bool useOsBuffer;
extern bool useFsReadAhead; extern bool useFsReadAhead;
extern bool useMmapRead; extern bool useMmapRead;
@ -339,17 +352,21 @@ class Stats {
} else { } else {
double now = FLAGS_env->NowMicros(); double now = FLAGS_env->NowMicros();
fprintf(stderr, fprintf(stderr,
"%s thread %d: (%ld,%ld) ops (interval,total) in %.6f seconds and %.2f ops/sec\n", "%s ... thread %d: (%ld,%ld) ops and (%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(), FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(),
id_, id_,
done_ - last_report_done_, done_, done_ - last_report_done_, done_,
(now - last_report_finish_) / 1000000.0,
(done_ - last_report_done_) / (done_ - last_report_done_) /
((now - last_report_finish_) / 1000000.0)); ((now - last_report_finish_) / 1000000.0),
done_ / ((now - start_) / 1000000.0),
(now - last_report_finish_) / 1000000.0,
(now - start_) / 1000000.0);
std::string stats; if (FLAGS_stats_per_interval) {
if (db && db->GetProperty("leveldb.stats", &stats)) std::string stats;
fprintf(stderr, stats.c_str()); if (db && db->GetProperty("leveldb.stats", &stats))
fprintf(stderr, stats.c_str());
}
fflush(stderr); fflush(stderr);
next_report_ += FLAGS_stats_interval; next_report_ += FLAGS_stats_interval;
@ -913,14 +930,29 @@ class Benchmark {
options.level0_slowdown_writes_trigger = options.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger; FLAGS_level0_slowdown_writes_trigger;
options.compression = FLAGS_compression_type; options.compression = FLAGS_compression_type;
if (FLAGS_min_level_to_compress >= 0) {
assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
options.compression_per_level = new CompressionType[FLAGS_num_levels];
for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) {
options.compression_per_level[i] = kNoCompression;
}
for (unsigned int i = FLAGS_min_level_to_compress;
i < FLAGS_num_levels; i++) {
options.compression_per_level[i] = FLAGS_compression_type;
}
}
options.disable_seek_compaction = FLAGS_disable_seek_compaction; options.disable_seek_compaction = FLAGS_disable_seek_compaction;
options.delete_obsolete_files_period_micros = options.delete_obsolete_files_period_micros =
FLAGS_delete_obsolete_files_period_micros; FLAGS_delete_obsolete_files_period_micros;
options.rate_limit = FLAGS_rate_limit;
Status s = DB::Open(options, FLAGS_db, &db_); Status s = DB::Open(options, FLAGS_db, &db_);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str()); fprintf(stderr, "open error: %s\n", s.ToString().c_str());
exit(1); exit(1);
} }
if (FLAGS_min_level_to_compress >= 0) {
delete options.compression_per_level;
}
} }
void WriteSeq(ThreadState* thread) { void WriteSeq(ThreadState* thread) {
@ -1327,6 +1359,9 @@ int main(int argc, char** argv) {
else { else {
fprintf(stdout, "Cannot parse %s\n", argv[i]); fprintf(stdout, "Cannot parse %s\n", argv[i]);
} }
} else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1
&& n >= 0) {
FLAGS_min_level_to_compress = n;
} else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1 } else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1
&& (n == 0 || n == 1)) { && (n == 0 || n == 1)) {
FLAGS_disable_seek_compaction = n; FLAGS_disable_seek_compaction = n;
@ -1336,6 +1371,12 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--stats_interval=%d%c", &n, &junk) == 1 && } else if (sscanf(argv[i], "--stats_interval=%d%c", &n, &junk) == 1 &&
n >= 0 && n < 2000000000) { n >= 0 && n < 2000000000) {
FLAGS_stats_interval = n; FLAGS_stats_interval = n;
} else if (sscanf(argv[i], "--stats_per_interval=%d%c", &n, &junk) == 1
&& (n == 0 || n == 1)) {
FLAGS_stats_per_interval = n;
} else if (sscanf(argv[i], "--rate_limit=%lf%c", &d, &junk) == 1 &&
d > 1.0) {
FLAGS_rate_limit = d;
} else { } else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]); fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1); exit(1);

View file

@ -35,11 +35,40 @@
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/build_version.h" #include "util/build_version.h"
#include "util/auto_split_logger.h"
namespace leveldb { namespace leveldb {
void dumpLeveldbBuildVersion(Logger * log); void dumpLeveldbBuildVersion(Logger * log);
// Creates the info logger for the database `dbname` and stores it in *logger.
//
// When max_log_file_size > 0 an AutoSplitLogger<Logger> is created with that
// size limit; otherwise a plain logger is opened through env->NewLogger()
// after renaming any file already at the info-log path to the name produced
// by OldInfoLogFileName().
//
// Returns the status of the logger creation. In the auto-split branch
// *logger is left untouched on failure; in the plain branch it is whatever
// env->NewLogger() left there.
static Status NewLogger(const std::string& dbname,
                        const std::string& db_log_dir,
                        Env* env,
                        size_t max_log_file_size,
                        Logger** logger) {
  // Create the db directory up front for BOTH kinds of logger. Previously
  // only the plain branch did this, so the auto-split logger could not open
  // its file when the directory did not exist yet. The result is ignored on
  // purpose: the directory usually exists already.
  env->CreateDir(dbname);

  if (max_log_file_size > 0) {  // need to auto split the log file?
    AutoSplitLogger<Logger>* auto_split_logger =
        new AutoSplitLogger<Logger>(env, dbname, db_log_dir,
                                    max_log_file_size);
    Status s = auto_split_logger->GetStatus();
    if (!s.ok()) {
      // Construction could not open a usable log file; do not hand out a
      // broken logger.
      delete auto_split_logger;
    } else {
      *logger = auto_split_logger;
    }
    return s;
  }

  // Single log file. The absolute path is only needed to build the file
  // names in this branch (AutoSplitLogger resolves it on its own).
  std::string db_absolute_path;
  env->GetAbsolutePath(dbname, &db_absolute_path);
  std::string fname = InfoLogFileName(dbname, db_absolute_path, db_log_dir);
  // Best effort: move a previous log out of the way; a failure (e.g. no
  // previous log) is deliberately ignored.
  env->RenameFile(fname, OldInfoLogFileName(dbname, env->NowMicros(),
                                            db_absolute_path, db_log_dir));
  return env->NewLogger(fname, logger);
}
// Information kept for every waiting writer // Information kept for every waiting writer
struct DBImpl::Writer { struct DBImpl::Writer {
Status status; Status status;
@ -118,16 +147,9 @@ Options SanitizeOptions(const std::string& dbname,
ClipToRange(&result.max_open_files, 20, 50000); ClipToRange(&result.max_open_files, 20, 50000);
ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
ClipToRange(&result.block_size, 1<<10, 4<<20); ClipToRange(&result.block_size, 1<<10, 4<<20);
std::string db_absolute_path;
src.env->GetAbsolutePath(dbname, &db_absolute_path);
if (result.info_log == NULL) { if (result.info_log == NULL) {
// Open a log file in the same directory as the db Status s = NewLogger(dbname, result.db_log_dir, src.env,
src.env->CreateDir(dbname); // In case it does not exist result.max_log_file_size, &result.info_log);
src.env->RenameFile(InfoLogFileName(dbname, db_absolute_path,
result.db_log_dir), OldInfoLogFileName(dbname,src.env->NowMicros(),
db_absolute_path, result.db_log_dir));
Status s = src.env->NewLogger(InfoLogFileName(dbname, db_absolute_path,
result.db_log_dir), &result.info_log);
if (!s.ok()) { if (!s.ok()) {
// No place suitable for logging // No place suitable for logging
result.info_log = NULL; result.info_log = NULL;
@ -136,6 +158,12 @@ Options SanitizeOptions(const std::string& dbname,
if (result.block_cache == NULL) { if (result.block_cache == NULL) {
result.block_cache = NewLRUCache(8 << 20); result.block_cache = NewLRUCache(8 << 20);
} }
if (src.compression_per_level != NULL) {
result.compression_per_level = new CompressionType[src.num_levels];
for (unsigned int i = 0; i < src.num_levels; i++) {
result.compression_per_level[i] = src.compression_per_level[i];
}
}
return result; return result;
} }
@ -163,6 +191,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
disable_delete_obsolete_files_(false), disable_delete_obsolete_files_(false),
delete_obsolete_files_last_run_(0), delete_obsolete_files_last_run_(0),
stall_level0_slowdown_(0), stall_level0_slowdown_(0),
stall_leveln_slowdown_(0),
stall_memtable_compaction_(0), stall_memtable_compaction_(0),
stall_level0_num_files_(0), stall_level0_num_files_(0),
started_at_(options.env->NowMicros()), started_at_(options.env->NowMicros()),
@ -224,6 +253,9 @@ DBImpl::~DBImpl() {
if (owns_cache_) { if (owns_cache_) {
delete options_.block_cache; delete options_.block_cache;
} }
if (options_.compression_per_level != NULL) {
delete options_.compression_per_level;
}
delete logger_; delete logger_;
} }
@ -1097,7 +1129,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
std::string fname = TableFileName(dbname_, file_number); std::string fname = TableFileName(dbname_, file_number);
Status s = env_->NewWritableFile(fname, &compact->outfile); Status s = env_->NewWritableFile(fname, &compact->outfile);
if (s.ok()) { if (s.ok()) {
compact->builder = new TableBuilder(options_, compact->outfile); compact->builder = new TableBuilder(options_, compact->outfile,
compact->compaction->level() + 1);
} }
return s; return s;
} }
@ -1656,6 +1689,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
assert(!writers_.empty()); assert(!writers_.empty());
bool allow_delay = !force; bool allow_delay = !force;
Status s; Status s;
double score;
while (true) { while (true) {
if (!bg_error_.ok()) { if (!bg_error_.ok()) {
@ -1701,6 +1735,18 @@ Status DBImpl::MakeRoomForWrite(bool force) {
Log(options_.info_log, "wait for fewer level0 files...\n"); Log(options_.info_log, "wait for fewer level0 files...\n");
bg_cv_.Wait(); bg_cv_.Wait();
stall_level0_num_files_ += env_->NowMicros() - t1; stall_level0_num_files_ += env_->NowMicros() - t1;
} else if (
allow_delay &&
options_.rate_limit > 1.0 &&
(score = versions_->MaxCompactionScore()) > options_.rate_limit) {
// Delay a write when the compaction score for any level is too large.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
stall_leveln_slowdown_ += 1000;
allow_delay = false; // Do not delay a single write more than once
Log(options_.info_log,
"delaying write for rate limits with max score %.2f\n", score);
mutex_.Lock();
} else { } else {
// Attempt to switch to a new memtable and trigger compaction of old // Attempt to switch to a new memtable and trigger compaction of old
DelayLoggingAndReset(); DelayLoggingAndReset();
@ -1789,8 +1835,9 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
stats_[level].bytes_readnp1 / 1048576.0, stats_[level].bytes_readnp1 / 1048576.0,
bytes_new / 1048576.0, bytes_new / 1048576.0,
amplify, amplify,
bytes_read / 1048576.0 / seconds_up, (bytes_read / 1048576.0) / (stats_[level].micros / 1000000.0),
stats_[level].bytes_written / 1048576.0 / seconds_up, (stats_[level].bytes_written / 1048576.0) /
(stats_[level].micros / 1000000.0),
stats_[level].files_in_leveln, stats_[level].files_in_leveln,
stats_[level].files_in_levelnp1, stats_[level].files_in_levelnp1,
stats_[level].files_out_levelnp1, stats_[level].files_out_levelnp1,
@ -1801,10 +1848,12 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
} }
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"Amplification: %.1f rate, %.2f GB in, %.2f GB out\n", "Amplification: %.1f rate, %.2f GB in, %.2f GB out, %.2f MB/sec in, %.2f MB/sec out\n",
(double) total_bytes / stats_[0].bytes_written, (double) total_bytes / stats_[0].bytes_written,
stats_[0].bytes_written / (1048576.0 * 1024), stats_[0].bytes_written / (1048576.0 * 1024),
total_bytes / (1048576.0 * 1024)); total_bytes / (1048576.0 * 1024),
stats_[0].bytes_written / 1048576.0 / seconds_up,
total_bytes / 1048576.0 / seconds_up);
value->append(buf); value->append(buf);
snprintf(buf, sizeof(buf), "Uptime(secs): %.1f\n", seconds_up); snprintf(buf, sizeof(buf), "Uptime(secs): %.1f\n", seconds_up);
@ -1812,10 +1861,11 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, "
"%.3f memtable_compaction\n", "%.3f memtable_compaction, %.3f leveln_slowdown\n",
stall_level0_slowdown_ / 1000000.0, stall_level0_slowdown_ / 1000000.0,
stall_level0_num_files_ / 1000000.0, stall_level0_num_files_ / 1000000.0,
stall_memtable_compaction_ / 1000000.0); stall_memtable_compaction_ / 1000000.0,
stall_leveln_slowdown_ / 1000000.0);
value->append(buf); value->append(buf);
return true; return true;

View file

@ -228,6 +228,7 @@ class DBImpl : public DB {
uint64_t stall_level0_slowdown_; uint64_t stall_level0_slowdown_;
uint64_t stall_memtable_compaction_; uint64_t stall_memtable_compaction_;
uint64_t stall_level0_num_files_; uint64_t stall_level0_num_files_;
uint64_t stall_leveln_slowdown_;
// Time at which this instance was started. // Time at which this instance was started.
const uint64_t started_at_; const uint64_t started_at_;

View file

@ -20,6 +20,24 @@
namespace leveldb { namespace leveldb {
static bool SnappyCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(in.data(), in.size(), &out);
}
static bool ZlibCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Zlib_Compress(in.data(), in.size(), &out);
}
static bool BZip2CompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::BZip2_Compress(in.data(), in.size(), &out);
}
static std::string RandomString(Random* rnd, int len) { static std::string RandomString(Random* rnd, int len) {
std::string r; std::string r;
test::RandomString(rnd, len, &r); test::RandomString(rnd, len, &r);
@ -1058,6 +1076,80 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1), 1); ASSERT_EQ(NumTableFilesAtLevel(1), 1);
} }
// Shared body of the MinLevelToCompress test.
//
// Writes batches of twelve 10000-byte random values under keys Key(0)..Key(11),
// waiting for the memtable compaction after each batch, until level 0 holds
// one file fewer than options.level0_file_num_compaction_trigger. It then
// writes one more batch, waits for compaction, and expects level 0 to be
// empty and level 1 to hold exactly one file.
void MinLevelHelper(DBTest* self, Options& options) {
  Random rnd(301);
  const int kValuesPerBatch = 12;   // 12 values, each 10K => 120KB per batch
  const int kValueSize = 10000;

  const int batches_before_trigger =
      options.level0_file_num_compaction_trigger - 1;
  int batch = 0;
  while (batch < batches_before_trigger) {
    std::vector<std::string> written;
    for (int k = 0; k < kValuesPerBatch; ++k) {
      written.push_back(RandomString(&rnd, kValueSize));
      ASSERT_OK(self->Put(Key(k), written[k]));
    }
    self->dbfull()->TEST_WaitForCompactMemTable();
    // Each batch should have produced exactly one more level-0 file.
    ASSERT_EQ(self->NumTableFilesAtLevel(0), batch + 1);
    ++batch;
  }

  // One more level-0 file reaches the trigger and should start a level-0
  // compaction.
  std::vector<std::string> written;
  for (int k = 0; k < kValuesPerBatch; ++k) {
    written.push_back(RandomString(&rnd, kValueSize));
    ASSERT_OK(self->Put(Key(k), written[k]));
  }
  self->dbfull()->TEST_WaitForCompact();

  ASSERT_EQ(self->NumTableFilesAtLevel(0), 0);
  ASSERT_EQ(self->NumTableFilesAtLevel(1), 1);
}
// Runs the level-0 compaction scenario in MinLevelHelper() twice with
// Options::compression_per_level set: first with only L0 uncompressed, then
// with L0 and L1 uncompressed. The first available codec among snappy, zlib
// and bzip2 is used for the compressed levels; the test is skipped when none
// of them is available.
TEST(DBTest, MinLevelToCompress) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100<<10; //100KB
  options.num_levels = 3;
  options.max_mem_compaction_level = 0;
  options.level0_file_num_compaction_trigger = 3;
  options.create_if_missing = true;

  CompressionType type;
  if (SnappyCompressionSupported()) {
    type = kSnappyCompression;
    fprintf(stderr, "using snappy\n");
  } else if (ZlibCompressionSupported()) {
    type = kZlibCompression;
    fprintf(stderr, "using zlib\n");
  } else if (BZip2CompressionSupported()) {
    type = kBZip2Compression;
    fprintf(stderr, "using bzip2\n");
  } else {
    // Nothing has been allocated yet, so returning here leaks nothing.
    fprintf(stderr, "skipping test, compression disabled\n");
    return;
  }

  // Caller-owned array with one entry per level.
  options.compression_per_level = new CompressionType[options.num_levels];

  // do not compress L0
  for (int i = 0; i < options.num_levels; i++) {
    options.compression_per_level[i] = (i < 1) ? kNoCompression : type;
  }
  Reopen(&options);
  MinLevelHelper(this, options);

  // do not compress L0 and L1
  for (int i = 0; i < options.num_levels; i++) {
    options.compression_per_level[i] = (i < 2) ? kNoCompression : type;
  }
  DestroyAndReopen(&options);
  MinLevelHelper(this, options);

  // The array belongs to the caller and was previously leaked. The database
  // works on its own copy made while opening, so it can be released here.
  // NOTE(review): assumes the fixture does not reuse this Options object
  // after the test body returns -- confirm against DBTest.
  delete[] options.compression_per_level;
  options.compression_per_level = NULL;
}
TEST(DBTest, RepeatedWritesToSameKey) { TEST(DBTest, RepeatedWritesToSameKey) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = env_; options.env = env_;

View file

@ -1231,6 +1231,7 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
void VersionSet::Finalize(Version* v) { void VersionSet::Finalize(Version* v) {
double max_score = 0;
for (int level = 0; level < NumberLevels()-1; level++) { for (int level = 0; level < NumberLevels()-1; level++) {
double score; double score;
if (level == 0) { if (level == 0) {
@ -1274,11 +1275,17 @@ void VersionSet::Finalize(Version* v) {
if (score > 1) { if (score > 1) {
// Log(options_->info_log, "XXX score l%d = %d ", level, (int)score); // Log(options_->info_log, "XXX score l%d = %d ", level, (int)score);
} }
if (max_score < score) {
max_score = score;
}
} }
v->compaction_level_[level] = level; v->compaction_level_[level] = level;
v->compaction_score_[level] = score; v->compaction_score_[level] = score;
} }
// update the max compaction score in levels 1 to n-1
v->max_compaction_score_ = max_score;
// sort all the levels based on their score. Higher scores get listed // sort all the levels based on their score. Higher scores get listed
// first. Use bubble sort because the number of entries are small. // first. Use bubble sort because the number of entries are small.
for(int i = 0; i < NumberLevels()-2; i++) { for(int i = 0; i < NumberLevels()-2; i++) {

View file

@ -145,6 +145,7 @@ class Version {
// These are used to pick the best compaction level // These are used to pick the best compaction level
std::vector<double> compaction_score_; std::vector<double> compaction_score_;
std::vector<int> compaction_level_; std::vector<int> compaction_level_;
double max_compaction_score_; // max score in l1 to ln-1
// The offset in the manifest file where this version is stored. // The offset in the manifest file where this version is stored.
uint64_t offset_manifest_file_; uint64_t offset_manifest_file_;
@ -264,6 +265,11 @@ class VersionSet {
NeedsSizeCompaction()); NeedsSizeCompaction());
} }
// Returns the maximum compaction score stored on the current version
// (its max_compaction_score_ field, described there as the max score in
// l1 to ln-1).
double MaxCompactionScore() const {
  const double score = current_->max_compaction_score_;
  return score;
}
// Add all files listed in any live version to *live. // Add all files listed in any live version to *live.
// May also mutate some internal state. // May also mutate some internal state.
void AddLiveFiles(std::set<uint64_t>* live); void AddLiveFiles(std::set<uint64_t>* live);

View file

@ -226,9 +226,9 @@ class WritableFile {
virtual Status Flush() = 0; virtual Status Flush() = 0;
virtual Status Sync() = 0; // sync data virtual Status Sync() = 0; // sync data
/* /*
* Sync data and/or metadata as well. * Sync data and/or metadata as well.
* By default, sync only metadata. * By default, sync only metadata.
* Override this method for environments where we need to sync * Override this method for environments where we need to sync
* metadata as well. * metadata as well.
*/ */
@ -252,11 +252,15 @@ class WritableFile {
// An interface for writing log messages. // An interface for writing log messages.
class Logger { class Logger {
public: public:
enum { DO_NOT_SUPPORT_GET_LOG_FILE_SIZE = -1 };
Logger() { } Logger() { }
virtual ~Logger(); virtual ~Logger();
// Write an entry to the log file with the specified format. // Write an entry to the log file with the specified format.
virtual void Logv(const char* format, va_list ap) = 0; virtual void Logv(const char* format, va_list ap) = 0;
// Default implementation for loggers that do not track their file size:
// returns DO_NOT_SUPPORT_GET_LOG_FILE_SIZE (-1) converted to size_t.
// Subclasses that know their size override this.
virtual size_t GetLogFileSize() const {
  const size_t unsupported = DO_NOT_SUPPORT_GET_LOG_FILE_SIZE;
  return unsupported;
}
private: private:
// No copying allowed // No copying allowed

View file

@ -141,6 +141,20 @@ struct Options {
// efficiently detect that and will switch to uncompressed mode. // efficiently detect that and will switch to uncompressed mode.
CompressionType compression; CompressionType compression;
// Different levels can have different compression policies. There
// are cases where most lower levels would like to use a quick compression
// algorithm while the higher levels (which have more data) use
// compression algorithms that have better compression but could
// be slower. This array, if non-NULL, should have an entry for
// each level of the database. This array, if non-NULL, overrides the
// value specified in the previous field 'compression'. The caller is
// responsible for allocating memory and initializing the values in it
// before invoking Open(). The caller is responsible for freeing this
// array and it could be freed anytime after the return from Open().
// This could have been a std::vector but that makes the equivalent
// java/C api hard to construct.
CompressionType* compression_per_level;
// If non-NULL, use the specified filter policy to reduce disk reads. // If non-NULL, use the specified filter policy to reduce disk reads.
// Many applications will benefit from passing the result of // Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here. // NewBloomFilterPolicy() here.
@ -257,6 +271,17 @@ struct Options {
// Default: 1 // Default: 1
int max_background_compactions; int max_background_compactions;
// Specify the maximal size of the info log file. If the log file
// is larger than `max_log_file_size`, a new info log file will
// be created.
// If max_log_file_size == 0, all logs will be written to one
// log file.
size_t max_log_file_size;
// Puts are delayed when any level has a compaction score that
// exceeds rate_limit. This is ignored when <= 1.0.
double rate_limit;
// Create an Options object with default values for all fields. // Create an Options object with default values for all fields.
Options(); Options();

View file

@ -27,8 +27,10 @@ class TableBuilder {
public: public:
// Create a builder that will store the contents of the table it is // Create a builder that will store the contents of the table it is
// building in *file. Does not close the file. It is up to the // building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish(). // caller to close the file after calling Finish(). The output file
TableBuilder(const Options& options, WritableFile* file); // will be part of level specified by 'level'. A value of -1 means
// that the caller does not know which level the output file will reside.
TableBuilder(const Options& options, WritableFile* file, int level=-1);
// REQUIRES: Either Finish() or Abandon() has been called. // REQUIRES: Either Finish() or Abandon() has been called.
~TableBuilder(); ~TableBuilder();
@ -81,6 +83,7 @@ class TableBuilder {
struct Rep; struct Rep;
Rep* rep_; Rep* rep_;
int level_;
// No copying allowed // No copying allowed
TableBuilder(const TableBuilder&); TableBuilder(const TableBuilder&);

View file

@ -60,8 +60,9 @@ struct TableBuilder::Rep {
} }
}; };
TableBuilder::TableBuilder(const Options& options, WritableFile* file) TableBuilder::TableBuilder(const Options& options, WritableFile* file,
: rep_(new Rep(options, file)) { int level)
: rep_(new Rep(options, file)), level_(level) {
if (rep_->filter_block != NULL) { if (rep_->filter_block != NULL) {
rep_->filter_block->StartBlock(0); rep_->filter_block->StartBlock(0);
} }
@ -152,7 +153,21 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
Slice block_contents; Slice block_contents;
std::string* compressed = &r->compressed_output; std::string* compressed = &r->compressed_output;
CompressionType type = r->options.compression; CompressionType type;
// If the user has specified a different compression type for each level,
// then pick the compression for that level.
if (r->options.compression_per_level != NULL) {
if (level_ == -1) {
// this is mostly for backward compatibility. The builder does not
// know which level this file belongs to. Apply the compression level
// specified for level 0 to all levels.
type = r->options.compression_per_level[0];
} else {
type = r->options.compression_per_level[level_];
}
} else {
type = r->options.compression;
}
switch (type) { switch (type) {
case kNoCompression: case kNoCompression:
block_contents = raw; block_contents = raw;

81
util/auto_split_logger.h Normal file
View file

@ -0,0 +1,81 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// Logger implementation that can be shared by all environments
// where enough posix functionality is available.
#ifndef STORAGE_LEVELDB_UTIL_SPLIT_LOGGER_LOGGER_H_
#define STORAGE_LEVELDB_UTIL_SPLIT_LOGGER_LOGGER_H_
#include "util/posix_logger.h"
#include "db/filename.h"
namespace leveldb {
// AutoSplitLogger can automatically create a new log file
// if the file size exceeds the limit.
//
// The template parameter "UnderlyingLogger" can be any Logger class
// that has the method "GetLogFileSize()" and "ResetFile()"
template<class UnderlyingLogger>
class AutoSplitLogger : public Logger {
private:
std::string log_fname_; // Current active info log's file name.
std::string dbname_;
std::string db_log_dir_;
std::string db_absolute_path_;
Env* env_;
UnderlyingLogger* logger_;
const size_t MAX_LOG_FILE_SIZE;
Status status_;
public:
AutoSplitLogger(Env* env, const std::string& dbname,
const std::string& db_log_dir, size_t log_max_size):
env_(env), dbname_(dbname), db_log_dir_(db_log_dir),
MAX_LOG_FILE_SIZE(log_max_size), status_(Status::OK()) {
env->GetAbsolutePath(dbname, &db_absolute_path_);
log_fname_ = InfoLogFileName(dbname_, db_absolute_path_, db_log_dir_);
InitLogger();
}
~AutoSplitLogger() { delete logger_; }
virtual void Logv(const char* format, va_list ap) {
assert(GetStatus().ok());
logger_->Logv(format, ap);
// Check if the log file should be splitted.
if (logger_->GetLogFileSize() > MAX_LOG_FILE_SIZE) {
delete logger_;
std::string old_fname = OldInfoLogFileName(
dbname_, env_->NowMicros(), db_absolute_path_, db_log_dir_);
env_->RenameFile(log_fname_, old_fname);
InitLogger();
}
}
// check if the logger has any problem.
Status GetStatus() {
return status_;
}
private:
Status InitLogger() {
status_ = env_->NewLogger(log_fname_, &logger_);
if (!status_.ok()) {
logger_ = NULL;
}
if (logger_->GetLogFileSize() ==
Logger::DO_NOT_SUPPORT_GET_LOG_FILE_SIZE) {
status_ = Status::NotSupported(
"The underlying logger doesn't support GetLogFileSize()");
}
return status_;
}
}; // class AutoSplitLogger
} // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_SPLIT_LOGGER_LOGGER_H_

View file

@ -25,6 +25,7 @@ Options::Options()
block_size(4096), block_size(4096),
block_restart_interval(16), block_restart_interval(16),
compression(kSnappyCompression), compression(kSnappyCompression),
compression_per_level(NULL),
filter_policy(NULL), filter_policy(NULL),
num_levels(7), num_levels(7),
level0_file_num_compaction_trigger(4), level0_file_num_compaction_trigger(4),
@ -44,7 +45,9 @@ Options::Options()
db_log_dir(""), db_log_dir(""),
disable_seek_compaction(false), disable_seek_compaction(false),
delete_obsolete_files_period_micros(0), delete_obsolete_files_period_micros(0),
max_background_compactions(1) { max_background_compactions(1),
max_log_file_size(0),
rate_limit(0.0) {
} }
void void
@ -70,7 +73,21 @@ Options::Dump(
Log(log," Options.num_levels: %d", num_levels); Log(log," Options.num_levels: %d", num_levels);
Log(log," Options.disableDataSync: %d", disableDataSync); Log(log," Options.disableDataSync: %d", disableDataSync);
Log(log," Options.use_fsync: %d", use_fsync); Log(log," Options.use_fsync: %d", use_fsync);
Log(log," Options.db_stats_log_interval: %d", if (compression_per_level != NULL) {
for (unsigned int i = 0; i < num_levels; i++){
Log(log," Options.compression[%d]: %d",
i, compression_per_level[i]);
}
} else {
Log(log," Options.compression: %d", compression);
}
Log(log," Options.filter_policy: %s",
filter_policy == NULL ? "NULL" : filter_policy->Name());
Log(log," Options.num_levels: %d", num_levels);
Log(log," Options.disableDataSync: %d", disableDataSync);
Log(log," Options.use_fsync: %d", use_fsync);
Log(log," Options.max_log_file_size: %d", max_log_file_size);
Log(log," Options.db_stats_log_interval: %d",
db_stats_log_interval); db_stats_log_interval);
Log(log," Options.level0_file_num_compaction_trigger: %d", Log(log," Options.level0_file_num_compaction_trigger: %d",
level0_file_num_compaction_trigger); level0_file_num_compaction_trigger);
@ -100,6 +117,8 @@ Options::Dump(
delete_obsolete_files_period_micros); delete_obsolete_files_period_micros);
Log(log," Options.max_background_compactions: %d", Log(log," Options.max_background_compactions: %d",
max_background_compactions); max_background_compactions);
Log(log," Options.rate_limit: %.2f",
rate_limit);
} // Options::Dump } // Options::Dump

View file

@ -20,8 +20,11 @@ class PosixLogger : public Logger {
private: private:
FILE* file_; FILE* file_;
uint64_t (*gettid_)(); // Return the thread id for the current thread uint64_t (*gettid_)(); // Return the thread id for the current thread
size_t log_size_;
public: public:
PosixLogger(FILE* f, uint64_t (*gettid)()) : file_(f), gettid_(gettid) { } PosixLogger(FILE* f, uint64_t (*gettid)()) :
file_(f), gettid_(gettid), log_size_(0) { }
virtual ~PosixLogger() { virtual ~PosixLogger() {
fclose(file_); fclose(file_);
} }
@ -85,12 +88,17 @@ class PosixLogger : public Logger {
assert(p <= limit); assert(p <= limit);
fwrite(base, 1, p - base, file_); fwrite(base, 1, p - base, file_);
fflush(file_); fflush(file_);
log_size_ += (p - base);
if (base != buffer) { if (base != buffer) {
delete[] base; delete[] base;
} }
break; break;
} }
} }
size_t GetLogFileSize() const {
return log_size_;
}
}; };
} // namespace leveldb } // namespace leveldb