Fix poor error on num_levels mismatch and few other minor improvements

Summary:
Previously, if you opened a db with num_levels set lower than
the database, you received the unhelpful message "Corruption:
VersionEdit: new-file entry."  Now you get a more verbose message
describing the issue.

Also, fix handling of compression_levels (both the run-over-the-end
issue and the memory management of it).

Lastly, unique_ptr'ify a couple of minor calls.

Test Plan: make check

Reviewers: dhruba

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D8151
This commit is contained in:
Chip Turner 2013-01-24 10:54:26 -08:00
parent 16e96b1242
commit 0b83a83191
14 changed files with 91 additions and 58 deletions

10
db/c.cc
View File

@ -508,6 +508,16 @@ void leveldb_options_set_compression(leveldb_options_t* opt, int t) {
opt->rep.compression = static_cast<CompressionType>(t); opt->rep.compression = static_cast<CompressionType>(t);
} }
void leveldb_options_set_compression_per_level(leveldb_options_t* opt,
int* level_values,
size_t num_levels) {
opt->rep.compression_per_level.resize(num_levels);
for (size_t i = 0; i < num_levels; ++i) {
opt->rep.compression_per_level[i] =
static_cast<CompressionType>(level_values[i]);
}
}
void leveldb_options_set_compression_options( void leveldb_options_set_compression_options(
leveldb_options_t* opt, int w_bits, int level, int strategy) { leveldb_options_t* opt, int w_bits, int level, int strategy) {
opt->rep.compression_opts.window_bits = w_bits; opt->rep.compression_opts.window_bits = w_bits;

View File

@ -188,6 +188,9 @@ int main(int argc, char** argv) {
leveldb_options_set_block_restart_interval(options, 8); leveldb_options_set_block_restart_interval(options, 8);
leveldb_options_set_compression(options, leveldb_no_compression); leveldb_options_set_compression(options, leveldb_no_compression);
leveldb_options_set_compression_options(options, -14, -1, 0); leveldb_options_set_compression_options(options, -14, -1, 0);
int compression_levels[] = {leveldb_no_compression, leveldb_no_compression,
leveldb_no_compression, leveldb_no_compression};
leveldb_options_set_compression_per_level(options, compression_levels, 4);
roptions = leveldb_readoptions_create(); roptions = leveldb_readoptions_create();
leveldb_readoptions_set_verify_checksums(roptions, 1); leveldb_readoptions_set_verify_checksums(roptions, 1);

View File

@ -974,7 +974,7 @@ class Benchmark {
options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds; options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds;
if (FLAGS_min_level_to_compress >= 0) { if (FLAGS_min_level_to_compress >= 0) {
assert(FLAGS_min_level_to_compress <= FLAGS_num_levels); assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
options.compression_per_level = new CompressionType[FLAGS_num_levels]; options.compression_per_level.resize(FLAGS_num_levels);
for (int i = 0; i < FLAGS_min_level_to_compress; i++) { for (int i = 0; i < FLAGS_min_level_to_compress; i++) {
options.compression_per_level[i] = kNoCompression; options.compression_per_level[i] = kNoCompression;
} }
@ -1003,7 +1003,7 @@ class Benchmark {
exit(1); exit(1);
} }
if (FLAGS_min_level_to_compress >= 0) { if (FLAGS_min_level_to_compress >= 0) {
delete options.compression_per_level; options.compression_per_level.clear();
} }
} }

View File

@ -158,12 +158,7 @@ Options SanitizeOptions(const std::string& dbname,
if (result.block_cache == NULL && !result.no_block_cache) { if (result.block_cache == NULL && !result.no_block_cache) {
result.block_cache = NewLRUCache(8 << 20); result.block_cache = NewLRUCache(8 << 20);
} }
if (src.compression_per_level != NULL) { result.compression_per_level = src.compression_per_level;
result.compression_per_level = new CompressionType[src.num_levels];
for (int i = 0; i < src.num_levels; i++) {
result.compression_per_level[i] = src.compression_per_level[i];
}
}
return result; return result;
} }
@ -245,10 +240,6 @@ DBImpl::~DBImpl() {
delete tmp_batch_; delete tmp_batch_;
delete[] stats_; delete[] stats_;
if (options_.compression_per_level != NULL) {
delete[] options_.compression_per_level;
}
delete logger_; delete logger_;
} }
@ -884,7 +875,7 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() {
} }
Status DBImpl::GetUpdatesSince(SequenceNumber seq, Status DBImpl::GetUpdatesSince(SequenceNumber seq,
TransactionLogIterator** iter) { unique_ptr<TransactionLogIterator>* iter) {
// Get All Log Files. // Get All Log Files.
// Sort Files // Sort Files
@ -916,9 +907,8 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
TransactionLogIteratorImpl* impl = iter->reset(
new TransactionLogIteratorImpl(dbname_, &options_, seq, probableWALFiles); new TransactionLogIteratorImpl(dbname_, &options_, seq, probableWALFiles));
*iter = impl;
return Status::OK(); return Status::OK();
} }

View File

@ -57,7 +57,7 @@ class DBImpl : public DB {
uint64_t* manifest_file_size); uint64_t* manifest_file_size);
virtual SequenceNumber GetLatestSequenceNumber(); virtual SequenceNumber GetLatestSequenceNumber();
virtual Status GetUpdatesSince(SequenceNumber seq_number, virtual Status GetUpdatesSince(SequenceNumber seq_number,
TransactionLogIterator ** iter); unique_ptr<TransactionLogIterator>* iter);
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface

View File

@ -524,6 +524,33 @@ TEST(DBTest, ReadWrite) {
} while (ChangeOptions()); } while (ChangeOptions());
} }
static std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key%06d", i);
return std::string(buf);
}
TEST(DBTest, LevelLimitReopen) {
Options options = CurrentOptions();
Reopen(&options);
const std::string value(1024 * 1024, ' ');
int i = 0;
while (NumTableFilesAtLevel(2) == 0) {
ASSERT_OK(Put(Key(i++), value));
}
options.num_levels = 1;
Status s = TryReopen(&options);
ASSERT_EQ(s.IsCorruption(), true);
ASSERT_EQ(s.ToString(),
"Corruption: VersionEdit: db already has "
"more levels than options.num_levels");
options.num_levels = 10;
ASSERT_OK(TryReopen(&options));
}
TEST(DBTest, Preallocation) { TEST(DBTest, Preallocation) {
const std::string src = dbname_ + "/alloc_test"; const std::string src = dbname_ + "/alloc_test";
unique_ptr<WritableFile> srcfile; unique_ptr<WritableFile> srcfile;
@ -1026,12 +1053,6 @@ TEST(DBTest, RecoverDuringMemtableCompaction) {
} while (ChangeOptions()); } while (ChangeOptions());
} }
static std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key%06d", i);
return std::string(buf);
}
TEST(DBTest, MinorCompactionsHappen) { TEST(DBTest, MinorCompactionsHappen) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.write_buffer_size = 10000; options.write_buffer_size = 10000;
@ -1221,7 +1242,7 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
fprintf(stderr, "skipping test, compression disabled\n"); fprintf(stderr, "skipping test, compression disabled\n");
return false; return false;
} }
options.compression_per_level = new CompressionType[options.num_levels]; options.compression_per_level.resize(options.num_levels);
// do not compress L0 // do not compress L0
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
@ -2439,7 +2460,7 @@ TEST(DBTest, TransactionLogIterator) {
Put("key2", value); Put("key2", value);
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U); ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
{ {
TransactionLogIterator* iter; unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(0, &iter); Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
ASSERT_TRUE(!iter->Valid()); ASSERT_TRUE(!iter->Valid());
@ -2465,7 +2486,7 @@ TEST(DBTest, TransactionLogIterator) {
Put("key6", value); Put("key6", value);
} }
{ {
TransactionLogIterator* iter; unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(0, &iter); Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
ASSERT_TRUE(!iter->Valid()); ASSERT_TRUE(!iter->Valid());
@ -2750,7 +2771,7 @@ class ModelDB: public DB {
return 0; return 0;
} }
virtual Status GetUpdatesSince(leveldb::SequenceNumber, virtual Status GetUpdatesSince(leveldb::SequenceNumber,
leveldb::TransactionLogIterator**) { unique_ptr<leveldb::TransactionLogIterator>*) {
return Status::NotSupported("Not supported in Model DB"); return Status::NotSupported("Not supported in Model DB");
} }

View File

@ -95,13 +95,16 @@ static bool GetInternalKey(Slice* input, InternalKey* dst) {
} }
} }
bool VersionEdit::GetLevel(Slice* input, int* level) { bool VersionEdit::GetLevel(Slice* input, int* level, const char** msg) {
uint32_t v; uint32_t v;
if (GetVarint32(input, &v) && if (GetVarint32(input, &v) &&
(int)v < number_levels_) { (int)v < number_levels_) {
*level = v; *level = v;
return true; return true;
} else { } else {
if ((int)v >= number_levels_) {
*msg = "db already has more levels than options.num_levels";
}
return false; return false;
} }
} }
@ -163,32 +166,38 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
break; break;
case kCompactPointer: case kCompactPointer:
if (GetLevel(&input, &level) && if (GetLevel(&input, &level, &msg) &&
GetInternalKey(&input, &key)) { GetInternalKey(&input, &key)) {
compact_pointers_.push_back(std::make_pair(level, key)); compact_pointers_.push_back(std::make_pair(level, key));
} else { } else {
msg = "compaction pointer"; if (!msg) {
msg = "compaction pointer";
}
} }
break; break;
case kDeletedFile: case kDeletedFile:
if (GetLevel(&input, &level) && if (GetLevel(&input, &level, &msg) &&
GetVarint64(&input, &number)) { GetVarint64(&input, &number)) {
deleted_files_.insert(std::make_pair(level, number)); deleted_files_.insert(std::make_pair(level, number));
} else { } else {
msg = "deleted file"; if (!msg) {
msg = "deleted file";
}
} }
break; break;
case kNewFile: case kNewFile:
if (GetLevel(&input, &level) && if (GetLevel(&input, &level, &msg) &&
GetVarint64(&input, &f.number) && GetVarint64(&input, &f.number) &&
GetVarint64(&input, &f.file_size) && GetVarint64(&input, &f.file_size) &&
GetInternalKey(&input, &f.smallest) && GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest)) { GetInternalKey(&input, &f.largest)) {
new_files_.push_back(std::make_pair(level, f)); new_files_.push_back(std::make_pair(level, f));
} else { } else {
msg = "new-file entry"; if (!msg) {
msg = "new-file entry";
}
} }
break; break;

View File

@ -91,7 +91,7 @@ class VersionEdit {
typedef std::set< std::pair<int, uint64_t> > DeletedFileSet; typedef std::set< std::pair<int, uint64_t> > DeletedFileSet;
bool GetLevel(Slice* input, int* level); bool GetLevel(Slice* input, int* level, const char** msg);
int number_levels_; int number_levels_;
std::string comparator_; std::string comparator_;

View File

@ -183,6 +183,10 @@ extern void leveldb_options_destroy(leveldb_options_t*);
extern void leveldb_options_set_comparator( extern void leveldb_options_set_comparator(
leveldb_options_t*, leveldb_options_t*,
leveldb_comparator_t*); leveldb_comparator_t*);
extern void leveldb_options_set_compression_per_level(
leveldb_options_t* opt,
int* level_values,
size_t num_levels);
extern void leveldb_options_set_filter_policy( extern void leveldb_options_set_filter_policy(
leveldb_options_t*, leveldb_options_t*,
leveldb_filterpolicy_t*); leveldb_filterpolicy_t*);

View File

@ -7,6 +7,7 @@
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <memory>
#include <vector> #include <vector>
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "leveldb/options.h" #include "leveldb/options.h"
@ -15,6 +16,8 @@
namespace leveldb { namespace leveldb {
using std::unique_ptr;
// Update Makefile if you change these // Update Makefile if you change these
static const int kMajorVersion = 1; static const int kMajorVersion = 1;
static const int kMinorVersion = 5; static const int kMinorVersion = 5;
@ -197,7 +200,7 @@ class DB {
// cleared aggressively and the iterator might keep getting invalid before // cleared aggressively and the iterator might keep getting invalid before
// an update is read. // an update is read.
virtual Status GetUpdatesSince(SequenceNumber seq_number, virtual Status GetUpdatesSince(SequenceNumber seq_number,
TransactionLogIterator** iter) = 0; unique_ptr<TransactionLogIterator>* iter) = 0;
private: private:
// No copying allowed // No copying allowed
DB(const DB&); DB(const DB&);

View File

@ -8,6 +8,7 @@
#include <stddef.h> #include <stddef.h>
#include <string> #include <string>
#include <memory> #include <memory>
#include <vector>
#include <stdint.h> #include <stdint.h>
#include "leveldb/slice.h" #include "leveldb/slice.h"
@ -170,7 +171,7 @@ struct Options {
// array and it could be freed anytime after the return from Open(). // array and it could be freed anytime after the return from Open().
// This could have been a std::vector but that makes the equivalent // This could have been a std::vector but that makes the equivalent
// java/C api hard to construct. // java/C api hard to construct.
CompressionType* compression_per_level; std::vector<CompressionType> compression_per_level;
//different options for compression algorithms //different options for compression algorithms
CompressionOptions compression_opts; CompressionOptions compression_opts;

View File

@ -156,15 +156,14 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
CompressionType type; CompressionType type;
// If the use has specified a different compression level for each level, // If the use has specified a different compression level for each level,
// then pick the compresison for that level. // then pick the compresison for that level.
if (r->options.compression_per_level != NULL) { if (!r->options.compression_per_level.empty()) {
if (level_ == -1) { const int n = r->options.compression_per_level.size();
// this is mostly for backward compatibility. The builder does not // It is possible for level_ to be -1; in that case, we use level
// know which level this file belongs to. Apply the compression level // 0's compression. This occurs mostly in backwards compatibility
// specified for level 0 to all levels. // situations when the builder doesn't know what level the file
type = r->options.compression_per_level[0]; // belongs to. Likewise, if level_ is beyond the end of the
} else { // specified compression levels, use the last value.
type = r->options.compression_per_level[level_]; type = r->options.compression_per_level[std::max(0, std::min(level_, n))];
}
} else { } else {
type = r->options.compression; type = r->options.compression;
} }

View File

@ -51,18 +51,13 @@ struct ReplicationThread {
volatile bool has_more; volatile bool has_more;
}; };
// experimenting with isNull. Makes code more readable?
static inline bool isNull(const void * const ptr) {
return ptr == NULL;
}
static void ReplicationThreadBody(void* arg) { static void ReplicationThreadBody(void* arg) {
ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg); ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
DB* db = t->db; DB* db = t->db;
TransactionLogIterator* iter = NULL; unique_ptr<TransactionLogIterator> iter;
SequenceNumber currentSeqNum = 0; SequenceNumber currentSeqNum = 0;
while (t->stop.Acquire_Load() != NULL) { while (t->stop.Acquire_Load() != NULL) {
if (isNull(iter)) { if (!iter) {
db->GetUpdatesSince(currentSeqNum, &iter); db->GetUpdatesSince(currentSeqNum, &iter);
fprintf(stdout, "Refreshing iterator\n"); fprintf(stdout, "Refreshing iterator\n");
iter->Next(); iter->Next();
@ -83,8 +78,7 @@ static void ReplicationThreadBody(void* arg) {
t->no_read++; t->no_read++;
} }
} }
delete iter; iter.reset();
iter = NULL;
} }
} }

View File

@ -26,7 +26,6 @@ Options::Options()
block_size(4096), block_size(4096),
block_restart_interval(16), block_restart_interval(16),
compression(kSnappyCompression), compression(kSnappyCompression),
compression_per_level(NULL),
filter_policy(NULL), filter_policy(NULL),
num_levels(7), num_levels(7),
level0_file_num_compaction_trigger(4), level0_file_num_compaction_trigger(4),
@ -80,8 +79,8 @@ Options::Dump(Logger* log) const
} }
Log(log," Options.block_size: %zd", block_size); Log(log," Options.block_size: %zd", block_size);
Log(log," Options.block_restart_interval: %d", block_restart_interval); Log(log," Options.block_restart_interval: %d", block_restart_interval);
if (compression_per_level != NULL) { if (!compression_per_level.empty()) {
for (int i = 0; i < num_levels; i++){ for (int i = 0; i < compression_per_level.size(); i++) {
Log(log," Options.compression[%d]: %d", Log(log," Options.compression[%d]: %d",
i, compression_per_level[i]); i, compression_per_level[i]);
} }