From 9ed96703d11a1cf4af0e1c1db0e4a6057a8e5d42 Mon Sep 17 00:00:00 2001 From: Changneng Chen Date: Fri, 25 Feb 2022 23:13:11 -0800 Subject: [PATCH] Add support for BlobDB to ldb (#9630) Summary: Add the configuration options and help messages of BlobDB to `ldb` Pull Request resolved: https://github.com/facebook/rocksdb/pull/9630 Test Plan: `python ./tools/ldb_test.py` Reviewed By: ltamasi Differential Revision: D34443176 Pulled By: changneng fbshipit-source-id: 5b3f185cdfc2561e06dd37215c7edfbca07dbe80 --- HISTORY.md | 1 + include/rocksdb/utilities/ldb_cmd.h | 21 +++ tools/ldb_cmd.cc | 260 ++++++++++++++++++++++------ tools/ldb_test.py | 44 +++++ tools/ldb_tool.cc | 14 ++ 5 files changed, 290 insertions(+), 50 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index f1e27e6471..80147d0808 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### New Features * Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp. +* Added BlobDB options to `ldb` ### Bug Fixes * * Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496) diff --git a/include/rocksdb/utilities/ldb_cmd.h b/include/rocksdb/utilities/ldb_cmd.h index e900abefee..fdf1b5d869 100644 --- a/include/rocksdb/utilities/ldb_cmd.h +++ b/include/rocksdb/utilities/ldb_cmd.h @@ -61,6 +61,14 @@ class LDBCommand { static const std::string ARG_CREATE_IF_MISSING; static const std::string ARG_NO_VALUE; static const std::string ARG_DISABLE_CONSISTENCY_CHECKS; + static const std::string ARG_ENABLE_BLOB_FILES; + static const std::string ARG_MIN_BLOB_SIZE; + static const std::string ARG_BLOB_FILE_SIZE; + static const std::string ARG_BLOB_COMPRESSION_TYPE; + static const std::string ARG_ENABLE_BLOB_GARBAGE_COLLECTION; + static const std::string ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF; + static const std::string ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD; + static const std::string ARG_BLOB_COMPACTION_READAHEAD_SIZE; struct ParsedParams { std::string cmd; @@ -173,6 +181,10 @@ class LDBCommand { // The value passed to options.force_consistency_checks. bool force_consistency_checks_; + bool enable_blob_files_; + + bool enable_blob_garbage_collection_; + bool create_if_missing_; /** @@ -233,9 +245,18 @@ class LDBCommand { const std::string& option, int& value, LDBCommandExecuteResult& exec_state); + bool ParseDoubleOption(const std::map& options, + const std::string& option, double& value, + LDBCommandExecuteResult& exec_state); + bool ParseStringOption(const std::map& options, const std::string& option, std::string* value); + bool ParseCompressionTypeOption( + const std::map& options, + const std::string& option, CompressionType& value, + LDBCommandExecuteResult& exec_state); + /** * Returns the value of the specified option as a boolean. * default_val is used if the option is not found in options. diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 7497354939..43f2c1c850 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -82,6 +82,19 @@ const std::string LDBCommand::ARG_WRITE_BUFFER_SIZE = "write_buffer_size"; const std::string LDBCommand::ARG_FILE_SIZE = "file_size"; const std::string LDBCommand::ARG_CREATE_IF_MISSING = "create_if_missing"; const std::string LDBCommand::ARG_NO_VALUE = "no_value"; +const std::string LDBCommand::ARG_ENABLE_BLOB_FILES = "enable_blob_files"; +const std::string LDBCommand::ARG_MIN_BLOB_SIZE = "min_blob_size"; +const std::string LDBCommand::ARG_BLOB_FILE_SIZE = "blob_file_size"; +const std::string LDBCommand::ARG_BLOB_COMPRESSION_TYPE = + "blob_compression_type"; +const std::string LDBCommand::ARG_ENABLE_BLOB_GARBAGE_COLLECTION = + "enable_blob_garbage_collection"; +const std::string LDBCommand::ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF = + "blob_garbage_collection_age_cutoff"; +const std::string LDBCommand::ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD = + "blob_garbage_collection_force_threshold"; +const std::string LDBCommand::ARG_BLOB_COMPACTION_READAHEAD_SIZE = + "blob_compaction_readahead_size"; const char* LDBCommand::DELIM = " ==> "; @@ -348,7 +361,7 @@ LDBCommand::LDBCommand(const std::map& options, option_map_(options), flags_(flags), valid_cmd_line_options_(valid_cmd_line_options) { - std::map::const_iterator itr = options.find(ARG_DB); + auto itr = options.find(ARG_DB); if (itr != options.end()) { db_path_ = itr->second; } @@ -383,6 +396,9 @@ LDBCommand::LDBCommand(const std::map& options, try_load_options_ = IsFlagPresent(flags, ARG_TRY_LOAD_OPTIONS); force_consistency_checks_ = !IsFlagPresent(flags, ARG_DISABLE_CONSISTENCY_CHECKS); + enable_blob_files_ = IsFlagPresent(flags, ARG_ENABLE_BLOB_FILES); + enable_blob_garbage_collection_ = + IsFlagPresent(flags, ARG_ENABLE_BLOB_GARBAGE_COLLECTION); config_options_.ignore_unknown_options = IsFlagPresent(flags, ARG_IGNORE_UNKNOWN_OPTIONS); } @@ -508,12 +524,61 @@ std::vector LDBCommand::BuildCmdLineOptions( ARG_FIX_PREFIX_LEN, ARG_TRY_LOAD_OPTIONS, ARG_DISABLE_CONSISTENCY_CHECKS, + ARG_ENABLE_BLOB_FILES, + ARG_MIN_BLOB_SIZE, + ARG_BLOB_FILE_SIZE, + ARG_BLOB_COMPRESSION_TYPE, + ARG_ENABLE_BLOB_GARBAGE_COLLECTION, + ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF, + ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD, + ARG_BLOB_COMPACTION_READAHEAD_SIZE, ARG_IGNORE_UNKNOWN_OPTIONS, ARG_CF_NAME}; ret.insert(ret.end(), options.begin(), options.end()); return ret; } +/** + * Parses the specific double option and fills in the value. + * Returns true if the option is found. + * Returns false if the option is not found or if there is an error parsing the + * value. If there is an error, the specified exec_state is also + * updated. + */ +bool LDBCommand::ParseDoubleOption( + const std::map& /*options*/, + const std::string& option, double& value, + LDBCommandExecuteResult& exec_state) { + auto itr = option_map_.find(option); + if (itr != option_map_.end()) { +#if defined(CYGWIN) + char* str_end = nullptr; + value = std::strtod(itr->second.c_str(), &str_end); + if (str_end == itr->second.c_str()) { + exec_state = + LDBCommandExecuteResult::Failed(option + " has an invalid value."); + } else if (errno == ERANGE) { + exec_state = LDBCommandExecuteResult::Failed( + option + " has a value out-of-range."); + } else { + return true; + } +#else + try { + value = std::stod(itr->second); + return true; + } catch (const std::invalid_argument&) { + exec_state = + LDBCommandExecuteResult::Failed(option + " has an invalid value."); + } catch (const std::out_of_range&) { + exec_state = LDBCommandExecuteResult::Failed( + option + " has a value out-of-range."); + } +#endif + } + return false; +} + /** * Parses the specific integer option and fills in the value. * Returns true if the option is found. @@ -525,15 +590,23 @@ bool LDBCommand::ParseIntOption( const std::map& /*options*/, const std::string& option, int& value, LDBCommandExecuteResult& exec_state) { - std::map::const_iterator itr = - option_map_.find(option); + auto itr = option_map_.find(option); if (itr != option_map_.end()) { - try { #if defined(CYGWIN) - value = strtol(itr->second.c_str(), 0, 10); + char* str_end = nullptr; + value = strtol(itr->second.c_str(), &str_end, 10); + if (str_end == itr->second.c_str()) { + exec_state = + LDBCommandExecuteResult::Failed(option + " has an invalid value."); + } else if (errno == ERANGE) { + exec_state = LDBCommandExecuteResult::Failed( + option + " has a value out-of-range."); + } else { + return true; + } #else + try { value = std::stoi(itr->second); -#endif return true; } catch (const std::invalid_argument&) { exec_state = @@ -542,6 +615,7 @@ bool LDBCommand::ParseIntOption( exec_state = LDBCommandExecuteResult::Failed( option + " has a value out-of-range."); } +#endif } return false; } @@ -562,6 +636,51 @@ bool LDBCommand::ParseStringOption( return false; } +/** + * Parses the specified compression type and fills in the value. + * Returns true if the compression type is found. + * Returns false otherwise. + */ +bool LDBCommand::ParseCompressionTypeOption( + const std::map& /*options*/, + const std::string& option, CompressionType& value, + LDBCommandExecuteResult& exec_state) { + auto itr = option_map_.find(option); + if (itr != option_map_.end()) { + const std::string& comp = itr->second; + if (comp == "no") { + value = kNoCompression; + return true; + } else if (comp == "snappy") { + value = kSnappyCompression; + return true; + } else if (comp == "zlib") { + value = kZlibCompression; + return true; + } else if (comp == "bzip2") { + value = kBZip2Compression; + return true; + } else if (comp == "lz4") { + value = kLZ4Compression; + return true; + } else if (comp == "lz4hc") { + value = kLZ4HCCompression; + return true; + } else if (comp == "xpress") { + value = kXpressCompression; + return true; + } else if (comp == "zstd") { + value = kZSTD; + return true; + } else { + // Unknown compression. + exec_state = LDBCommandExecuteResult::Failed( + "Unknown compression algorithm: " + comp); + } + } + return false; +} + void LDBCommand::OverrideBaseOptions() { options_.create_if_missing = false; @@ -614,35 +733,86 @@ void LDBCommand::OverrideBaseCFOptions(ColumnFamilyOptions* cf_opts) { cf_opts->table_factory.reset(NewBlockBasedTableFactory(table_options)); } + cf_opts->enable_blob_files = enable_blob_files_; + + int min_blob_size; + if (ParseIntOption(option_map_, ARG_MIN_BLOB_SIZE, min_blob_size, + exec_state_)) { + if (min_blob_size >= 0) { + cf_opts->min_blob_size = min_blob_size; + } else { + exec_state_ = + LDBCommandExecuteResult::Failed(ARG_MIN_BLOB_SIZE + " must be >= 0."); + } + } + + int blob_file_size; + if (ParseIntOption(option_map_, ARG_BLOB_FILE_SIZE, blob_file_size, + exec_state_)) { + if (blob_file_size > 0) { + cf_opts->blob_file_size = blob_file_size; + } else { + exec_state_ = + LDBCommandExecuteResult::Failed(ARG_BLOB_FILE_SIZE + " must be > 0."); + } + } + + cf_opts->enable_blob_garbage_collection = enable_blob_garbage_collection_; + + double blob_garbage_collection_age_cutoff; + if (ParseDoubleOption(option_map_, ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF, + blob_garbage_collection_age_cutoff, exec_state_)) { + if (blob_garbage_collection_age_cutoff >= 0 && + blob_garbage_collection_age_cutoff <= 1) { + cf_opts->blob_garbage_collection_age_cutoff = + blob_garbage_collection_age_cutoff; + } else { + exec_state_ = LDBCommandExecuteResult::Failed( + ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF + " must be >= 0 and <= 1."); + } + } + + double blob_garbage_collection_force_threshold; + if (ParseDoubleOption(option_map_, + ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD, + blob_garbage_collection_force_threshold, exec_state_)) { + if (blob_garbage_collection_force_threshold >= 0 && + blob_garbage_collection_force_threshold <= 1) { + cf_opts->blob_garbage_collection_force_threshold = + blob_garbage_collection_force_threshold; + } else { + exec_state_ = LDBCommandExecuteResult::Failed( + ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD + + " must be >= 0 and <= 1."); + } + } + + int blob_compaction_readahead_size; + if (ParseIntOption(option_map_, ARG_BLOB_COMPACTION_READAHEAD_SIZE, + blob_compaction_readahead_size, exec_state_)) { + if (blob_compaction_readahead_size > 0) { + cf_opts->blob_compaction_readahead_size = blob_compaction_readahead_size; + } else { + exec_state_ = LDBCommandExecuteResult::Failed( + ARG_BLOB_COMPACTION_READAHEAD_SIZE + " must be > 0."); + } + } + auto itr = option_map_.find(ARG_AUTO_COMPACTION); if (itr != option_map_.end()) { cf_opts->disable_auto_compactions = !StringToBool(itr->second); } - itr = option_map_.find(ARG_COMPRESSION_TYPE); - if (itr != option_map_.end()) { - std::string comp = itr->second; - if (comp == "no") { - cf_opts->compression = kNoCompression; - } else if (comp == "snappy") { - cf_opts->compression = kSnappyCompression; - } else if (comp == "zlib") { - cf_opts->compression = kZlibCompression; - } else if (comp == "bzip2") { - cf_opts->compression = kBZip2Compression; - } else if (comp == "lz4") { - cf_opts->compression = kLZ4Compression; - } else if (comp == "lz4hc") { - cf_opts->compression = kLZ4HCCompression; - } else if (comp == "xpress") { - cf_opts->compression = kXpressCompression; - } else if (comp == "zstd") { - cf_opts->compression = kZSTD; - } else { - // Unknown compression. - exec_state_ = - LDBCommandExecuteResult::Failed("Unknown compression level: " + comp); - } + CompressionType compression_type; + if (ParseCompressionTypeOption(option_map_, ARG_COMPRESSION_TYPE, + compression_type, exec_state_)) { + cf_opts->compression = compression_type; + } + + CompressionType blob_compression_type; + if (ParseCompressionTypeOption(option_map_, ARG_BLOB_COMPRESSION_TYPE, + blob_compression_type, exec_state_)) { + cf_opts->blob_compression_type = blob_compression_type; } int compression_max_dict_bytes; @@ -793,9 +963,7 @@ bool LDBCommand::ParseKeyValue(const std::string& line, std::string* key, * appropriate error msg to stderr. */ bool LDBCommand::ValidateCmdLineOptions() { - for (std::map::const_iterator itr = - option_map_.begin(); - itr != option_map_.end(); ++itr) { + for (auto itr = option_map_.begin(); itr != option_map_.end(); ++itr) { if (std::find(valid_cmd_line_options_.begin(), valid_cmd_line_options_.end(), itr->first) == valid_cmd_line_options_.end()) { @@ -884,7 +1052,7 @@ bool LDBCommand::IsValueHex(const std::map& options, bool LDBCommand::ParseBooleanOption( const std::map& options, const std::string& option, bool default_val) { - std::map::const_iterator itr = options.find(option); + auto itr = options.find(option); if (itr != options.end()) { std::string option_val = itr->second; return StringToBool(itr->second); @@ -914,8 +1082,7 @@ CompactorCommand::CompactorCommand( ARG_VALUE_HEX, ARG_TTL})), null_from_(true), null_to_(true) { - std::map::const_iterator itr = - options.find(ARG_FROM); + auto itr = options.find(ARG_FROM); if (itr != options.end()) { null_from_ = false; from_ = itr->second; @@ -1122,8 +1289,7 @@ ManifestDumpCommand::ManifestDumpCommand( verbose_ = IsFlagPresent(flags, ARG_VERBOSE); json_ = IsFlagPresent(flags, ARG_JSON); - std::map::const_iterator itr = - options.find(ARG_PATH); + auto itr = options.find(ARG_PATH); if (itr != options.end()) { path_ = itr->second; if (path_.empty()) { @@ -1264,8 +1430,7 @@ FileChecksumDumpCommand::FileChecksumDumpCommand( : LDBCommand(options, flags, false, BuildCmdLineOptions({ARG_PATH, ARG_HEX})), path_("") { - std::map::const_iterator itr = - options.find(ARG_PATH); + auto itr = options.find(ARG_PATH); if (itr != options.end()) { path_ = itr->second; if (path_.empty()) { @@ -1540,8 +1705,7 @@ InternalDumpCommand::InternalDumpCommand( has_to_ = ParseStringOption(options, ARG_TO, &to_); ParseIntOption(options, ARG_MAX_KEYS, max_keys_, exec_state_); - std::map::const_iterator itr = - options.find(ARG_COUNT_DELIM); + auto itr = options.find(ARG_COUNT_DELIM); if (itr != options.end()) { delim_ = itr->second; count_delim_ = true; @@ -1676,8 +1840,7 @@ DBDumperCommand::DBDumperCommand( count_only_(false), count_delim_(false), print_stats_(false) { - std::map::const_iterator itr = - options.find(ARG_FROM); + auto itr = options.find(ARG_FROM); if (itr != options.end()) { null_from_ = false; from_ = itr->second; @@ -2442,8 +2605,7 @@ WALDumperCommand::WALDumperCommand( is_write_committed_(false) { wal_file_.clear(); - std::map::const_iterator itr = - options.find(ARG_WAL_FILE); + auto itr = options.find(ARG_WAL_FILE); if (itr != options.end()) { wal_file_ = itr->second; } @@ -2662,8 +2824,7 @@ ScanCommand::ScanCommand(const std::vector& /*params*/, end_key_specified_(false), max_keys_scanned_(-1), no_value_(false) { - std::map::const_iterator itr = - options.find(ARG_FROM); + auto itr = options.find(ARG_FROM); if (itr != options.end()) { start_key_ = itr->second; if (is_key_hex_) { @@ -3750,8 +3911,7 @@ ListFileRangeDeletesCommand::ListFileRangeDeletesCommand( const std::map& options, const std::vector& flags) : LDBCommand(options, flags, true, BuildCmdLineOptions({ARG_MAX_KEYS})) { - std::map::const_iterator itr = - options.find(ARG_MAX_KEYS); + auto itr = options.find(ARG_MAX_KEYS); if (itr != options.end()) { try { #if defined(CYGWIN) diff --git a/tools/ldb_test.py b/tools/ldb_test.py index c94d9efaff..f10b6a22f8 100644 --- a/tools/ldb_test.py +++ b/tools/ldb_test.py @@ -165,6 +165,32 @@ class LDBTestCase(unittest.TestCase): self.assertRunFAIL("batchput k1") self.assertRunFAIL("batchput k1 v1 k2") + def testBlobBatchPut(self): + print("Running testBlobBatchPut...") + + dbPath = os.path.join(self.TMP_DIR, self.DB_NAME) + self.assertRunOK("batchput x1 y1 --create_if_missing --enable_blob_files", "OK") + self.assertRunOK("scan", "x1 : y1") + self.assertRunOK("batchput --enable_blob_files x2 y2 x3 y3 \"x4 abc\" \"y4 xyz\"", "OK") + self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 abc : y4 xyz") + + blob_files = self.getBlobFiles(dbPath) + self.assertTrue(len(blob_files) >= 1) + + def testBlobPut(self): + print("Running testBlobPut...") + + dbPath = os.path.join(self.TMP_DIR, self.DB_NAME) + self.assertRunOK("put --create_if_missing --enable_blob_files x1 y1", "OK") + self.assertRunOK("get x1", "y1") + self.assertRunOK("put --enable_blob_files x2 y2", "OK") + self.assertRunOK("get x1", "y1") + self.assertRunOK("get x2", "y2") + self.assertRunFAIL("get x3") + + blob_files = self.getBlobFiles(dbPath) + self.assertTrue(len(blob_files) >= 1) + def testCountDelimDump(self): print("Running testCountDelimDump...") self.assertRunOK("batchput x.1 x1 --create_if_missing", "OK") @@ -340,6 +366,21 @@ class LDBTestCase(unittest.TestCase): self.assertFalse(self.dumpDb( "--db=%s --create_if_missing" % origDbPath, dumpFilePath)) + # Dump and load with BlobDB enabled + blobParams = " ".join(["--enable_blob_files", "--min_blob_size=1", + "--blob_file_size=2097152"]) + dumpFilePath = os.path.join(self.TMP_DIR, "dump9") + loadedDbPath = os.path.join(self.TMP_DIR, "loaded_from_dump9") + self.assertTrue(self.dumpDb( + "--db=%s" % (origDbPath), dumpFilePath)) + self.assertTrue(self.loadDb( + "--db=%s %s --create_if_missing --disable_wal" % (loadedDbPath, blobParams), + dumpFilePath)) + self.assertRunOKFull("scan --db=%s" % loadedDbPath, + "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4") + blob_files = self.getBlobFiles(loadedDbPath) + self.assertTrue(len(blob_files) >= 1) + def testIDumpBasics(self): print("Running testIDumpBasics...") self.assertRunOK("put a val --create_if_missing", "OK") @@ -549,6 +590,9 @@ class LDBTestCase(unittest.TestCase): def getWALFiles(self, directory): return glob.glob(directory + "/*.log") + def getBlobFiles(self, directory): + return glob.glob(directory + "/*.blob") + def copyManifests(self, src, dest): return 0 == run_err_null("cp " + src + " " + dest) diff --git a/tools/ldb_tool.cc b/tools/ldb_tool.cc index 169fad6665..dccfaa6f5c 100644 --- a/tools/ldb_tool.cc +++ b/tools/ldb_tool.cc @@ -68,6 +68,20 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options, ret.append(" --" + LDBCommand::ARG_WRITE_BUFFER_SIZE + "=\n"); ret.append(" --" + LDBCommand::ARG_FILE_SIZE + "=\n"); + ret.append(" --" + LDBCommand::ARG_ENABLE_BLOB_FILES + + " : Enable key-value separation using BlobDB\n"); + ret.append(" --" + LDBCommand::ARG_MIN_BLOB_SIZE + "=\n"); + ret.append(" --" + LDBCommand::ARG_BLOB_FILE_SIZE + "=\n"); + ret.append(" --" + LDBCommand::ARG_BLOB_COMPRESSION_TYPE + + "=\n"); + ret.append(" --" + LDBCommand::ARG_ENABLE_BLOB_GARBAGE_COLLECTION + + " : Enable blob garbage collection\n"); + ret.append(" --" + LDBCommand::ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF + + "=\n"); + ret.append(" --" + LDBCommand::ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD + + "=\n"); + ret.append(" --" + LDBCommand::ARG_BLOB_COMPACTION_READAHEAD_SIZE + + "=\n"); ret.append("\n\n"); ret.append("Data Access Commands:\n");