diff --git a/db/db_impl.cc b/db/db_impl.cc index ac1f84dc35..cfd5eef5e1 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -214,6 +214,10 @@ Options SanitizeOptions(const std::string& dbname, std::make_shared() ); + if (!result.flush_block_policy_factory) { + result.SetUpDefaultFlushBlockPolicyFactory(); + } + return result; } diff --git a/db/table_stats_collector_test.cc b/db/table_stats_collector_test.cc index 52f4b4bb89..0f92de3ef7 100644 --- a/db/table_stats_collector_test.cc +++ b/db/table_stats_collector_test.cc @@ -83,10 +83,13 @@ class DumbLogger : public Logger { // Utilities test functions void MakeBuilder( - const Options& options, + Options options, std::unique_ptr* writable, std::unique_ptr* builder) { writable->reset(new FakeWritableFile); + if (!options.flush_block_policy_factory) { + options.SetUpDefaultFlushBlockPolicyFactory(); + } builder->reset( options.table_factory->GetTableBuilder(options, writable->get(), options.compression)); diff --git a/include/rocksdb/flush_block_policy.h b/include/rocksdb/flush_block_policy.h new file mode 100644 index 0000000000..776f476a57 --- /dev/null +++ b/include/rocksdb/flush_block_policy.h @@ -0,0 +1,64 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include + +namespace rocksdb { + +class Slice; +class BlockBuilder; + +// FlushBlockPolicy provides a configurable way to determine when to flush a +// block in the block based tables, +class FlushBlockPolicy { + public: + // Keep track of the key/value sequences and return the boolean value to + // determine if table builder should flush current data block. + virtual bool Update(const Slice& key, + const Slice& value) = 0; + + virtual ~FlushBlockPolicy() { } +}; + +class FlushBlockPolicyFactory { + public: + // Return the name of the flush block policy. + virtual const char* Name() const = 0; + + // Return a new block flush policy that flushes data blocks by data size. + // FlushBlockPolicy may need to access the metadata of the data block + // builder to determine when to flush the blocks. + // + // Callers must delete the result after any database that is using the + // result has been closed. + virtual FlushBlockPolicy* NewFlushBlockPolicy( + const BlockBuilder& data_block_builder) const = 0; + + virtual ~FlushBlockPolicyFactory() { } +}; + +class FlushBlockBySizePolicyFactory : public FlushBlockPolicyFactory { + public: + FlushBlockBySizePolicyFactory(const uint64_t block_size, + const uint64_t block_size_deviation) : + block_size_(block_size), + block_size_deviation_(block_size_deviation) { + } + + virtual const char* Name() const override { + return "FlushBlockBySizePolicyFactory"; + } + + virtual FlushBlockPolicy* NewFlushBlockPolicy( + const BlockBuilder& data_block_builder) const override; + + private: + const uint64_t block_size_; + const uint64_t block_size_deviation_; +}; + +} // rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 8eca471334..7bbbee41a2 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -11,26 +11,27 @@ #include #include +#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" +#include "rocksdb/slice_transform.h" #include "rocksdb/statistics.h" #include "rocksdb/table_stats.h" #include "rocksdb/universal_compaction.h" -#include "rocksdb/memtablerep.h" -#include "rocksdb/slice_transform.h" namespace rocksdb { class Cache; +class CompactionFilter; +class CompactionFilterFactory; class Comparator; class Env; class FilterPolicy; +class FlushBlockPolicyFactory; class Logger; class MergeOperator; class Snapshot; -class CompactionFilter; -class CompactionFilterFactory; class TableFactory; using std::shared_ptr; @@ -485,6 +486,13 @@ struct Options { // from the database, because otherwise the read can be very slow. Options* PrepareForBulkLoad(); + // Set up the default flush-block policy factory. By default, we'll use + // `FlushBlockBySizePolicyFactory` as the policy factory. + // Note: Please call this method after block_size and block_size_deviation + // is set. + // REQUIRES: flush_block_policy_factory is not set. + Options* SetUpDefaultFlushBlockPolicyFactory(); + // Disable automatic compactions. Manual compactions can still // be issued on this database. bool disable_auto_compactions; @@ -623,6 +631,13 @@ struct Options { // Number of locks used for inplace update // Default: 10000, if inplace_update_support = true, else 0. size_t inplace_update_num_locks; + + // Creates the instances of flush block policy. + // A flush-block policy provides a configurable way to determine when to + // flush a block in the block based tables, + // Default: nullptr. User can call FlushBlockBySizePolicyFactory() to set + // up default policy factory (`FlushBlockBySizePolicyFactory`). + std::shared_ptr flush_block_policy_factory; }; // diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 7b00d67087..c2af122bca 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -12,6 +12,7 @@ #include #include +#include "rocksdb/flush_block_policy.h" #include "rocksdb/cache.h" #include "rocksdb/comparator.h" #include "rocksdb/table.h" @@ -96,19 +97,11 @@ struct BlockBasedTableBuilder::Rep { char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize]; size_t compressed_cache_key_prefix_size; - // We do not emit the index entry for a block until we have seen the - // first key for the next data block. This allows us to use shorter - // keys in the index block. For example, consider a block boundary - // between the keys "the quick brown fox" and "the who". We can use - // "the r" as the key for the index block entry since it is >= all - // entries in the first block and < all entries in subsequent - // blocks. - // - // Invariant: r->pending_index_entry is true only if data_block is empty. - bool pending_index_entry; + BlockHandle pending_handle; // Handle to add to index block std::string compressed_output; + std::unique_ptr flush_block_policy; Rep(const Options& opt, WritableFile* f, CompressionType compression_type) : options(opt), @@ -118,8 +111,10 @@ struct BlockBasedTableBuilder::Rep { index_block(1, index_block_options.comparator), compression_type(compression_type), filter_block(opt.filter_policy == nullptr ? nullptr - : new FilterBlockBuilder(opt)), - pending_index_entry(false) { + : new FilterBlockBuilder(opt)) { + assert(options.flush_block_policy_factory); + auto factory = options.flush_block_policy_factory; + flush_block_policy.reset(factory->NewFlushBlockPolicy(data_block)); } }; @@ -151,29 +146,25 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); } - const size_t curr_size = r->data_block.CurrentSizeEstimate(); - const size_t estimated_size_after = r->data_block.EstimateSizeAfterKV(key, - value); - // Do flush if one of the below two conditions is true: - // 1) if the current estimated size already exceeds the block size, - // 2) block_size_deviation is set and the estimated size after appending - // the kv will exceed the block size and the current size is under the - // the deviation. - if (curr_size >= r->options.block_size || - (estimated_size_after > r->options.block_size && - r->options.block_size_deviation > 0 && - (curr_size * 100) > - r->options.block_size * (100 - r->options.block_size_deviation))) { + auto should_flush = r->flush_block_policy->Update(key, value); + if (should_flush) { + assert(!r->data_block.empty()); Flush(); - } - if (r->pending_index_entry) { - assert(r->data_block.empty()); - r->options.comparator->FindShortestSeparator(&r->last_key, key); - std::string handle_encoding; - r->pending_handle.EncodeTo(&handle_encoding); - r->index_block.Add(r->last_key, Slice(handle_encoding)); - r->pending_index_entry = false; + // Add item to index block. + // We do not emit the index entry for a block until we have seen the + // first key for the next data block. This allows us to use shorter + // keys in the index block. For example, consider a block boundary + // between the keys "the quick brown fox" and "the who". We can use + // "the r" as the key for the index block entry since it is >= all + // entries in the first block and < all entries in subsequent + // blocks. + if (ok()) { + r->options.comparator->FindShortestSeparator(&r->last_key, key); + std::string handle_encoding; + r->pending_handle.EncodeTo(&handle_encoding); + r->index_block.Add(r->last_key, Slice(handle_encoding)); + } } if (r->filter_block != nullptr) { @@ -203,10 +194,8 @@ void BlockBasedTableBuilder::Flush() { assert(!r->closed); if (!ok()) return; if (r->data_block.empty()) return; - assert(!r->pending_index_entry); WriteBlock(&r->data_block, &r->pending_handle); if (ok()) { - r->pending_index_entry = true; r->status = r->file->Flush(); } if (r->filter_block != nullptr) { @@ -358,11 +347,14 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, Status BlockBasedTableBuilder::Finish() { Rep* r = rep_; + bool empty_data_block = r->data_block.empty(); Flush(); assert(!r->closed); r->closed = true; - BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle; + BlockHandle filter_block_handle, + metaindex_block_handle, + index_block_handle; // Write filter block if (ok() && r->filter_block != nullptr) { @@ -373,12 +365,12 @@ Status BlockBasedTableBuilder::Finish() { // To make sure stats block is able to keep the accurate size of index // block, we will finish writing all index entries here and flush them // to storage after metaindex block is written. - if (ok() && (r->pending_index_entry)) { - r->options.comparator->FindShortSuccessor(&r->last_key); - std::string handle_encoding; - r->pending_handle.EncodeTo(&handle_encoding); - r->index_block.Add(r->last_key, Slice(handle_encoding)); - r->pending_index_entry = false; + if (ok() && !empty_data_block) { + r->options.comparator->FindShortSuccessor(&r->last_key); + + std::string handle_encoding; + r->pending_handle.EncodeTo(&handle_encoding); + r->index_block.Add(r->last_key, handle_encoding); } // Write meta blocks and metaindex block with the following order. diff --git a/table/flush_block_policy.cc b/table/flush_block_policy.cc new file mode 100644 index 0000000000..7b52545325 --- /dev/null +++ b/table/flush_block_policy.cc @@ -0,0 +1,65 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "rocksdb/flush_block_policy.h" +#include "rocksdb/slice.h" +#include "table/block_builder.h" + +#include + +namespace rocksdb { + +// Flush block by size +class FlushBlockBySizePolicy : public FlushBlockPolicy { + public: + // @params block_size: Approximate size of user data packed per + // block. + // @params block_size_deviation: This is used to close a block before it + // reaches the configured + FlushBlockBySizePolicy(const uint64_t block_size, + const uint64_t block_size_deviation, + const BlockBuilder& data_block_builder) : + block_size_(block_size), + block_size_deviation_(block_size_deviation), + data_block_builder_(data_block_builder) { + } + + virtual bool Update(const Slice& key, + const Slice& value) override { + auto curr_size = data_block_builder_.CurrentSizeEstimate(); + + // Do flush if one of the below two conditions is true: + // 1) if the current estimated size already exceeds the block size, + // 2) block_size_deviation is set and the estimated size after appending + // the kv will exceed the block size and the current size is under the + // the deviation. + return curr_size >= block_size_ || BlockAlmostFull(key, value); + } + + private: + bool BlockAlmostFull(const Slice& key, const Slice& value) const { + const auto curr_size = data_block_builder_.CurrentSizeEstimate(); + const auto estimated_size_after = + data_block_builder_.EstimateSizeAfterKV(key, value); + + return + estimated_size_after > block_size_ && + block_size_deviation_ > 0 && + curr_size * 100 > block_size_ * (100 - block_size_deviation_); + } + + const uint64_t block_size_; + const uint64_t block_size_deviation_; + const BlockBuilder& data_block_builder_; +}; + +FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy( + const BlockBuilder& data_block_builder) const { + return new FlushBlockBySizePolicy(block_size_, + block_size_deviation_, + data_block_builder); +} + +} // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index ad4129901d..0ba49b1439 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -30,6 +30,7 @@ namespace rocksdb { +namespace { // Return reverse of "key". // Used to test non-lexicographic comparators. static std::string Reverse(const Slice& key) { @@ -42,7 +43,12 @@ static std::string Reverse(const Slice& key) { return rev; } -namespace { +static Options GetDefaultOptions() { + Options options; + options.SetUpDefaultFlushBlockPolicyFactory(); + return options; +} + class ReverseKeyComparator : public Comparator { public: virtual const char* Name() const { @@ -423,7 +429,7 @@ class DBConstructor: public Constructor { void NewDB() { std::string name = test::TmpDir() + "/table_testdb"; - Options options; + Options options = GetDefaultOptions(); options.comparator = comparator_; Status status = DestroyDB(name, options); ASSERT_TRUE(status.ok()) << status.ToString(); @@ -442,14 +448,16 @@ class DBConstructor: public Constructor { static bool SnappyCompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Snappy_Compress(Options().compression_opts, in.data(), in.size(), + return port::Snappy_Compress(GetDefaultOptions().compression_opts, + in.data(), in.size(), &out); } static bool ZlibCompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Zlib_Compress(Options().compression_opts, in.data(), in.size(), + return port::Zlib_Compress(GetDefaultOptions().compression_opts, + in.data(), in.size(), &out); } @@ -457,7 +465,8 @@ static bool ZlibCompressionSupported() { static bool BZip2CompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::BZip2_Compress(Options().compression_opts, in.data(), in.size(), + return port::BZip2_Compress(GetDefaultOptions().compression_opts, + in.data(), in.size(), &out); } #endif @@ -527,7 +536,7 @@ class Harness { void Init(const TestArgs& args) { delete constructor_; constructor_ = nullptr; - options_ = Options(); + options_ = GetDefaultOptions(); options_.block_restart_interval = args.restart_interval; options_.compression = args.compression; @@ -727,7 +736,7 @@ class Harness { DB* db() const { return constructor_->db(); } private: - Options options_; + Options options_ = GetDefaultOptions(); Constructor* constructor_; }; @@ -827,7 +836,7 @@ TEST(MemTableTest, Simple) { MemTable* memtable = new MemTable(cmp, table_factory); memtable->Ref(); WriteBatch batch; - Options options; + Options options = GetDefaultOptions(); WriteBatchInternal::SetSequence(&batch, 100); batch.Put(std::string("k1"), std::string("v1")); batch.Put(std::string("k2"), std::string("v2")); @@ -878,7 +887,7 @@ TEST(TableTest, BasicTableStats) { std::vector keys; KVMap kvmap; - Options options; + Options options = GetDefaultOptions(); options.compression = kNoCompression; options.block_restart_interval = 1; @@ -912,7 +921,7 @@ TEST(TableTest, FilterPolicyNameStats) { c.Add("a1", "val1"); std::vector keys; KVMap kvmap; - Options options; + Options options = GetDefaultOptions(); std::unique_ptr filter_policy( NewBloomFilterPolicy(10) ); @@ -955,7 +964,7 @@ TEST(TableTest, IndexSizeStat) { std::vector ks; KVMap kvmap; - Options options; + Options options = GetDefaultOptions(); options.compression = kNoCompression; options.block_restart_interval = 1; @@ -974,6 +983,9 @@ TEST(TableTest, NumBlockStat) { options.compression = kNoCompression; options.block_restart_interval = 1; options.block_size = 1000; + // Block Size changed, need to set up a new flush policy to reflect the + // change. + options.SetUpDefaultFlushBlockPolicyFactory(); for (int i = 0; i < 10; ++i) { // the key/val are slightly smaller than block size, so that each block @@ -1001,7 +1013,7 @@ TEST(TableTest, ApproximateOffsetOfPlain) { c.Add("k07", std::string(100000, 'x')); std::vector keys; KVMap kvmap; - Options options; + Options options = GetDefaultOptions(); options.block_size = 1024; options.compression = kNoCompression; c.Finish(options, &keys, &kvmap); @@ -1030,7 +1042,7 @@ static void Do_Compression_Test(CompressionType comp) { c.Add("k04", test::CompressibleString(&rnd, 0.25, 10000, &tmp)); std::vector keys; KVMap kvmap; - Options options; + Options options = GetDefaultOptions(); options.block_size = 1024; options.compression = comp; c.Finish(options, &keys, &kvmap); @@ -1072,7 +1084,7 @@ TEST(TableTest, BlockCacheLeak) { // in the cache. This test checks whether the Table actually makes use of the // unique ID from the file. - Options opt; + Options opt = GetDefaultOptions(); opt.block_size = 1024; opt.compression = kNoCompression; opt.block_cache = NewLRUCache(16*1024*1024); // big enough so we don't ever diff --git a/util/options.cc b/util/options.cc index 3952f45fae..eae112e840 100644 --- a/util/options.cc +++ b/util/options.cc @@ -16,6 +16,7 @@ #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" +#include "rocksdb/flush_block_policy.h" #include "rocksdb/merge_operator.h" #include "table/block_based_table_factory.h" @@ -286,12 +287,14 @@ Options::Dump(Logger* log) const collector_names.append(collector->Name()); collector_names.append("; "); } - Log(log," Options.table_stats_collectors: %s", + Log(log, " Options.table_stats_collectors: %s", collector_names.c_str()); - Log(log," Options.inplace_update_support: %d", + Log(log, " Options.inplace_update_support: %d", inplace_update_support); - Log(log," Options.inplace_update_num_locks: %zd", + Log(log, " Options.inplace_update_num_locks: %zd", inplace_update_num_locks); + Log(log, " Options.flush_block_policy_factory: %s", + flush_block_policy_factory ? flush_block_policy_factory->Name() : ""); } // Options::Dump // @@ -331,4 +334,12 @@ Options::PrepareForBulkLoad() return this; } +Options* Options::SetUpDefaultFlushBlockPolicyFactory() { + assert(!flush_block_policy_factory); + flush_block_policy_factory = + std::make_shared( + block_size, block_size_deviation); + return this; +} + } // namespace rocksdb