diff --git a/db/column_family.cc b/db/column_family.cc index b029bff211..b38fd42ef1 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -128,10 +128,12 @@ void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, refs.store(1, std::memory_order_relaxed); } -ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, - Version* dummy_versions, +ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, + const std::string& name, + Version* dummy_versions, Cache* table_cache, const ColumnFamilyOptions& options, - const Options* db_options) + const Options* db_options, + const EnvOptions& storage_options) : id_(id), name_(name), dummy_versions_(dummy_versions), @@ -148,9 +150,12 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, prev_(nullptr), log_number_(0), need_slowdown_for_num_level0_files_(false) { + // if db_options is nullptr, then this is a dummy column family. if (db_options != nullptr) { internal_stats_.reset(new InternalStats(options.num_levels, db_options->env, db_options->statistics.get())); + table_cache_.reset(new TableCache(dbname, db_options, &options_, + storage_options, table_cache)); if (options_.compaction_style == kCompactionStyleUniversal) { compaction_picker_.reset(new UniversalCompactionPicker( &options_, &internal_comparator_, db_options->info_log.get())); @@ -230,11 +235,18 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion( return nullptr; } -ColumnFamilySet::ColumnFamilySet(const Options* db_options) +ColumnFamilySet::ColumnFamilySet(const std::string& dbname, + const Options* db_options, + const EnvOptions& storage_options, + Cache* table_cache) : max_column_family_(0), - dummy_cfd_(new ColumnFamilyData(0, "", nullptr, ColumnFamilyOptions(), - nullptr)), - db_options_(db_options) { + dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr, + ColumnFamilyOptions(), nullptr, + storage_options)), // the parameter: storage_options_ is initialized after dummy_cfd_ + db_name_(dbname), + db_options_(db_options), + 
storage_options_(storage_options), + table_cache_(table_cache) { // initialize linked list dummy_cfd_->prev_.store(dummy_cfd_); dummy_cfd_->next_.store(dummy_cfd_); @@ -290,7 +302,8 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( assert(column_families_.find(name) == column_families_.end()); column_families_.insert({name, id}); ColumnFamilyData* new_cfd = - new ColumnFamilyData(id, name, dummy_versions, options, db_options_); + new ColumnFamilyData(db_name_, id, name, dummy_versions, table_cache_, + options, db_options_, storage_options_); column_family_data_.insert({id, new_cfd}); max_column_family_ = std::max(max_column_family_, id); // add to linked list diff --git a/db/column_family.h b/db/column_family.h index f912c33862..46b3a071ac 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -17,6 +17,7 @@ #include "rocksdb/env.h" #include "db/memtablelist.h" #include "db/write_batch_internal.h" +#include "db/table_cache.h" namespace rocksdb { @@ -64,11 +65,6 @@ extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, // column family metadata. not thread-safe. 
should be protected by db_mutex class ColumnFamilyData { public: - ColumnFamilyData(uint32_t id, const std::string& name, - Version* dummy_versions, const ColumnFamilyOptions& options, - const Options* db_options); - ~ColumnFamilyData(); - uint32_t GetID() const { return id_; } const std::string& GetName() { return name_; } @@ -88,6 +84,8 @@ class ColumnFamilyData { void SetCurrent(Version* current); void CreateNewMemtable(); + TableCache* table_cache() const { return table_cache_.get(); } + // See documentation in compaction_picker.h Compaction* PickCompaction(); Compaction* CompactRange(int input_level, int output_level, @@ -122,6 +120,12 @@ class ColumnFamilyData { private: friend class ColumnFamilySet; + ColumnFamilyData(const std::string& dbname, uint32_t id, + const std::string& name, Version* dummy_versions, + Cache* table_cache, const ColumnFamilyOptions& options, + const Options* db_options, + const EnvOptions& storage_options); + ~ColumnFamilyData(); ColumnFamilyData* next() { return next_.load(); } @@ -135,6 +139,8 @@ class ColumnFamilyData { ColumnFamilyOptions options_; + std::unique_ptr table_cache_; + std::unique_ptr internal_stats_; MemTable* mem_; @@ -186,7 +192,8 @@ class ColumnFamilySet { ColumnFamilyData* current_; }; - explicit ColumnFamilySet(const Options* db_options_); + ColumnFamilySet(const std::string& dbname, const Options* db_options_, + const EnvOptions& storage_options, Cache* table_cache); ~ColumnFamilySet(); ColumnFamilyData* GetDefault() const; @@ -219,8 +226,12 @@ class ColumnFamilySet { std::vector droppped_column_families_; uint32_t max_column_family_; ColumnFamilyData* dummy_cfd_; + + const std::string db_name_; // TODO(icanadi) change to DBOptions const Options* const db_options_; + const EnvOptions storage_options_; + Cache* table_cache_; }; class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { diff --git a/db/db_impl.cc b/db/db_impl.cc index 02e31cb581..730096361d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ 
-39,6 +39,7 @@ #include "db/version_set.h" #include "db/write_batch_internal.h" #include "port/port.h" +#include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" #include "rocksdb/column_family.h" @@ -209,6 +210,10 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) options_(SanitizeOptions(dbname, &internal_comparator_, &internal_filter_policy_, options)), internal_filter_policy_(options.filter_policy), + // Reserve ten files or so for other uses and give the rest to TableCache. + table_cache_(NewLRUCache(options_.max_open_files - 10, + options_.table_cache_numshardbits, + options_.table_cache_remove_scan_count_limit)), db_lock_(nullptr), mutex_(options.use_adaptive_mutex), shutting_down_(nullptr), @@ -234,11 +239,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) env_->GetAbsolutePath(dbname, &db_absolute_path_); - // Reserve ten files or so for other uses and give the rest to TableCache. - const int table_cache_size = options_.max_open_files - 10; - table_cache_.reset(new TableCache(dbname_, &options_, - storage_options_, table_cache_size)); - versions_.reset( new VersionSet(dbname_, &options_, storage_options_, table_cache_.get())); column_family_memtables_.reset( @@ -551,7 +551,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { if (!keep) { if (type == kTableFile) { // evict from cache - table_cache_->Evict(number); + TableCache::Evict(table_cache_.get(), number); } std::string fname = ((type == kLogFile) ? 
options_.wal_dir : dbname_) + "/" + state.all_files[i]; @@ -1014,7 +1014,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, { mutex_.Unlock(); s = BuildTable(dbname_, env_, options_, storage_options_, - table_cache_.get(), iter, &meta, cfd->user_comparator(), + cfd->table_cache(), iter, &meta, cfd->user_comparator(), newest_snapshot, earliest_seqno_in_memtable, GetCompressionFlush(options_)); LogFlush(options_.info_log); @@ -1079,7 +1079,7 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, (unsigned long)meta.number); s = BuildTable(dbname_, env_, options_, storage_options_, - table_cache_.get(), iter, &meta, cfd->user_comparator(), + cfd->table_cache(), iter, &meta, cfd->user_comparator(), newest_snapshot, earliest_seqno_in_memtable, GetCompressionFlush(options_)); LogFlush(options_.info_log); @@ -2031,7 +2031,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact, Status status) { // If this file was inserted into the table cache then remove // them here because this compaction was not committed. 
if (!status.ok()) { - table_cache_->Evict(out.number); + TableCache::Evict(table_cache_.get(), out.number); } } delete compact; @@ -2148,10 +2148,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, if (s.ok() && current_entries > 0) { // Verify that the table is usable - Iterator* iter = table_cache_->NewIterator(ReadOptions(), - storage_options_, - output_number, - current_bytes); + ColumnFamilyData* cfd = compact->compaction->column_family_data(); + Iterator* iter = cfd->table_cache()->NewIterator( + ReadOptions(), storage_options_, output_number, current_bytes); s = iter->status(); delete iter; if (s.ok()) { diff --git a/db/db_impl.h b/db/db_impl.h index c52264fa60..a0d5cdb719 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -251,7 +251,7 @@ class DBImpl : public DB { const std::string dbname_; unique_ptr versions_; const InternalKeyComparator internal_comparator_; - const Options options_; // options_.comparator == &internal_comparator_ + const Options options_; Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version); @@ -370,11 +370,10 @@ class DBImpl : public DB { const ReadOptions& options, ColumnFamilyData* cfd, uint64_t* superversion_number); - // Constant after construction const InternalFilterPolicy internal_filter_policy_; // table_cache_ provides its own synchronization - unique_ptr table_cache_; + std::shared_ptr table_cache_; // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; diff --git a/db/repair.cc b/db/repair.cc index 29524233f0..3b46b58549 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -55,14 +55,21 @@ class Repairer { icmp_(options.comparator), ipolicy_(options.filter_policy), options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)), + cf_options_(ColumnFamilyOptions(options_)), + raw_table_cache_( + // TableCache can be small since we expect each table to be opened + // once. 
+ NewLRUCache(10, options_.table_cache_numshardbits, + options_.table_cache_remove_scan_count_limit)), next_file_number_(1) { - // TableCache can be small since we expect each table to be opened once. - table_cache_ = new TableCache(dbname_, &options_, storage_options_, 10); + table_cache_ = new TableCache(dbname_, &options_, &cf_options_, + storage_options_, raw_table_cache_.get()); edit_ = new VersionEdit(); } ~Repairer() { delete table_cache_; + raw_table_cache_.reset(); delete edit_; } @@ -102,6 +109,8 @@ class Repairer { InternalKeyComparator const icmp_; InternalFilterPolicy const ipolicy_; Options const options_; + ColumnFamilyOptions const cf_options_; + std::shared_ptr raw_table_cache_; TableCache* table_cache_; VersionEdit* edit_; diff --git a/db/table_cache.cc b/db/table_cache.cc index 593352ddea..33d720e942 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -34,18 +34,16 @@ static Slice GetSliceForFileNumber(uint64_t* file_number) { sizeof(*file_number)); } -TableCache::TableCache(const std::string& dbname, - const Options* options, - const EnvOptions& storage_options, - int entries) - : env_(options->env), +// TODO(icanadi) Options -> DBOptions +TableCache::TableCache(const std::string& dbname, const Options* db_options, + const ColumnFamilyOptions* cf_options, + const EnvOptions& storage_options, Cache* const cache) + : env_(db_options->env), dbname_(dbname), - options_(options), + db_options_(db_options), + cf_options_(cf_options), storage_options_(storage_options), - cache_( - NewLRUCache(entries, options->table_cache_numshardbits, - options->table_cache_remove_scan_count_limit)) { -} + cache_(cache) {} TableCache::~TableCache() { } @@ -68,20 +66,21 @@ Status TableCache::FindTable(const EnvOptions& toptions, unique_ptr file; unique_ptr table_reader; s = env_->NewRandomAccessFile(fname, &file, toptions); - RecordTick(options_->statistics.get(), NO_FILE_OPENS); + RecordTick(db_options_->statistics.get(), NO_FILE_OPENS); if (s.ok()) { - if 
(options_->advise_random_on_open) { + if (db_options_->advise_random_on_open) { file->Hint(RandomAccessFile::RANDOM); } - StopWatch sw(env_, options_->statistics.get(), TABLE_OPEN_IO_MICROS); - s = options_->table_factory->GetTableReader(*options_, toptions, - std::move(file), file_size, - &table_reader); + StopWatch sw(env_, db_options_->statistics.get(), TABLE_OPEN_IO_MICROS); + // TODO(icanadi) terrible hack. fix this + Options options(DBOptions(*db_options_), *cf_options_); + s = cf_options_->table_factory->GetTableReader( + options, toptions, std::move(file), file_size, &table_reader); } if (!s.ok()) { assert(table_reader == nullptr); - RecordTick(options_->statistics.get(), NO_FILE_ERRORS); + RecordTick(db_options_->statistics.get(), NO_FILE_ERRORS); // We do not cache error results so that if the error is transient, // or somebody repairs the file, we recover automatically. } else { @@ -112,7 +111,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, TableReader* table_reader = reinterpret_cast(cache_->Value(handle)); Iterator* result = table_reader->NewIterator(options); - result->RegisterCleanup(&UnrefEntry, cache_.get(), handle); + result->RegisterCleanup(&UnrefEntry, cache_, handle); if (table_reader_ptr != nullptr) { *table_reader_ptr = table_reader; } @@ -167,8 +166,8 @@ bool TableCache::PrefixMayMatch(const ReadOptions& options, return may_match; } -void TableCache::Evict(uint64_t file_number) { - cache_->Erase(GetSliceForFileNumber(&file_number)); +void TableCache::Evict(Cache* cache, uint64_t file_number) { + cache->Erase(GetSliceForFileNumber(&file_number)); } } // namespace rocksdb diff --git a/db/table_cache.h b/db/table_cache.h index 4b225af9b6..0a462acd9e 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -24,8 +24,9 @@ class Env; class TableCache { public: - TableCache(const std::string& dbname, const Options* options, - const EnvOptions& storage_options, int entries); + TableCache(const std::string& dbname, const Options* 
db_options, + const ColumnFamilyOptions* cf_options, + const EnvOptions& storage_options, Cache* cache); ~TableCache(); // Return an iterator for the specified file number (the corresponding @@ -61,14 +62,15 @@ class TableCache { bool* table_io); // Evict any entry for the specified file number - void Evict(uint64_t file_number); + static void Evict(Cache* cache, uint64_t file_number); private: Env* const env_; const std::string dbname_; - const Options* options_; + const Options* db_options_; + const ColumnFamilyOptions* cf_options_; const EnvOptions& storage_options_; - std::shared_ptr cache_; + Cache* const cache_; Status FindTable(const EnvOptions& toptions, uint64_t file_number, uint64_t file_size, Cache::Handle**, bool* table_io=nullptr, diff --git a/db/version_set.cc b/db/version_set.cc index 95bdf656b1..fd261a2d37 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -229,11 +229,10 @@ bool Version::PrefixMayMatch(const ReadOptions& options, // key() will always be the biggest value for this SST? 
may_match = true; } else { - may_match = vset_->table_cache_->PrefixMayMatch( - options, - DecodeFixed64(level_iter->value().data()), - DecodeFixed64(level_iter->value().data() + 8), - internal_prefix, nullptr); + may_match = cfd_->table_cache()->PrefixMayMatch( + options, DecodeFixed64(level_iter->value().data()), + DecodeFixed64(level_iter->value().data() + 8), internal_prefix, + nullptr); } return may_match; } @@ -252,8 +251,8 @@ Iterator* Version::NewConcatenatingIterator(const ReadOptions& options, return NewEmptyIterator(); } } - return NewTwoLevelIterator(level_iter, &GetFileIterator, - vset_->table_cache_, options, soptions); + return NewTwoLevelIterator(level_iter, &GetFileIterator, cfd_->table_cache(), + options, soptions); } void Version::AddIterators(const ReadOptions& options, @@ -261,9 +260,8 @@ void Version::AddIterators(const ReadOptions& options, std::vector* iters) { // Merge all level zero files together since they may overlap for (const FileMetaData* file : files_[0]) { - iters->push_back( - vset_->table_cache_->NewIterator( - options, soptions, file->number, file->file_size)); + iters->push_back(cfd_->table_cache()->NewIterator( + options, soptions, file->number, file->file_size)); } // For levels > 0, we can use a concatenating iterator that sequentially @@ -526,8 +524,8 @@ void Version::Get(const ReadOptions& options, prev_file = f; #endif bool tableIO = false; - *status = vset_->table_cache_->Get(options, f->number, f->file_size, - ikey, &saver, SaveValue, &tableIO, + *status = cfd_->table_cache()->Get(options, f->number, f->file_size, ikey, + &saver, SaveValue, &tableIO, MarkKeyMayExist); // TODO: examine the behavior for corrupted key if (!status->ok()) { @@ -1372,13 +1370,12 @@ class VersionSet::Builder { }; VersionSet::VersionSet(const std::string& dbname, const Options* options, - const EnvOptions& storage_options, - TableCache* table_cache) - : column_family_set_(new ColumnFamilySet(options)), + const EnvOptions& storage_options, 
Cache* table_cache) + : column_family_set_(new ColumnFamilySet(dbname, options, storage_options, + table_cache)), env_(options->env), dbname_(dbname), options_(options), - table_cache_(table_cache), next_file_number_(2), manifest_file_number_(0), // Filled by Recover() last_sequence_(0), @@ -1386,8 +1383,7 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options, current_version_number_(0), manifest_file_size_(0), storage_options_(storage_options), - storage_options_compactions_(storage_options_) { -} + storage_options_compactions_(storage_options_) {} VersionSet::~VersionSet() { for (auto cfd : *column_family_set_) { @@ -1936,8 +1932,11 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, "Number of levels needs to be bigger than 1"); } - TableCache tc(dbname, options, storage_options, 10); - VersionSet versions(dbname, options, storage_options, &tc); + ColumnFamilyOptions cf_options(*options); + std::shared_ptr tc(NewLRUCache( + options->max_open_files - 10, options->table_cache_numshardbits, + options->table_cache_remove_scan_count_limit)); + VersionSet versions(dbname, options, storage_options, tc.get()); Status status; std::vector dummy; @@ -2229,7 +2228,7 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { // "ikey" falls in the range for this table. Add the // approximate offset of "ikey" within the table. 
TableReader* table_reader_ptr; - Iterator* iter = table_cache_->NewIterator( + Iterator* iter = v->cfd_->table_cache()->NewIterator( ReadOptions(), storage_options_, files[i]->number, files[i]->file_size, &table_reader_ptr); if (table_reader_ptr != nullptr) { @@ -2285,7 +2284,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { if (!c->inputs(which)->empty()) { if (c->level() + which == 0) { for (const auto& file : *c->inputs(which)) { - list[num++] = table_cache_->NewIterator( + list[num++] = c->column_family_data()->table_cache()->NewIterator( options, storage_options_compactions_, file->number, file->file_size, nullptr, true /* for compaction */); } @@ -2295,8 +2294,8 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { new Version::LevelFileNumIterator( c->column_family_data()->internal_comparator(), c->inputs(which)), - &GetFileIterator, table_cache_, options, storage_options_, - true /* for compaction */); + &GetFileIterator, c->column_family_data()->table_cache(), options, + storage_options_, true /* for compaction */); } } } diff --git a/db/version_set.h b/db/version_set.h index 206370dc20..f2b5d113a8 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -41,7 +41,6 @@ class Compaction; class CompactionPicker; class Iterator; class MemTable; -class TableCache; class Version; class VersionSet; class MergeContext; @@ -281,7 +280,7 @@ class Version { class VersionSet { public: VersionSet(const std::string& dbname, const Options* options, - const EnvOptions& storage_options, TableCache* table_cache); + const EnvOptions& storage_options, Cache* table_cache); ~VersionSet(); // Apply *edit to the current version to form a new descriptor that @@ -424,7 +423,6 @@ class VersionSet { Env* const env_; const std::string dbname_; const Options* const options_; - TableCache* const table_cache_; uint64_t next_file_number_; uint64_t manifest_file_number_; std::atomic last_sequence_; diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 
5cc57f76b2..ef9e928dc4 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -12,6 +12,7 @@ #include "db/write_batch_internal.h" #include "rocksdb/write_batch.h" #include "rocksdb/column_family.h" +#include "rocksdb/cache.h" #include "util/coding.h" #include @@ -535,9 +536,10 @@ void ManifestDumpCommand::DoCommand() { EnvOptions sopt; std::string file(manifestfile); std::string dbname("dummy"); - TableCache* tc = new TableCache(dbname, &options, sopt, 10); - - VersionSet* versions = new VersionSet(dbname, &options, sopt, tc); + std::shared_ptr tc(NewLRUCache( + options.max_open_files - 10, options.table_cache_numshardbits, + options.table_cache_remove_scan_count_limit)); + VersionSet* versions = new VersionSet(dbname, &options, sopt, tc.get()); Status s = versions->DumpManifest(options, file, verbose_, is_key_hex_); if (!s.ok()) { printf("Error in processing file %s %s\n", manifestfile.c_str(), @@ -1011,9 +1013,11 @@ Options ReduceDBLevelsCommand::PrepareOptionsForOpenDB() { Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, int* levels) { EnvOptions soptions; - TableCache tc(db_path_, &opt, soptions, 10); + std::shared_ptr tc( + NewLRUCache(opt.max_open_files - 10, opt.table_cache_numshardbits, + opt.table_cache_remove_scan_count_limit)); const InternalKeyComparator cmp(opt.comparator); - VersionSet versions(db_path_, &opt, soptions, &tc); + VersionSet versions(db_path_, &opt, soptions, tc.get()); std::vector dummy; ColumnFamilyDescriptor dummy_descriptor(default_column_family_name, ColumnFamilyOptions(opt));