diff --git a/db/c.cc b/db/c.cc index 08c1dcbf52..7614e7edf6 100644 --- a/db/c.cc +++ b/db/c.cc @@ -508,6 +508,16 @@ void leveldb_options_set_compression(leveldb_options_t* opt, int t) { opt->rep.compression = static_cast(t); } +void leveldb_options_set_compression_per_level(leveldb_options_t* opt, + int* level_values, + size_t num_levels) { + opt->rep.compression_per_level.resize(num_levels); + for (size_t i = 0; i < num_levels; ++i) { + opt->rep.compression_per_level[i] = + static_cast(level_values[i]); + } +} + void leveldb_options_set_compression_options( leveldb_options_t* opt, int w_bits, int level, int strategy) { opt->rep.compression_opts.window_bits = w_bits; diff --git a/db/c_test.c b/db/c_test.c index 2c36972b86..1d71b65080 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -188,6 +188,9 @@ int main(int argc, char** argv) { leveldb_options_set_block_restart_interval(options, 8); leveldb_options_set_compression(options, leveldb_no_compression); leveldb_options_set_compression_options(options, -14, -1, 0); + int compression_levels[] = {leveldb_no_compression, leveldb_no_compression, + leveldb_no_compression, leveldb_no_compression}; + leveldb_options_set_compression_per_level(options, compression_levels, 4); roptions = leveldb_readoptions_create(); leveldb_readoptions_set_verify_checksums(roptions, 1); diff --git a/db/db_bench.cc b/db/db_bench.cc index 6f0524e074..601d3b54e7 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -974,7 +974,7 @@ class Benchmark { options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds; if (FLAGS_min_level_to_compress >= 0) { assert(FLAGS_min_level_to_compress <= FLAGS_num_levels); - options.compression_per_level = new CompressionType[FLAGS_num_levels]; + options.compression_per_level.resize(FLAGS_num_levels); for (int i = 0; i < FLAGS_min_level_to_compress; i++) { options.compression_per_level[i] = kNoCompression; } @@ -1003,7 +1003,7 @@ class Benchmark { exit(1); } if (FLAGS_min_level_to_compress >= 0) { - delete options.compression_per_level; + options.compression_per_level.clear(); } } diff --git a/db/db_impl.cc b/db/db_impl.cc index dc2a3650a5..053ff93b70 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -158,12 +158,7 @@ Options SanitizeOptions(const std::string& dbname, if (result.block_cache == NULL && !result.no_block_cache) { result.block_cache = NewLRUCache(8 << 20); } - if (src.compression_per_level != NULL) { - result.compression_per_level = new CompressionType[src.num_levels]; - for (int i = 0; i < src.num_levels; i++) { - result.compression_per_level[i] = src.compression_per_level[i]; - } - } + result.compression_per_level = src.compression_per_level; return result; } @@ -245,10 +240,6 @@ DBImpl::~DBImpl() { delete tmp_batch_; delete[] stats_; - if (options_.compression_per_level != NULL) { - delete[] options_.compression_per_level; - } - delete logger_; } @@ -884,7 +875,7 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() { } Status DBImpl::GetUpdatesSince(SequenceNumber seq, - TransactionLogIterator** iter) { + unique_ptr* iter) { // Get All Log Files. // Sort Files @@ -916,9 +907,8 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, if (!s.ok()) { return s; } - TransactionLogIteratorImpl* impl = - new TransactionLogIteratorImpl(dbname_, &options_, seq, probableWALFiles); - *iter = impl; + iter->reset( + new TransactionLogIteratorImpl(dbname_, &options_, seq, probableWALFiles)); return Status::OK(); } diff --git a/db/db_impl.h b/db/db_impl.h index 68a338236c..fe09fb292e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -57,7 +57,7 @@ class DBImpl : public DB { uint64_t* manifest_file_size); virtual SequenceNumber GetLatestSequenceNumber(); virtual Status GetUpdatesSince(SequenceNumber seq_number, - TransactionLogIterator ** iter); + unique_ptr* iter); // Extra methods (for testing) that are not in the public DB interface diff --git a/db/db_test.cc b/db/db_test.cc index 55ecbb292c..37a9d2be80 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -524,6 +524,33 @@ TEST(DBTest, ReadWrite) { } while (ChangeOptions()); } +static std::string Key(int i) { + char buf[100]; + snprintf(buf, sizeof(buf), "key%06d", i); + return std::string(buf); +} + +TEST(DBTest, LevelLimitReopen) { + Options options = CurrentOptions(); + Reopen(&options); + + const std::string value(1024 * 1024, ' '); + int i = 0; + while (NumTableFilesAtLevel(2) == 0) { + ASSERT_OK(Put(Key(i++), value)); + } + + options.num_levels = 1; + Status s = TryReopen(&options); + ASSERT_EQ(s.IsCorruption(), true); + ASSERT_EQ(s.ToString(), + "Corruption: VersionEdit: db already has " + "more levels than options.num_levels"); + + options.num_levels = 10; + ASSERT_OK(TryReopen(&options)); +} + TEST(DBTest, Preallocation) { const std::string src = dbname_ + "/alloc_test"; unique_ptr srcfile; @@ -1026,12 +1053,6 @@ TEST(DBTest, RecoverDuringMemtableCompaction) { } while (ChangeOptions()); } -static std::string Key(int i) { - char buf[100]; - snprintf(buf, sizeof(buf), "key%06d", i); - return std::string(buf); -} - TEST(DBTest, MinorCompactionsHappen) { Options options = CurrentOptions(); options.write_buffer_size = 10000; @@ -1221,7 +1242,7 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits, fprintf(stderr, "skipping test, compression disabled\n"); return false; } - options.compression_per_level = new CompressionType[options.num_levels]; + options.compression_per_level.resize(options.num_levels); // do not compress L0 for (int i = 0; i < 1; i++) { @@ -2439,7 +2460,7 @@ TEST(DBTest, TransactionLogIterator) { Put("key2", value); ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U); { - TransactionLogIterator* iter; + unique_ptr iter; Status status = dbfull()->GetUpdatesSince(0, &iter); ASSERT_TRUE(status.ok()); ASSERT_TRUE(!iter->Valid()); @@ -2465,7 +2486,7 @@ TEST(DBTest, TransactionLogIterator) { Put("key6", value); } { - TransactionLogIterator* iter; + unique_ptr iter; Status status = dbfull()->GetUpdatesSince(0, &iter); ASSERT_TRUE(status.ok()); ASSERT_TRUE(!iter->Valid()); @@ -2750,7 +2771,7 @@ class ModelDB: public DB { return 0; } virtual Status GetUpdatesSince(leveldb::SequenceNumber, - leveldb::TransactionLogIterator**) { + unique_ptr*) { return Status::NotSupported("Not supported in Model DB"); } diff --git a/db/version_edit.cc b/db/version_edit.cc index 9a52310cae..198ca275af 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -95,13 +95,16 @@ static bool GetInternalKey(Slice* input, InternalKey* dst) { } } -bool VersionEdit::GetLevel(Slice* input, int* level) { +bool VersionEdit::GetLevel(Slice* input, int* level, const char** msg) { uint32_t v; if (GetVarint32(input, &v) && (int)v < number_levels_) { *level = v; return true; } else { + if ((int)v >= number_levels_) { + *msg = "db already has more levels than options.num_levels"; + } return false; } } @@ -163,32 +166,38 @@ Status VersionEdit::DecodeFrom(const Slice& src) { break; case kCompactPointer: - if (GetLevel(&input, &level) && + if (GetLevel(&input, &level, &msg) && GetInternalKey(&input, &key)) { compact_pointers_.push_back(std::make_pair(level, key)); } else { - msg = "compaction pointer"; + if (!msg) { + msg = "compaction pointer"; + } } break; case kDeletedFile: - if (GetLevel(&input, &level) && + if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number)) { deleted_files_.insert(std::make_pair(level, number)); } else { - msg = "deleted file"; + if (!msg) { + msg = "deleted file"; + } } break; case kNewFile: - if (GetLevel(&input, &level) && + if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &f.number) && GetVarint64(&input, &f.file_size) && GetInternalKey(&input, &f.smallest) && GetInternalKey(&input, &f.largest)) { new_files_.push_back(std::make_pair(level, f)); } else { - msg = "new-file entry"; + if (!msg) { + msg = "new-file entry"; + } } break; diff --git a/db/version_edit.h b/db/version_edit.h index c11eda8561..3776c2179e 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -91,7 +91,7 @@ class VersionEdit { typedef std::set< std::pair > DeletedFileSet; - bool GetLevel(Slice* input, int* level); + bool GetLevel(Slice* input, int* level, const char** msg); int number_levels_; std::string comparator_; diff --git a/include/leveldb/c.h b/include/leveldb/c.h index f62ffc07bb..8799a5fb8c 100644 --- a/include/leveldb/c.h +++ b/include/leveldb/c.h @@ -183,6 +183,10 @@ extern void leveldb_options_destroy(leveldb_options_t*); extern void leveldb_options_set_comparator( leveldb_options_t*, leveldb_comparator_t*); +extern void leveldb_options_set_compression_per_level( + leveldb_options_t* opt, + int* level_values, + size_t num_levels); extern void leveldb_options_set_filter_policy( leveldb_options_t*, leveldb_filterpolicy_t*); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 18a396d256..fe341d8ef4 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -7,6 +7,7 @@ #include #include +#include #include #include "leveldb/iterator.h" #include "leveldb/options.h" @@ -15,6 +16,8 @@ namespace leveldb { +using std::unique_ptr; + // Update Makefile if you change these static const int kMajorVersion = 1; static const int kMinorVersion = 5; @@ -197,7 +200,7 @@ class DB { // cleared aggressively and the iterator might keep getting invalid before // an update is read. virtual Status GetUpdatesSince(SequenceNumber seq_number, - TransactionLogIterator** iter) = 0; + unique_ptr* iter) = 0; private: // No copying allowed DB(const DB&); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 6a090b6b3d..bd59f9b00e 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include "leveldb/slice.h" @@ -170,7 +171,7 @@ struct Options { // array and it could be freed anytime after the return from Open(). // This could have been a std::vector but that makes the equivalent // java/C api hard to construct. - CompressionType* compression_per_level; + std::vector compression_per_level; //different options for compression algorithms CompressionOptions compression_opts; diff --git a/table/table_builder.cc b/table/table_builder.cc index d867a1ca92..975d773faf 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -156,15 +156,14 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { CompressionType type; // If the use has specified a different compression level for each level, // then pick the compresison for that level. - if (r->options.compression_per_level != NULL) { - if (level_ == -1) { - // this is mostly for backward compatibility. The builder does not - // know which level this file belongs to. Apply the compression level - // specified for level 0 to all levels. - type = r->options.compression_per_level[0]; - } else { - type = r->options.compression_per_level[level_]; - } + if (!r->options.compression_per_level.empty()) { + const int n = r->options.compression_per_level.size(); + // It is possible for level_ to be -1; in that case, we use level + // 0's compression. This occurs mostly in backwards compatibility + // situations when the builder doesn't know what level the file + // belongs to. Likewise, if level_ is beyond the end of the + // specified compression levels, use the last value. + type = r->options.compression_per_level[std::max(0, std::min(level_, n))]; } else { type = r->options.compression; } diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index 091b0ca584..582e567eee 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -51,18 +51,13 @@ struct ReplicationThread { volatile bool has_more; }; -// experimenting with isNull. Makes code more readable? -static inline bool isNull(const void * const ptr) { - return ptr == NULL; -} - static void ReplicationThreadBody(void* arg) { ReplicationThread* t = reinterpret_cast(arg); DB* db = t->db; - TransactionLogIterator* iter = NULL; + unique_ptr iter; SequenceNumber currentSeqNum = 0; while (t->stop.Acquire_Load() != NULL) { - if (isNull(iter)) { + if (!iter) { db->GetUpdatesSince(currentSeqNum, &iter); fprintf(stdout, "Refreshing iterator\n"); iter->Next(); @@ -83,8 +78,7 @@ static void ReplicationThreadBody(void* arg) { t->no_read++; } } - delete iter; - iter = NULL; + iter.reset(); } } diff --git a/util/options.cc b/util/options.cc index f4a4e27bbe..ea58893ab0 100644 --- a/util/options.cc +++ b/util/options.cc @@ -26,7 +26,6 @@ Options::Options() block_size(4096), block_restart_interval(16), compression(kSnappyCompression), - compression_per_level(NULL), filter_policy(NULL), num_levels(7), level0_file_num_compaction_trigger(4), @@ -80,8 +79,8 @@ Options::Dump(Logger* log) const } Log(log," Options.block_size: %zd", block_size); Log(log," Options.block_restart_interval: %d", block_restart_interval); - if (compression_per_level != NULL) { - for (int i = 0; i < num_levels; i++){ + if (!compression_per_level.empty()) { + for (int i = 0; i < compression_per_level.size(); i++) { Log(log," Options.compression[%d]: %d", i, compression_per_level[i]); }