From b1194f490383e1d6e415a8fc767987de2adb474a Mon Sep 17 00:00:00 2001 From: Mike Lin Date: Mon, 23 Dec 2013 08:54:50 -0800 Subject: [PATCH 01/12] Minor compaction logging improvements 1) make summary less likely to be truncated 2) format human-readable file sizes in summary 3) log the motivation for each universal compaction --- db/compaction.cc | 33 ++++++++++++++++++++++++++++----- db/compaction_picker.cc | 23 ++++++++++++++--------- db/db_impl.cc | 2 +- 3 files changed, 43 insertions(+), 15 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index 703e7aeaeb..8a843b650a 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -174,14 +174,37 @@ void Compaction::ResetNextCompactionIndex() { input_version_->ResetNextCompactionIndex(level_); } +/* +for sizes >=10TB, print "XXTB" +for sizes >=10GB, print "XXGB" +etc. +*/ +static void FileSizeSummary(unsigned long long sz, char* output, int len) { + const unsigned long long ull10 = 10; + if (sz >= ull10<<40) { + snprintf(output, len, "%lluTB", sz>>40); + } else if (sz >= ull10<<30) { + snprintf(output, len, "%lluGB", sz>>30); + } else if (sz >= ull10<<20) { + snprintf(output, len, "%lluMB", sz>>20); + } else if (sz >= ull10<<10) { + snprintf(output, len, "%lluKB", sz>>10); + } else { + snprintf(output, len, "%lluB", sz); + } +} + static void InputSummary(std::vector& files, char* output, int len) { int write = 0; for (unsigned int i = 0; i < files.size(); i++) { int sz = len - write; - int ret = snprintf(output + write, sz, "%lu(%lu) ", - (unsigned long)files.at(i)->number, - (unsigned long)files.at(i)->file_size); + int ret; + char sztxt[16]; + FileSizeSummary((unsigned long long)files.at(i)->file_size, sztxt, 16); + ret = snprintf(output + write, sz, "%lu(%s) ", + (unsigned long)files.at(i)->number, + sztxt); if (ret < 0 || ret >= sz) break; write += ret; @@ -198,9 +221,9 @@ void Compaction::Summary(char* output, int len) { return; } - char level_low_summary[100]; + char level_low_summary[1024]; InputSummary(inputs_[0], level_low_summary, sizeof(level_low_summary)); - char level_up_summary[100]; + char level_up_summary[1024]; if (inputs_[1].size()) { InputSummary(inputs_[1], level_up_summary, sizeof(level_up_summary)); } else { diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index fa2fbc6635..da49ce02d7 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -551,22 +551,27 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) { version->LevelFileSummary(&tmp, 0)); // Check for size amplification first. - Compaction* c = PickCompactionUniversalSizeAmp(version, score); - if (c == nullptr) { + Compaction* c; + if ((c = PickCompactionUniversalSizeAmp(version, score)) != nullptr) { + Log(options_->info_log, "Universal: compacting for size amp\n"); + } else { // Size amplification is within limits. Try reducing read // amplification while maintaining file size ratios. unsigned int ratio = options_->compaction_options_universal.size_ratio; - c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX); - // Size amplification and file size ratios are within configured limits. - // If max read amplification is exceeding configured limits, then force - // compaction without looking at filesize ratios and try to reduce - // the number of files to fewer than level0_file_num_compaction_trigger. - if (c == nullptr) { + if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX)) != nullptr) { + Log(options_->info_log, "Universal: compacting for size ratio\n"); + } else { + // Size amplification and file size ratios are within configured limits. + // If max read amplification is exceeding configured limits, then force + // compaction without looking at filesize ratios and try to reduce + // the number of files to fewer than level0_file_num_compaction_trigger. unsigned int num_files = version->files_[level].size() - options_->level0_file_num_compaction_trigger; - c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files); + if ((c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files)) != nullptr) { + Log(options_->info_log, "Universal: compacting for file num\n"); + } } } if (c == nullptr) { diff --git a/db/db_impl.cc b/db/db_impl.cc index e84817b9b2..0b2cc0e691 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2232,7 +2232,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, compact->compaction->output_level(), compact->compaction->score(), options_.max_background_compactions - bg_compaction_scheduled_); - char scratch[256]; + char scratch[2345]; compact->compaction->Summary(scratch, sizeof(scratch)); Log(options_.info_log, "Compaction start summary: %s\n", scratch); From 5e3aeb5f8e029aafbf46b61e7345e8e25cb5dda1 Mon Sep 17 00:00:00 2001 From: Mike Lin Date: Tue, 24 Dec 2013 01:09:09 -0800 Subject: [PATCH 02/12] An initial implementation of kCompactionStopStyleSimilarSize for universal compaction --- db/compaction_picker.cc | 22 +++++++++- db/db_test.cc | 92 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 2 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index da49ce02d7..a3c679409b 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -683,14 +683,32 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( if (f->being_compacted) { break; } - // pick files if the total candidate file size (increased by the + // Pick files if the total/last candidate file size (increased by the // specified ratio) is still larger than the next candidate file. + // candidate_size is the total size of files picked so far with the + // default kCompactionStopStyleTotalSize; with + // kCompactionStopStyleSimilarSize, it's simply the size of the last + // picked file. uint64_t sz = (candidate_size * (100L + ratio)) /100; if (sz < f->file_size) { break; + } + if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) { + // Similar-size stopping rule: also check the last picked file isn't + // far larger than the next candidate file. + sz = (f->file_size * (100L + ratio)) / 100; + if (sz < candidate_size) { + // If the small file we've encountered begins a run of similar-size + // files, we'll pick them up on a future iteration of the outer + // loop. If it's some lonely straggler, it'll eventually get picked + // by the last-resort read amp strategy which disregards size ratios. + break; + } + candidate_size = f->file_size; + } else { // default kCompactionStopStyleTotalSize + candidate_size += f->file_size; } candidate_count++; - candidate_size += f->file_size; } // Found a series of consecutive files that need compaction. diff --git a/db/db_test.cc b/db/db_test.cc index 9c8a97f936..9953cc2f7a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2043,6 +2043,98 @@ TEST(DBTest, UniversalCompactionOptions) { } } +TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 100<<10; //100KB + // trigger compaction if there are >= 4 files + options.level0_file_num_compaction_trigger = 4; + options.compaction_options_universal.size_ratio = 10; + options.compaction_options_universal.stop_style = kCompactionStopStyleSimilarSize; + Reopen(&options); + + Random rnd(301); + int key_idx = 0; + + // Stage 1: + // Generate a set of files at level 0, but don't trigger level-0 + // compaction. + for (int num = 0; + num < options.level0_file_num_compaction_trigger-1; + num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(0), num + 1); + } + + // Generate one more file at level-0, which should trigger level-0 + // compaction. + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForCompact(); + // Suppose each file flushed from mem table has size 1. Now we compact + // (level0_file_num_compaction_trigger+1)=4 files and should have a big + // file of size 4. + ASSERT_EQ(NumTableFilesAtLevel(0), 1); + for (int i = 1; i < options.num_levels ; i++) { + ASSERT_EQ(NumTableFilesAtLevel(i), 0); + } + + // Stage 2: + // Now we have one file at level 0, with size 4. We also have some data in + // mem table. Let's continue generating new files at level 0, but don't + // trigger level-0 compaction. + // First, clean up memtable before inserting new data. This will generate + // a level-0 file, with size around 0.4 (according to previously written + // data amount). + dbfull()->Flush(FlushOptions()); + for (int num = 0; + num < options.level0_file_num_compaction_trigger-3; + num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(0), num + 3); + } + + // Generate one more file at level-0, which should trigger level-0 + // compaction. + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForCompact(); + // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1. + // After compaction, we should have 3 files, with size 4, 0.4, 2. + ASSERT_EQ(NumTableFilesAtLevel(0), 3); + for (int i = 1; i < options.num_levels ; i++) { + ASSERT_EQ(NumTableFilesAtLevel(i), 0); + } + + // Stage 3: + // Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one + // more file at level-0, which should trigger level-0 compaction. + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForCompact(); + // Level-0 compaction is triggered, but no file will be picked up. + ASSERT_EQ(NumTableFilesAtLevel(0), 4); + for (int i = 1; i < options.num_levels ; i++) { + ASSERT_EQ(NumTableFilesAtLevel(i), 0); + } +} + #if defined(SNAPPY) && defined(ZLIB) && defined(BZIP2) TEST(DBTest, CompressedCache) { int num_iter = 80; From af7838de36fd54dd7bb02969a2e4b47215e9cb05 Mon Sep 17 00:00:00 2001 From: Mike Lin Date: Sat, 25 Jan 2014 14:12:24 -0800 Subject: [PATCH 03/12] address code review comments on 5e3aeb5f8e - reduce string copying in Compaction::Summary - simplify file number checking in UniversalCompactionStopStyleSimilarSize unit test --- db/compaction.cc | 32 ++++++++++++++++++++------------ db/db_test.cc | 11 +---------- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index 8a843b650a..ce3cd86276 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -194,7 +194,7 @@ static void FileSizeSummary(unsigned long long sz, char* output, int len) { } } -static void InputSummary(std::vector& files, char* output, +static int InputSummary(std::vector& files, char* output, int len) { int write = 0; for (unsigned int i = 0; i < files.size(); i++) { @@ -209,29 +209,37 @@ static void InputSummary(std::vector& files, char* output, break; write += ret; } + return write; } void Compaction::Summary(char* output, int len) { int write = snprintf(output, len, - "Base version %lu Base level %d, seek compaction:%d, inputs:", + "Base version %lu Base level %d, seek compaction:%d, inputs: [", (unsigned long)input_version_->GetVersionNumber(), level_, seek_compaction_); - if (write < 0 || write > len) { + if (write < 0 || write >= len) { return; } - char level_low_summary[1024]; - InputSummary(inputs_[0], level_low_summary, sizeof(level_low_summary)); - char level_up_summary[1024]; - if (inputs_[1].size()) { - InputSummary(inputs_[1], level_up_summary, sizeof(level_up_summary)); - } else { - level_up_summary[0] = '\0'; + write += InputSummary(inputs_[0], output+write, len-write); + if (write < 0 || write >= len) { + return; } - snprintf(output + write, len - write, "[%s],[%s]", - level_low_summary, level_up_summary); + write += snprintf(output+write, len-write, "],["); + if (write < 0 || write >= len) { + return; + } + + if (inputs_[1].size()) { + write += InputSummary(inputs_[1], output+write, len-write); + } + if (write < 0 || write >= len) { + return; + } + + snprintf(output+write, len-write, "]"); } } // namespace rocksdb diff --git a/db/db_test.cc b/db/db_test.cc index 9953cc2f7a..3a46282e2d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2051,6 +2051,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { options.level0_file_num_compaction_trigger = 4; options.compaction_options_universal.size_ratio = 10; options.compaction_options_universal.stop_style = kCompactionStopStyleSimilarSize; + options.num_levels=1; Reopen(&options); Random rnd(301); @@ -2082,9 +2083,6 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { // (level0_file_num_compaction_trigger+1)=4 files and should have a big // file of size 4. ASSERT_EQ(NumTableFilesAtLevel(0), 1); - for (int i = 1; i < options.num_levels ; i++) { - ASSERT_EQ(NumTableFilesAtLevel(i), 0); - } // Stage 2: // Now we have one file at level 0, with size 4. We also have some data in @@ -2116,10 +2114,6 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1. // After compaction, we should have 3 files, with size 4, 0.4, 2. ASSERT_EQ(NumTableFilesAtLevel(0), 3); - for (int i = 1; i < options.num_levels ; i++) { - ASSERT_EQ(NumTableFilesAtLevel(i), 0); - } - // Stage 3: // Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one // more file at level-0, which should trigger level-0 compaction. @@ -2130,9 +2124,6 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { dbfull()->TEST_WaitForCompact(); // Level-0 compaction is triggered, but no file will be picked up. ASSERT_EQ(NumTableFilesAtLevel(0), 4); - for (int i = 1; i < options.num_levels ; i++) { - ASSERT_EQ(NumTableFilesAtLevel(i), 0); - } } #if defined(SNAPPY) && defined(ZLIB) && defined(BZIP2) From e3f396f1eaea52fdfb65f7248afd039abc3b275c Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 4 Mar 2014 17:02:25 -0800 Subject: [PATCH 04/12] Some fixes to BackupableDB Summary: (1) Report corruption if backup meta file has tailing data that was not read. This should fix: https://github.com/facebook/rocksdb/issues/81 (also, @sdong reported similar issue) (2) Don't use OS buffer when copying file to backup directory. We don't need the file in cache since we won't be reading it twice (3) Don't delete newer backups when somebody tries to backup the diverged DB (restore from older backup, add new data, try to backup). Rather, just fail the new backup. Test Plan: backupable_db_test Reviewers: ljin, dhruba, sdong Reviewed By: ljin CC: leveldb, sdong Differential Revision: https://reviews.facebook.net/D16287 --- HISTORY.md | 1 + include/utilities/backupable_db.h | 64 +++++++++--------- utilities/backupable/backupable_db.cc | 77 +++++++++++++--------- utilities/backupable/backupable_db_test.cc | 34 ++++++---- 4 files changed, 101 insertions(+), 75 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 4133dd2ad7..0227580ad4 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * By default, checksums are verified on every read from database * Added is_manual_compaction to CompactionFilter::Context * Added "virtual void WaitForJoin() = 0" in class Env +* Removed BackupEngine::DeleteBackupsNewerThan() function ### New Features * If we find one truncated record at the end of the MANIFEST or WAL files, diff --git a/include/utilities/backupable_db.h b/include/utilities/backupable_db.h index ab3a1ed808..abf05978c2 100644 --- a/include/utilities/backupable_db.h +++ b/include/utilities/backupable_db.h @@ -58,14 +58,13 @@ struct BackupableDBOptions { explicit BackupableDBOptions(const std::string& _backup_dir, Env* _backup_env = nullptr, bool _share_table_files = true, - Logger* _info_log = nullptr, - bool _sync = true, - bool _destroy_old_data = false) : - backup_dir(_backup_dir), - backup_env(_backup_env), - info_log(_info_log), - sync(_sync), - destroy_old_data(_destroy_old_data) { } + Logger* _info_log = nullptr, bool _sync = true, + bool _destroy_old_data = false) + : backup_dir(_backup_dir), + backup_env(_backup_env), + info_log(_info_log), + sync(_sync), + destroy_old_data(_destroy_old_data) {} }; typedef uint32_t BackupID; @@ -99,8 +98,6 @@ class BackupEngine { const std::string& wal_dir) = 0; virtual Status RestoreDBFromLatestBackup(const std::string& db_dir, const std::string& wal_dir) = 0; - - virtual void DeleteBackupsNewerThan(uint64_t sequence_number) = 0; }; // Stack your DB with BackupableDB to be able to backup the DB @@ -138,32 +135,33 @@ class BackupableDB : public StackableDB { // Use this class to access information about backups and restore from them class RestoreBackupableDB { - public: - RestoreBackupableDB(Env* db_env, const BackupableDBOptions& options); - ~RestoreBackupableDB(); + public: + RestoreBackupableDB(Env* db_env, const BackupableDBOptions& options); + ~RestoreBackupableDB(); - // Returns info about backups in backup_info - void GetBackupInfo(std::vector* backup_info); + // Returns info about backups in backup_info + void GetBackupInfo(std::vector* backup_info); - // restore from backup with backup_id - // IMPORTANT -- if options_.share_table_files == true and you restore DB - // from some backup that is not the latest, and you start creating new - // backups from the new DB, all the backups that were newer than the - // backup you restored from will be deleted - // - // Example: Let's say you have backups 1, 2, 3, 4, 5 and you restore 3. - // If you try creating a new backup now, old backups 4 and 5 will be deleted - // and new backup with ID 4 will be created. - Status RestoreDBFromBackup(BackupID backup_id, const std::string& db_dir, - const std::string& wal_dir); + // restore from backup with backup_id + // IMPORTANT -- if options_.share_table_files == true and you restore DB + // from some backup that is not the latest, and you start creating new + // backups from the new DB, they will probably fail + // + // Example: Let's say you have backups 1, 2, 3, 4, 5 and you restore 3. + // If you add new data to the DB and try creating a new backup now, the + // database will diverge from backups 4 and 5 and the new backup will fail. + // If you want to create new backup, you will first have to delete backups 4 + // and 5. + Status RestoreDBFromBackup(BackupID backup_id, const std::string& db_dir, + const std::string& wal_dir); - // restore from the latest backup - Status RestoreDBFromLatestBackup(const std::string& db_dir, - const std::string& wal_dir); - // deletes old backups, keeping latest num_backups_to_keep alive - Status PurgeOldBackups(uint32_t num_backups_to_keep); - // deletes a specific backup - Status DeleteBackup(BackupID backup_id); + // restore from the latest backup + Status RestoreDBFromLatestBackup(const std::string& db_dir, + const std::string& wal_dir); + // deletes old backups, keeping latest num_backups_to_keep alive + Status PurgeOldBackups(uint32_t num_backups_to_keep); + // deletes a specific backup + Status DeleteBackup(BackupID backup_id); private: BackupEngine* backup_engine_; diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 89051f25aa..4225344702 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -46,8 +46,6 @@ class BackupEngineImpl : public BackupEngine { return RestoreDBFromBackup(latest_backup_id_, db_dir, wal_dir); } - void DeleteBackupsNewerThan(uint64_t sequence_number); - private: struct FileInfo { FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum) @@ -185,6 +183,12 @@ class BackupEngineImpl : public BackupEngine { Env* db_env_; Env* backup_env_; + // directories + unique_ptr backup_directory_; + unique_ptr shared_directory_; + unique_ptr meta_directory_; + unique_ptr private_directory_; + static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL; // 5MB }; @@ -203,11 +207,17 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, // create all the dirs we need backup_env_->CreateDirIfMissing(GetAbsolutePath()); + backup_env_->NewDirectory(GetAbsolutePath(), &backup_directory_); if (options_.share_table_files) { backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel())); + backup_env_->NewDirectory(GetAbsolutePath(GetSharedFileRel()), + &shared_directory_); } backup_env_->CreateDirIfMissing(GetAbsolutePath(GetPrivateDirRel())); + backup_env_->NewDirectory(GetAbsolutePath(GetPrivateDirRel()), + &private_directory_); backup_env_->CreateDirIfMissing(GetBackupMetaDir()); + backup_env_->NewDirectory(GetBackupMetaDir(), &meta_directory_); std::vector backup_meta_files; backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files); @@ -279,26 +289,6 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, BackupEngineImpl::~BackupEngineImpl() { LogFlush(options_.info_log); } -void BackupEngineImpl::DeleteBackupsNewerThan(uint64_t sequence_number) { - for (auto backup : backups_) { - if (backup.second.GetSequenceNumber() > sequence_number) { - Log(options_.info_log, - "Deleting backup %u because sequence number (%" PRIu64 - ") is newer than %" PRIu64 "", - backup.first, backup.second.GetSequenceNumber(), sequence_number); - backup.second.Delete(); - obsolete_backups_.push_back(backup.first); - } - } - for (auto ob : obsolete_backups_) { - backups_.erase(backups_.find(ob)); - } - auto itr = backups_.end(); - latest_backup_id_ = (itr == backups_.begin()) ? 0 : (--itr)->first; - PutLatestBackupFileContents(latest_backup_id_); // Ignore errors - GarbageCollection(false); -} - Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { Status s; std::vector live_files; @@ -348,9 +338,8 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { return Status::Corruption("Can't parse file name. This is very bad"); } // we should only get sst, manifest and current files here - assert(type == kTableFile || - type == kDescriptorFile || - type == kCurrentFile); + assert(type == kTableFile || type == kDescriptorFile || + type == kCurrentFile); // rules: // * if it's kTableFile, than it's shared @@ -394,6 +383,28 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { // install the newly created backup meta! (atomic) s = PutLatestBackupFileContents(new_backup_id); } + if (s.ok() && options_.sync) { + unique_ptr backup_private_directory; + backup_env_->NewDirectory( + GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)), + &backup_private_directory); + if (backup_private_directory != nullptr) { + backup_private_directory->Fsync(); + } + if (private_directory_ != nullptr) { + private_directory_->Fsync(); + } + if (meta_directory_ != nullptr) { + meta_directory_->Fsync(); + } + if (shared_directory_ != nullptr) { + shared_directory_->Fsync(); + } + if (backup_directory_ != nullptr) { + backup_directory_->Fsync(); + } + } + if (!s.ok()) { // clean all the files we might have created Log(options_.info_log, "Backup failed -- %s", s.ToString().c_str()); @@ -591,6 +602,7 @@ Status BackupEngineImpl::CopyFile(const std::string& src, unique_ptr src_file; EnvOptions env_options; env_options.use_mmap_writes = false; + env_options.use_os_buffer = false; if (size != nullptr) { *size = 0; } @@ -706,6 +718,7 @@ Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env, EnvOptions env_options; env_options.use_mmap_writes = false; + env_options.use_os_buffer = false; std::unique_ptr src_file; Status s = src_env->NewSequentialFile(src, &src_file, env_options); @@ -893,6 +906,9 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( uint64_t size; s = env_->GetFileSize(backup_dir + "/" + filename, &size); + if (!s.ok()) { + return s; + } if (line.empty()) { return Status::Corruption("File checksum is missing"); @@ -913,6 +929,11 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( files.emplace_back(filename, size, checksum_value); } + if (s.ok() && data.size() > 0) { + // file has to be read completely. if not, we count it as corruption + s = Status::Corruption("Tailing data in backup meta file"); + } + if (s.ok()) { for (const auto& file_info : files) { s = AddFile(file_info); @@ -968,11 +989,7 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) { BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options) : StackableDB(db), - backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) { - if (options.share_table_files) { - backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber()); - } -} + backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {} BackupableDB::~BackupableDB() { delete backup_engine_; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index ade2d954f6..aaff224f72 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -709,27 +709,37 @@ TEST(BackupableDBTest, OnlineIntegrationTest) { CloseRestoreDB(); } -TEST(BackupableDBTest, DeleteNewerBackups) { +TEST(BackupableDBTest, FailOverwritingBackups) { + options_.write_buffer_size = 1024 * 1024 * 1024; // 1GB // create backups 1, 2, 3, 4, 5 OpenBackupableDB(true); for (int i = 0; i < 5; ++i) { FillDB(db_.get(), 100 * i, 100 * (i + 1)); - ASSERT_OK(db_->CreateNewBackup(!!(i % 2))); + ASSERT_OK(db_->CreateNewBackup(true)); + CloseBackupableDB(); + OpenBackupableDB(false); } CloseBackupableDB(); - // backup 3 is fine - AssertBackupConsistency(3, 0, 300, 500); - // this should delete backups 4 and 5 - OpenBackupableDB(); - CloseBackupableDB(); - // backups 4 and 5 don't exist + // restore 3 OpenRestoreDB(); - Status s = restore_db_->RestoreDBFromBackup(4, dbname_, dbname_); - ASSERT_TRUE(s.IsNotFound()); - s = restore_db_->RestoreDBFromBackup(5, dbname_, dbname_); - ASSERT_TRUE(s.IsNotFound()); + ASSERT_OK(restore_db_->RestoreDBFromBackup(3, dbname_, dbname_)); CloseRestoreDB(); + + OpenBackupableDB(false); + FillDB(db_.get(), 0, 300); + Status s = db_->CreateNewBackup(true); + // the new backup fails because new table files + // clash with old table files from backups 4 and 5 + // (since write_buffer_size is huge, we can be sure that + // each backup will generate only one sst file and that + // a file generated by a new backup is the same as + // sst file generated by backup 4) + ASSERT_TRUE(s.IsCorruption()); + ASSERT_OK(db_->DeleteBackup(4)); + ASSERT_OK(db_->DeleteBackup(5)); + // now, the backup can succeed + ASSERT_OK(db_->CreateNewBackup(true)); } TEST(BackupableDBTest, NoShareTableFiles) { From a5b1d2f146e89600a76384c6546d40838978d0ff Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Tue, 4 Mar 2014 17:08:05 -0800 Subject: [PATCH 05/12] make key evenly distributed between 0 and FLAGS_num Summary: The issue is that when FLAGS_num is small, the leading bytes of the key are padded with 0s. This makes all keys have the same prefix 00000000 Most of the changes are just to make lint happy Test Plan: ran db_bench Reviewers: sdong, haobo, igor Reviewed By: sdong CC: leveldb Differential Revision: https://reviews.facebook.net/D16317 --- db/db_bench.cc | 363 ++++++++++++++++++++++++++++--------------------- 1 file changed, 205 insertions(+), 158 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 291a0ce8c6..b01828b1e7 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -7,6 +7,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#define __STDC_FORMAT_MACROS +#include #include #include #include @@ -487,6 +489,9 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) { } DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and " "plain table"); +DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated " + "per prefix, 0 means no special handling of the prefix, " + "i.e. use the prefix comes with the generated random number."); enum RepFactory { kSkipList, @@ -593,9 +598,9 @@ class Stats { double start_; double finish_; double seconds_; - long long done_; - long long last_report_done_; - long long next_report_; + int64_t done_; + int64_t last_report_done_; + int64_t next_report_; int64_t bytes_; double last_op_finish_; double last_report_finish_; @@ -672,12 +677,12 @@ class Stats { else if (next_report_ < 100000) next_report_ += 10000; else if (next_report_ < 500000) next_report_ += 50000; else next_report_ += 100000; - fprintf(stderr, "... finished %lld ops%30s\r", done_, ""); + fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, ""); fflush(stderr); } else { double now = FLAGS_env->NowMicros(); fprintf(stderr, - "%s ... thread %d: (%lld,%lld) ops and " + "%s ... thread %d: (%" PRIu64 ",%" PRIu64 ") ops and " "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n", FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(), id_, @@ -773,7 +778,7 @@ struct ThreadState { class Duration { public: - Duration(int max_seconds, long long max_ops) { + Duration(int max_seconds, int64_t max_ops) { max_seconds_ = max_seconds; max_ops_= max_ops; ops_ = 0; @@ -799,8 +804,8 @@ class Duration { private: int max_seconds_; - long long max_ops_; - long long ops_; + int64_t max_ops_; + int64_t ops_; double start_at_; }; @@ -811,24 +816,27 @@ class Benchmark { const FilterPolicy* filter_policy_; const SliceTransform* prefix_extractor_; DB* db_; - long long num_; + int64_t num_; int value_size_; int key_size_; + int prefix_size_; + int64_t keys_per_prefix_; int entries_per_batch_; WriteOptions write_options_; - long long reads_; - long long writes_; - long long readwrites_; - long long merge_keys_; + int64_t reads_; + int64_t writes_; + int64_t readwrites_; + int64_t merge_keys_; int heap_counter_; - char keyFormat_[100]; // will contain the format of key. e.g "%016d" void PrintHeader() { PrintEnvironment(); fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size); fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n", FLAGS_value_size, static_cast(FLAGS_value_size * FLAGS_compression_ratio + 0.5)); - fprintf(stdout, "Entries: %lld\n", num_); + fprintf(stdout, "Entries: %" PRIu64 "\n", num_); + fprintf(stdout, "Prefix: %d bytes\n", FLAGS_prefix_size); + fprintf(stdout, "Keys per prefix: %" PRIu64 "\n", keys_per_prefix_); fprintf(stdout, "RawSize: %.1f MB (estimated)\n", ((static_cast(FLAGS_key_size + FLAGS_value_size) * num_) / 1048576.0)); @@ -1006,6 +1014,8 @@ class Benchmark { num_(FLAGS_num), value_size_(FLAGS_value_size), key_size_(FLAGS_key_size), + prefix_size_(FLAGS_prefix_size), + keys_per_prefix_(FLAGS_keys_per_prefix), entries_per_batch_(1), reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes), @@ -1014,6 +1024,11 @@ class Benchmark { ), merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys), heap_counter_(0) { + if (FLAGS_prefix_size > FLAGS_key_size) { + fprintf(stderr, "prefix size is larger than key size"); + exit(1); + } + std::vector files; FLAGS_env->GetChildren(FLAGS_db, &files); for (unsigned int i = 0; i < files.size(); i++) { @@ -1032,17 +1047,55 @@ class Benchmark { delete prefix_extractor_; } - //this function will construct string format for key. e.g "%016lld" - void ConstructStrFormatForKey(char* str, int keySize) { - str[0] = '%'; - str[1] = '0'; - sprintf(str+2, "%dlld%s", keySize, "%s"); - } + // Generate key according to the given specification and random number. + // The resulting key will have the following format (if keys_per_prefix_ + // is positive), extra trailing bytes are either cut off or paddd with '0'. + // The prefix value is derived from key value. + // ---------------------------- + // | prefix 00000 | key 00000 | + // ---------------------------- + // If keys_per_prefix_ is 0, the key is simply a binary representation of + // random number followed by trailing '0's + // ---------------------------- + // | key 00000 | + // ---------------------------- + std::string GenerateKeyFromInt(uint64_t v, int64_t num_keys) { + std::string key; + key.resize(key_size_); + char* start = &(key[0]); + char* pos = start; + if (keys_per_prefix_ > 0) { + int64_t num_prefix = num_keys / keys_per_prefix_; + int64_t prefix = v % num_prefix; + int bytes_to_fill = std::min(prefix_size_, 8); + if (port::kLittleEndian) { + for (int i = 0; i < bytes_to_fill; ++i) { + pos[i] = (prefix >> ((bytes_to_fill - i - 1) << 3)) & 0xFF; + } + } else { + memcpy(pos, static_cast(&prefix), bytes_to_fill); + } + if (prefix_size_ > 8) { + // fill the rest with 0s + memset(pos + 8, '0', prefix_size_ - 8); + } + pos += prefix_size_; + } - unique_ptr GenerateKeyFromInt(long long v, const char* suffix = "") { - unique_ptr keyInStr(new char[kMaxKeySize + 1]); - snprintf(keyInStr.get(), kMaxKeySize + 1, keyFormat_, v, suffix); - return keyInStr; + int bytes_to_fill = std::min(key_size_ - static_cast(pos - start), 8); + if (port::kLittleEndian) { + for (int i = 0; i < bytes_to_fill; ++i) { + pos[i] = (v >> ((bytes_to_fill - i - 1) << 3)) & 0xFF; + } + } else { + memcpy(pos, static_cast(&v), bytes_to_fill); + } + pos += bytes_to_fill; + if (key_size_ > pos - start) { + memset(pos, '0', key_size_ - (pos - start)); + } + + return key; } void Run() { @@ -1066,7 +1119,6 @@ class Benchmark { writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes); value_size_ = FLAGS_value_size; key_size_ = FLAGS_key_size; - ConstructStrFormatForKey(keyFormat_, key_size_); entries_per_batch_ = 1; write_options_ = WriteOptions(); if (FLAGS_sync) { @@ -1698,7 +1750,7 @@ class Benchmark { if (num_ != FLAGS_num) { char msg[100]; - snprintf(msg, sizeof(msg), "(%lld ops)", num_); + snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_); thread->stats.AddMessage(msg); } @@ -1710,7 +1762,7 @@ class Benchmark { while (!duration.Done(entries_per_batch_)) { batch.Clear(); for (int j = 0; j < entries_per_batch_; j++) { - long long k = 0; + int64_t k = 0; switch(write_mode) { case SEQUENTIAL: k = i +j; @@ -1720,7 +1772,7 @@ class Benchmark { break; case UNIQUE_RANDOM: { - const long long t = thread->rand.Next() % FLAGS_num; + const int64_t t = thread->rand.Next() % FLAGS_num; if (!bit_set->test(t)) { // best case k = t; @@ -1748,9 +1800,9 @@ class Benchmark { break; } }; - unique_ptr key = GenerateKeyFromInt(k); - batch.Put(key.get(), gen.Generate(value_size_)); - bytes += value_size_ + strlen(key.get()); + std::string key = GenerateKeyFromInt(k, FLAGS_num); + batch.Put(key, gen.Generate(value_size_)); + bytes += value_size_ + key.size(); thread->stats.FinishedSingleOp(db_); } s = db_->Write(write_options_, &batch); @@ -1765,7 +1817,7 @@ class Benchmark { void ReadSequential(ThreadState* thread) { Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true)); - long long i = 0; + int64_t i = 0; int64_t bytes = 0; for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { bytes += iter->key().size() + iter->value().size(); @@ -1778,7 +1830,7 @@ class Benchmark { void ReadReverse(ThreadState* thread) { Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true)); - long long i = 0; + int64_t i = 0; int64_t bytes = 0; for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { bytes += iter->key().size() + iter->value().size(); @@ -1792,20 +1844,20 @@ class Benchmark { // Calls MultiGet over a list of keys from a random distribution. // Returns the total number of keys found. long MultiGetRandom(ReadOptions& options, int num_keys, - Random64& rand, long long range, const char* suffix) { + Random64* rand, int64_t range, const char* suffix) { assert(num_keys > 0); std::vector keys(num_keys); std::vector values(num_keys); - std::vector > gen_keys(num_keys); + std::vector gen_keys(num_keys); int i; - long long k; + int64_t k; // Fill the keys vector for(i=0; iNext() % range; + gen_keys[i] = GenerateKeyFromInt(k, range) + suffix; + keys[i] = gen_keys[i]; } if (FLAGS_use_snapshot) { @@ -1841,7 +1893,7 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); Duration duration(FLAGS_duration, reads_); - long long found = 0; + int64_t found = 0; if (FLAGS_use_multiget) { // MultiGet const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group @@ -1850,7 +1902,8 @@ class Benchmark { // Recalculate number of keys per group, and call MultiGet until done long num_keys; while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { - found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, ""); + found += + MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, ""); thread->stats.FinishedSingleOp(db_); keys_left -= num_keys; } @@ -1858,11 +1911,11 @@ class Benchmark { options.tailing = true; Iterator* iter = db_->NewIterator(options); while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); - iter->Seek(key.get()); - if (iter->Valid() && iter->key().compare(Slice(key.get())) == 0) { + iter->Seek(key); + if (iter->Valid() && iter->key().compare(Slice(key)) == 0) { ++found; } @@ -1873,30 +1926,29 @@ class Benchmark { Iterator* iter = db_->NewIterator(options); std::string value; while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); if (FLAGS_use_snapshot) { options.snapshot = db_->GetSnapshot(); } if (FLAGS_read_range < 2) { - if (db_->Get(options, key.get(), &value).ok()) { + if (db_->Get(options, key, &value).ok()) { found++; } } else { - Slice skey(key.get()); int count = 1; if (FLAGS_get_approx) { - unique_ptr key2 = - GenerateKeyFromInt(k + (int) FLAGS_read_range); - Slice skey2(key2.get()); - Range range(skey, skey2); + std::string key2 = + GenerateKeyFromInt(k + static_cast(FLAGS_read_range), + FLAGS_num + FLAGS_read_range); + Range range(key, key2); uint64_t sizes; db_->GetApproximateSizes(&range, 1, &sizes); } - for (iter->Seek(skey); + for (iter->Seek(key); iter->Valid() && count <= FLAGS_read_range; ++count, iter->Next()) { found++; @@ -1915,7 +1967,8 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_); + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + found, reads_); thread->stats.AddMessage(msg); } @@ -1928,13 +1981,13 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); Duration duration(FLAGS_duration, reads_); - long long found = 0; + int64_t found = 0; while (!duration.Done(1)) { std::string value; const int k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); - Slice skey(key.get()); + std::string key = GenerateKeyFromInt(k, FLAGS_num); + Slice skey(key); Slice prefix = prefix_extractor_->Transform(skey); options.prefix = FLAGS_use_prefix_api ? &prefix : nullptr; @@ -1950,7 +2003,8 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_); + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + found, reads_); thread->stats.AddMessage(msg); } @@ -1968,7 +2022,8 @@ class Benchmark { long num_keys; long found; while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { - found = MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, "."); + found = + MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, "."); // We should not find any key since the key we try to get has a // different suffix @@ -1983,9 +2038,9 @@ class Benchmark { std::string value; Status s; while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k, "."); - s = db_->Get(options, key.get(), &value); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num) + "."; + s = db_->Get(options, key, &value); assert(!s.ok() && s.IsNotFound()); thread->stats.FinishedSingleOp(db_); } @@ -1995,26 +2050,26 @@ class Benchmark { void ReadHot(ThreadState* thread) { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true); - const long long range = (FLAGS_num + 99) / 100; - long long found = 0; + const int64_t range = (FLAGS_num + 99) / 100; + int64_t found = 0; if (FLAGS_use_multiget) { - const long long kpg = FLAGS_keys_per_multiget; // keys per multiget group - long long keys_left = reads_; + const int64_t kpg = FLAGS_keys_per_multiget; // keys per multiget group + int64_t keys_left = reads_; // Recalculate number of keys per group, and call MultiGet until done long num_keys; while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { - found += MultiGetRandom(options, num_keys, thread->rand, range, ""); + found += MultiGetRandom(options, num_keys, &thread->rand, range, ""); thread->stats.FinishedSingleOp(db_); keys_left -= num_keys; } } else { std::string value; while (!duration.Done(1)) { - const long long k = thread->rand.Next() % range; - unique_ptr key = GenerateKeyFromInt(k); - if (db_->Get(options, key.get(), &value).ok()){ + const int64_t k = thread->rand.Next() % range; + std::string key = GenerateKeyFromInt(k, range); + if (db_->Get(options, key, &value).ok()) { ++found; } thread->stats.FinishedSingleOp(db_); @@ -2022,7 +2077,8 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_); + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + found, reads_); thread->stats.AddMessage(msg); } @@ -2040,18 +2096,19 @@ class Benchmark { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true); std::string value; - long long found = 0; + int64_t found = 0; while (!duration.Done(1)) { Iterator* iter = db_->NewIterator(options); - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); - iter->Seek(key.get()); - if (iter->Valid() && iter->key() == key.get()) found++; + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); + iter->Seek(key); + if (iter->Valid() && iter->key() == Slice(key)) found++; delete iter; thread->stats.FinishedSingleOp(db_); } char msg[100]; - snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, num_); + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + found, num_); thread->stats.AddMessage(msg); } @@ -2063,9 +2120,9 @@ class Benchmark { while (!duration.Done(entries_per_batch_)) { batch.Clear(); for (int j = 0; j < entries_per_batch_; j++) { - const long long k = seq ? i+j : (thread->rand.Next() % FLAGS_num); - unique_ptr key = GenerateKeyFromInt(k); - batch.Delete(key.get()); + const int64_t k = seq ? i+j : (thread->rand.Next() % FLAGS_num); + std::string key = GenerateKeyFromInt(k, FLAGS_num); + batch.Delete(key); thread->stats.FinishedSingleOp(db_); } s = db_->Write(write_options_, &batch); @@ -2113,10 +2170,9 @@ class Benchmark { } } - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); - Status s = db_->Put(write_options_, key.get(), - gen.Generate(value_size_)); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); + Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -2228,18 +2284,18 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; - long long found = 0; + int64_t found = 0; int get_weight = 0; int put_weight = 0; int delete_weight = 0; - long long gets_done = 0; - long long puts_done = 0; - long long deletes_done = 0; + int64_t gets_done = 0; + int64_t puts_done = 0; + int64_t deletes_done = 0; // the number of iterations is the larger of read_ or write_ - for (long long i = 0; i < readwrites_; i++) { - const long long k = thread->rand.Next() % (FLAGS_numdistinct); - unique_ptr key = GenerateKeyFromInt(k); + for (int64_t i = 0; i < readwrites_; i++) { + const int64_t k = thread->rand.Next() % (FLAGS_numdistinct); + std::string key = GenerateKeyFromInt(k, FLAGS_numdistinct); if (get_weight == 0 && put_weight == 0 && delete_weight == 0) { // one batch completed, reinitialize for next batch get_weight = FLAGS_readwritepercent; @@ -2248,7 +2304,7 @@ class Benchmark { } if (get_weight > 0) { // do all the gets first - Status s = GetMany(options, key.get(), &value); + Status s = GetMany(options, key, &value); if (!s.ok() && !s.IsNotFound()) { fprintf(stderr, "getmany error: %s\n", s.ToString().c_str()); // we continue after error rather than exiting so that we can @@ -2261,8 +2317,7 @@ class Benchmark { } else if (put_weight > 0) { // then do all the corresponding number of puts // for all the gets we have done earlier - Status s = PutMany(write_options_, key.get(), - gen.Generate(value_size_)); + Status s = PutMany(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "putmany error: %s\n", s.ToString().c_str()); exit(1); @@ -2270,7 +2325,7 @@ class Benchmark { put_weight--; puts_done++; } else if (delete_weight > 0) { - Status s = DeleteMany(write_options_, key.get()); + Status s = DeleteMany(write_options_, key); if (!s.ok()) { fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str()); exit(1); @@ -2283,7 +2338,8 @@ class Benchmark { } char msg[100]; snprintf(msg, sizeof(msg), - "( get:%lld put:%lld del:%lld total:%lld found:%lld)", + "( get:%" PRIu64 " put:%" PRIu64 " del:%" PRIu64 " total:%" \ + PRIu64 " found:%" PRIu64 ")", gets_done, puts_done, deletes_done, readwrites_, found); thread->stats.AddMessage(msg); } @@ -2300,17 +2356,17 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; - long long found = 0; + int64_t found = 0; int get_weight = 0; int put_weight = 0; - long long reads_done = 0; - long long writes_done = 0; + int64_t reads_done = 0; + int64_t writes_done = 0; Duration duration(FLAGS_duration, readwrites_); // the number of iterations is the larger of read_ or write_ while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); if (get_weight == 0 && put_weight == 0) { // one batch completed, reinitialize for next batch get_weight = FLAGS_readwritepercent; @@ -2323,17 +2379,14 @@ class Benchmark { } if (FLAGS_get_approx) { - char key2[100]; - snprintf(key2, sizeof(key2), "%016lld", k + 1); - Slice skey2(key2); - Slice skey(key2); - Range range(skey, skey2); + std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1); + Range range(key, key2); uint64_t sizes; db_->GetApproximateSizes(&range, 1, &sizes); } // do all the gets first - Status s = db_->Get(options, key.get(), &value); + Status s = db_->Get(options, key, &value); if (!s.ok() && !s.IsNotFound()) { fprintf(stderr, "get error: %s\n", s.ToString().c_str()); // we continue after error rather than exiting so that we can @@ -2352,8 +2405,7 @@ class Benchmark { } else if (put_weight > 0) { // then do all the corresponding number of puts // for all the gets we have done earlier - Status s = db_->Put(write_options_, key.get(), - gen.Generate(value_size_)); + Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -2364,8 +2416,8 @@ class Benchmark { thread->stats.FinishedSingleOp(db_); } char msg[100]; - snprintf(msg, sizeof(msg), - "( reads:%lld writes:%lld total:%lld found:%lld)", + snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \ + " total:%" PRIu64 " found:%" PRIu64 ")", reads_done, writes_done, readwrites_, found); thread->stats.AddMessage(msg); } @@ -2388,10 +2440,10 @@ class Benchmark { long num_keys; // number of keys to read in current group long num_put_keys; // number of keys to put in current group - long found = 0; - long reads_done = 0; - long writes_done = 0; - long multigets_done = 0; + int64_t found = 0; + int64_t reads_done = 0; + int64_t writes_done = 0; + int64_t multigets_done = 0; // the number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); @@ -2415,18 +2467,18 @@ class Benchmark { assert(num_keys + num_put_keys <= keys_left); // Apply the MultiGet operations - found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, ""); + found += MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, ""); ++multigets_done; reads_done+=num_keys; thread->stats.FinishedSingleOp(db_); // Now do the puts int i; - long long k; + int64_t k; for(i=0; irand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); - Status s = db_->Put(write_options_, key.get(), + std::string key = GenerateKeyFromInt(k, FLAGS_num); + Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); @@ -2440,7 +2492,8 @@ class Benchmark { } char msg[100]; snprintf(msg, sizeof(msg), - "( reads:%ld writes:%ld total:%lld multiget_ops:%ld found:%ld)", + "( reads:%" PRIu64 " writes:%" PRIu64 " total:%" PRIu64 \ + " multiget_ops:%" PRIu64 " found:%" PRIu64 ")", reads_done, writes_done, readwrites_, multigets_done, found); thread->stats.AddMessage(msg); } @@ -2451,29 +2504,26 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; - long long found = 0; + int64_t found = 0; Duration duration(FLAGS_duration, readwrites_); // the number of iterations is the larger of read_ or write_ while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); if (FLAGS_use_snapshot) { options.snapshot = db_->GetSnapshot(); } if (FLAGS_get_approx) { - char key2[100]; - snprintf(key2, sizeof(key2), "%016lld", k + 1); - Slice skey2(key2); - Slice skey(key2); - Range range(skey, skey2); + std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1); + Range range(key, key2); uint64_t sizes; db_->GetApproximateSizes(&range, 1, &sizes); } - if (db_->Get(options, key.get(), &value).ok()) { + if (db_->Get(options, key, &value).ok()) { found++; } @@ -2481,7 +2531,7 @@ class Benchmark { db_->ReleaseSnapshot(options.snapshot); } - Status s = db_->Put(write_options_, key.get(), gen.Generate(value_size_)); + Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -2490,7 +2540,7 @@ class Benchmark { } char msg[100]; snprintf(msg, sizeof(msg), - "( updates:%lld found:%lld)", readwrites_, found); + "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found); thread->stats.AddMessage(msg); } @@ -2501,30 +2551,27 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; - long found = 0; + int64_t found = 0; // The number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); if (FLAGS_use_snapshot) { options.snapshot = db_->GetSnapshot(); } if (FLAGS_get_approx) { - char key2[100]; - snprintf(key2, sizeof(key2), "%016lld", k + 1); - Slice skey2(key2); - Slice skey(key2); - Range range(skey, skey2); + std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1); + Range range(key, key2); uint64_t sizes; db_->GetApproximateSizes(&range, 1, &sizes); } // Get the existing value - if (db_->Get(options, key.get(), &value).ok()) { + if (db_->Get(options, key, &value).ok()) { found++; } else { // If not existing, then just assume an empty string of data @@ -2544,7 +2591,7 @@ class Benchmark { value.append(operand.data(), operand.size()); // Write back to the database - Status s = db_->Put(write_options_, key.get(), value); + Status s = db_->Put(write_options_, key, value); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -2552,7 +2599,8 @@ class Benchmark { thread->stats.FinishedSingleOp(db_); } char msg[100]; - snprintf(msg, sizeof(msg), "( updates:%lld found:%ld)", readwrites_, found); + snprintf(msg, sizeof(msg), "( updates:%" PRIu64 " found:%" PRIu64 ")", + readwrites_, found); thread->stats.AddMessage(msg); } @@ -2572,11 +2620,10 @@ class Benchmark { // The number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { - const long long k = thread->rand.Next() % merge_keys_; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % merge_keys_; + std::string key = GenerateKeyFromInt(k, merge_keys_); - Status s = db_->Merge(write_options_, key.get(), - gen.Generate(value_size_)); + Status s = db_->Merge(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "merge error: %s\n", s.ToString().c_str()); @@ -2587,7 +2634,7 @@ class Benchmark { // Print some statistics char msg[100]; - snprintf(msg, sizeof(msg), "( updates:%lld)", readwrites_); + snprintf(msg, sizeof(msg), "( updates:%" PRIu64 ")", readwrites_); thread->stats.AddMessage(msg); } @@ -2602,23 +2649,22 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; - long long num_hits = 0; - long long num_gets = 0; - long long num_merges = 0; + int64_t num_hits = 0; + int64_t num_gets = 0; + int64_t num_merges = 0; size_t max_length = 0; // the number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { - const long long k = thread->rand.Next() % merge_keys_; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % merge_keys_; + std::string key = GenerateKeyFromInt(k, merge_keys_); bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent; if (do_merge) { - Status s = db_->Merge(write_options_, key.get(), - gen.Generate(value_size_)); + Status s = db_->Merge(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "merge error: %s\n", s.ToString().c_str()); exit(1); @@ -2627,7 +2673,7 @@ class Benchmark { num_merges++; } else { - Status s = db_->Get(options, key.get(), &value); + Status s = db_->Get(options, key, &value); if (value.length() > max_length) max_length = value.length(); @@ -2647,7 +2693,8 @@ class Benchmark { } char msg[100]; snprintf(msg, sizeof(msg), - "(reads:%lld merges:%lld total:%lld hits:%lld maxlength:%zu)", + "(reads:%" PRIu64 " merges:%" PRIu64 " total:%" PRIu64 " hits:%" \ + PRIu64 " maxlength:%zu)", num_gets, num_merges, readwrites_, num_hits, max_length); thread->stats.AddMessage(msg); } From a01bda09977ae72798e53fb00efc18ef29f38faf Mon Sep 17 00:00:00 2001 From: kailiu Date: Tue, 4 Mar 2014 16:59:27 -0800 Subject: [PATCH 06/12] Fix a buggy assert Summary: The assert was pointless since if if prefix is the same as the whole key, assertion will surely fail. Reason behind is when performing the internal key comparison, if user keys are the same, *key with smaller transaction id* wins. Test Plan: make -j32 && make check Reviewers: sdong, dhruba, haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16551 --- table/filter_block.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/table/filter_block.cc b/table/filter_block.cc index d7be78e1c5..7d1bfccaa4 100644 --- a/table/filter_block.cc +++ b/table/filter_block.cc @@ -82,7 +82,6 @@ void FilterBlockBuilder::AddKey(const Slice& key) { Slice prefix = prefix_extractor_->Transform(user_key); InternalKey internal_prefix_tmp(prefix, 0, kTypeValue); Slice internal_prefix = internal_prefix_tmp.Encode(); - assert(comparator_->Compare(internal_prefix, key) <= 0); start_.push_back(entries_.size()); entries_.append(internal_prefix.data(), internal_prefix.size()); } From e8ecca9e8657cba9b8bc1777e61c5396f4a94cf2 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 4 Mar 2014 18:17:27 -0800 Subject: [PATCH 07/12] CleanupIteratorState() only to initialize DeletionState when super version cleanup needed Summary: Two changes: 1. DeletionState is only constructed when cleaning up is needed 2. Fix the bug of deletion state construction bug. A change was made in a previous patch: https://reviews.facebook.net/rROCKSDB774ed89c2405ee058086b099cbc8b29e243739cc#71a34e2e However, it somehow got lost when merging Test Plan: make all check Reviewers: kailiu, haobo, igor Reviewed By: igor CC: igor, dhruba, i.am.jin.lei, yhchiang, leveldb Differential Revision: https://reviews.facebook.net/D16233 --- db/db_impl.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 2fdd7bf875..b86737a2f4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2743,11 +2743,11 @@ struct IterState { static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); - DBImpl::DeletionState deletion_state(state->db->GetOptions(). - max_write_buffer_number); bool need_cleanup = state->super_version->Unref(); if (need_cleanup) { + DBImpl::DeletionState deletion_state; + state->mu->Lock(); state->super_version->Cleanup(); state->db->FindObsoleteFiles(deletion_state, false, true); From 2b5155fb29d47389a67d6e8c9f15a98a821b750b Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 5 Mar 2014 09:00:53 -0800 Subject: [PATCH 08/12] CloseDB in BackupableDBTest to make valgrind happy --- utilities/backupable/backupable_db_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index aaff224f72..a5f146e057 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -740,6 +740,7 @@ TEST(BackupableDBTest, FailOverwritingBackups) { ASSERT_OK(db_->DeleteBackup(5)); // now, the backup can succeed ASSERT_OK(db_->CreateNewBackup(true)); + CloseBackupableDB(); } TEST(BackupableDBTest, NoShareTableFiles) { From c0ccf436488819b3101ea0305c7388757f2c237e Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 5 Mar 2014 09:13:07 -0800 Subject: [PATCH 09/12] MergingIterator assertion Summary: I wrote a test that triggers assertion in MergingIterator. I have not touched that code ever, so I'm looking for somebody with good understanding of the MergingIterator code to fix this. The solution is probably a one-liner. Let me know if you're willing to take a look. Test Plan: This test fails with an assertion `use_heap_ == false` Reviewers: dhruba, haobo, sdong, kailiu Reviewed By: sdong CC: leveldb Differential Revision: https://reviews.facebook.net/D16521 --- db/db_test.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/db/db_test.cc b/db/db_test.cc index 7310fd66b0..959daf26d4 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1334,6 +1334,23 @@ TEST(DBTest, FilterDeletes) { } while (ChangeCompactOptions()); } + +TEST(DBTest, IterSeekBeforePrev) { + ASSERT_OK(Put("a", "b")); + ASSERT_OK(Put("c", "d")); + dbfull()->Flush(FlushOptions()); + ASSERT_OK(Put("0", "f")); + ASSERT_OK(Put("1", "h")); + dbfull()->Flush(FlushOptions()); + ASSERT_OK(Put("2", "j")); + auto iter = db_->NewIterator(ReadOptions()); + iter->Seek(Slice("c")); + iter->Prev(); + iter->Seek(Slice("a")); + iter->Prev(); + delete iter; +} + TEST(DBTest, IterEmpty) { do { Iterator* iter = db_->NewIterator(ReadOptions()); From 51560ba7556073ffa14ac97143fefc8297b359e7 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Wed, 5 Mar 2014 10:27:17 -0800 Subject: [PATCH 10/12] config max_background_flushes in db_bench Summary: as title Test Plan: make release Reviewers: haobo, sdong, igor Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16437 --- db/db_bench.cc | 42 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index b01828b1e7..1a050a48dc 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -189,6 +189,11 @@ DEFINE_int32(max_background_compactions, "The maximum number of concurrent background compactions" " that can occur in parallel."); +DEFINE_int32(max_background_flushes, + rocksdb::Options().max_background_flushes, + "The maximum number of concurrent background flushes" + " that can occur in parallel."); + static rocksdb::CompactionStyle FLAGS_compaction_style_e; DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style, "style of compaction: level-based vs universal"); @@ -225,6 +230,8 @@ DEFINE_int32(open_files, rocksdb::Options().max_open_files, DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means" " use default settings."); +DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. " + "Negative means no bloom filter."); DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing" " database. If you set this flag and also specify a benchmark that" @@ -496,7 +503,8 @@ DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated " enum RepFactory { kSkipList, kPrefixHash, - kVectorRep + kVectorRep, + kHashLinkedList }; enum RepFactory StringToRepFactory(const char* ctype) { assert(ctype); @@ -507,12 +515,15 @@ enum RepFactory StringToRepFactory(const char* ctype) { return kPrefixHash; else if (!strcasecmp(ctype, "vector")) return kVectorRep; + else if (!strcasecmp(ctype, "hash_linkedlist")) + return kHashLinkedList; fprintf(stdout, "Cannot parse memreptable %s\n", ctype); return kSkipList; } static enum RepFactory FLAGS_rep_factory; DEFINE_string(memtablerep, "skip_list", ""); +DEFINE_int64(hash_bucket_count, 1024 * 1024, "hash bucket count"); DEFINE_bool(use_plain_table, false, "if use plain table " "instead of block-based table format"); @@ -864,7 +875,7 @@ class Benchmark { case rocksdb::kLZ4HCCompression: fprintf(stdout, "Compression: lz4hc\n"); break; - } + } switch (FLAGS_rep_factory) { case kPrefixHash: @@ -876,6 +887,9 @@ class Benchmark { case kVectorRep: fprintf(stdout, "Memtablerep: vector\n"); break; + case kHashLinkedList: + fprintf(stdout, "Memtablerep: hash_linkedlist\n"); + break; } fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level); @@ -1521,12 +1535,14 @@ class Benchmark { options.min_write_buffer_number_to_merge = FLAGS_min_write_buffer_number_to_merge; options.max_background_compactions = FLAGS_max_background_compactions; + options.max_background_flushes = FLAGS_max_background_flushes; options.compaction_style = FLAGS_compaction_style_e; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.prefix_extractor = (FLAGS_use_plain_table || FLAGS_use_prefix_blooms) ? prefix_extractor_ : nullptr; + options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits; options.max_open_files = FLAGS_open_files; options.statistics = dbstats; options.env = FLAGS_env; @@ -1540,19 +1556,26 @@ class Benchmark { options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier; options.filter_deletes = FLAGS_filter_deletes; - if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) { - fprintf(stderr, "prefix_size should be non-zero iff memtablerep " - "== prefix_hash\n"); + if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash || + FLAGS_rep_factory == kHashLinkedList)) { + fprintf(stderr, "prefix_size should be non-zero if PrefixHash or " + "HashLinkedList memtablerep is used\n"); exit(1); } switch (FLAGS_rep_factory) { case kPrefixHash: options.memtable_factory.reset(NewHashSkipListRepFactory( - NewFixedPrefixTransform(FLAGS_prefix_size))); + NewFixedPrefixTransform(FLAGS_prefix_size), + FLAGS_hash_bucket_count)); break; case kSkipList: // no need to do anything break; + case kHashLinkedList: + options.memtable_factory.reset(NewHashLinkListRepFactory( + NewFixedPrefixTransform(FLAGS_prefix_size), + FLAGS_hash_bucket_count)); + break; case kVectorRep: options.memtable_factory.reset( new VectorRepFactory @@ -1560,7 +1583,8 @@ class Benchmark { break; } if (FLAGS_use_plain_table) { - if (FLAGS_rep_factory != kPrefixHash) { + if (FLAGS_rep_factory != kPrefixHash && + FLAGS_rep_factory != kHashLinkedList) { fprintf(stderr, "Waring: plain table is used with skipList\n"); } if (!FLAGS_mmap_read && !FLAGS_mmap_write) { @@ -1740,7 +1764,7 @@ class Benchmark { void DoWrite(ThreadState* thread, WriteMode write_mode) { const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0; - const int num_ops = writes_ == 0 ? num_ : writes_ ; + const int64_t num_ops = writes_ == 0 ? num_ : writes_; Duration duration(test_duration, num_ops); unique_ptr bit_set; @@ -1923,6 +1947,8 @@ class Benchmark { } delete iter; } else { // Regular case. Do one "get" at a time Get + options.tailing = true; + options.prefix_seek = (FLAGS_prefix_size == 0); Iterator* iter = db_->NewIterator(options); std::string value; while (!duration.Done(1)) { From 64138b5d9cae2990aa46157da97cdf1656c1a046 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Wed, 5 Mar 2014 10:28:53 -0800 Subject: [PATCH 11/12] fix db_bench to use HashSkipList for real Summary: For HashSkipList case, DBImpl has sanity check to see if prefix_extractor in options is the same as the one in memtable factory. If not, it falls back to SkipList. As result, I was experimenting with SkipList performance. No wonder it is much worse than LinkedList Test Plan: ran benchmark Reviewers: haobo, sdong, igor Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D16569 --- db/db_bench.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 1a050a48dc..085039cc5c 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -1022,8 +1022,7 @@ class Benchmark { filter_policy_(FLAGS_bloom_bits >= 0 ? NewBloomFilterPolicy(FLAGS_bloom_bits) : nullptr), - prefix_extractor_(NewFixedPrefixTransform(FLAGS_use_plain_table ? - FLAGS_prefix_size : FLAGS_key_size-1)), + prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)), db_(nullptr), num_(FLAGS_num), value_size_(FLAGS_value_size), @@ -1565,7 +1564,7 @@ class Benchmark { switch (FLAGS_rep_factory) { case kPrefixHash: options.memtable_factory.reset(NewHashSkipListRepFactory( - NewFixedPrefixTransform(FLAGS_prefix_size), + prefix_extractor_, FLAGS_hash_bucket_count)); break; case kSkipList: @@ -1573,7 +1572,7 @@ class Benchmark { break; case kHashLinkedList: options.memtable_factory.reset(NewHashLinkListRepFactory( - NewFixedPrefixTransform(FLAGS_prefix_size), + prefix_extractor_, FLAGS_hash_bucket_count)); break; case kVectorRep: From 04298f8c33cfd9a044bc8e0ee4bb5aae6f9fb158 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Wed, 5 Mar 2014 10:32:54 -0800 Subject: [PATCH 12/12] output perf_context in db_bench readrandom Summary: Add helper function to print perf context data in db_bench if enabled. I didn't find any code that actually exports perf context data. Not sure if I missed anything Test Plan: ran db_bench Reviewers: haobo, sdong, igor Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D16575 --- db/db_bench.cc | 5 +++++ include/rocksdb/perf_context.h | 3 +++ util/perf_context.cc | 31 +++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+) diff --git a/db/db_bench.cc b/db/db_bench.cc index 085039cc5c..efb6f210f8 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -1994,7 +1994,12 @@ class Benchmark { char msg[100]; snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", found, reads_); + thread->stats.AddMessage(msg); + + if (FLAGS_perf_level > 0) { + thread->stats.AddMessage(perf_context.ToString()); + } } void PrefixScanRandom(ThreadState* thread) { diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index 551ca8fe66..61adad6b73 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -7,6 +7,7 @@ #define STORAGE_ROCKSDB_INCLUDE_PERF_CONTEXT_H #include +#include namespace rocksdb { @@ -26,6 +27,8 @@ struct PerfContext { void Reset(); // reset all performance counters to zero + std::string ToString() const; + uint64_t user_key_comparison_count; // total number of user key comparisons uint64_t block_cache_hit_count; // total number of block cache hits uint64_t block_read_count; // total number of block reads (with IO) diff --git a/util/perf_context.cc b/util/perf_context.cc index 6833f6836a..650abebca0 100644 --- a/util/perf_context.cc +++ b/util/perf_context.cc @@ -3,6 +3,8 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // + +#include #include "util/perf_context_imp.h" namespace rocksdb { @@ -38,6 +40,35 @@ void PerfContext::Reset() { write_memtable_time = 0; } +#define OUTPUT(counter) #counter << " = " << counter << ", " + +std::string PerfContext::ToString() const { + std::ostringstream ss; + ss << OUTPUT(user_key_comparison_count) + << OUTPUT(block_cache_hit_count) + << OUTPUT(block_read_count) + << OUTPUT(block_read_byte) + << OUTPUT(block_read_time) + << OUTPUT(block_checksum_time) + << OUTPUT(block_decompress_time) + << OUTPUT(internal_key_skipped_count) + << OUTPUT(internal_delete_skipped_count) + << OUTPUT(write_wal_time) + << OUTPUT(get_snapshot_time) + << OUTPUT(get_from_memtable_time) + << OUTPUT(get_from_memtable_count) + << OUTPUT(get_post_process_time) + << OUTPUT(get_from_output_files_time) + << OUTPUT(seek_child_seek_time) + << OUTPUT(seek_child_seek_count) + << OUTPUT(seek_min_heap_time) + << OUTPUT(seek_internal_seek_time) + << OUTPUT(find_next_user_entry_time) + << OUTPUT(write_pre_and_post_process_time) + << OUTPUT(write_memtable_time); + return ss.str(); +} + __thread PerfContext perf_context; }