Add support for BlobDB to ldb (#9630)

Summary:
Add the configuration options and help messages of BlobDB to `ldb`

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9630

Test Plan: `python ./tools/ldb_test.py`

Reviewed By: ltamasi

Differential Revision: D34443176

Pulled By: changneng

fbshipit-source-id: 5b3f185cdfc2561e06dd37215c7edfbca07dbe80
This commit is contained in:
Changneng Chen 2022-02-25 23:13:11 -08:00 committed by Facebook GitHub Bot
parent 87a8b3c8af
commit 9ed96703d1
5 changed files with 290 additions and 50 deletions

View File

@ -2,6 +2,7 @@
## Unreleased
### New Features
* Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp.
* Added BlobDB options to `ldb`
### Bug Fixes
* * Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)

View File

@ -61,6 +61,14 @@ class LDBCommand {
static const std::string ARG_CREATE_IF_MISSING;
static const std::string ARG_NO_VALUE;
static const std::string ARG_DISABLE_CONSISTENCY_CHECKS;
static const std::string ARG_ENABLE_BLOB_FILES;
static const std::string ARG_MIN_BLOB_SIZE;
static const std::string ARG_BLOB_FILE_SIZE;
static const std::string ARG_BLOB_COMPRESSION_TYPE;
static const std::string ARG_ENABLE_BLOB_GARBAGE_COLLECTION;
static const std::string ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF;
static const std::string ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD;
static const std::string ARG_BLOB_COMPACTION_READAHEAD_SIZE;
struct ParsedParams {
std::string cmd;
@ -173,6 +181,10 @@ class LDBCommand {
// The value passed to options.force_consistency_checks.
bool force_consistency_checks_;
bool enable_blob_files_;
bool enable_blob_garbage_collection_;
bool create_if_missing_;
/**
@ -233,9 +245,18 @@ class LDBCommand {
const std::string& option, int& value,
LDBCommandExecuteResult& exec_state);
bool ParseDoubleOption(const std::map<std::string, std::string>& options,
const std::string& option, double& value,
LDBCommandExecuteResult& exec_state);
bool ParseStringOption(const std::map<std::string, std::string>& options,
const std::string& option, std::string* value);
bool ParseCompressionTypeOption(
const std::map<std::string, std::string>& options,
const std::string& option, CompressionType& value,
LDBCommandExecuteResult& exec_state);
/**
* Returns the value of the specified option as a boolean.
* default_val is used if the option is not found in options.

View File

@ -82,6 +82,19 @@ const std::string LDBCommand::ARG_WRITE_BUFFER_SIZE = "write_buffer_size";
const std::string LDBCommand::ARG_FILE_SIZE = "file_size";
const std::string LDBCommand::ARG_CREATE_IF_MISSING = "create_if_missing";
const std::string LDBCommand::ARG_NO_VALUE = "no_value";
const std::string LDBCommand::ARG_ENABLE_BLOB_FILES = "enable_blob_files";
const std::string LDBCommand::ARG_MIN_BLOB_SIZE = "min_blob_size";
const std::string LDBCommand::ARG_BLOB_FILE_SIZE = "blob_file_size";
const std::string LDBCommand::ARG_BLOB_COMPRESSION_TYPE =
"blob_compression_type";
const std::string LDBCommand::ARG_ENABLE_BLOB_GARBAGE_COLLECTION =
"enable_blob_garbage_collection";
const std::string LDBCommand::ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF =
"blob_garbage_collection_age_cutoff";
const std::string LDBCommand::ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD =
"blob_garbage_collection_force_threshold";
const std::string LDBCommand::ARG_BLOB_COMPACTION_READAHEAD_SIZE =
"blob_compaction_readahead_size";
const char* LDBCommand::DELIM = " ==> ";
@ -348,7 +361,7 @@ LDBCommand::LDBCommand(const std::map<std::string, std::string>& options,
option_map_(options),
flags_(flags),
valid_cmd_line_options_(valid_cmd_line_options) {
std::map<std::string, std::string>::const_iterator itr = options.find(ARG_DB);
auto itr = options.find(ARG_DB);
if (itr != options.end()) {
db_path_ = itr->second;
}
@ -383,6 +396,9 @@ LDBCommand::LDBCommand(const std::map<std::string, std::string>& options,
try_load_options_ = IsFlagPresent(flags, ARG_TRY_LOAD_OPTIONS);
force_consistency_checks_ =
!IsFlagPresent(flags, ARG_DISABLE_CONSISTENCY_CHECKS);
enable_blob_files_ = IsFlagPresent(flags, ARG_ENABLE_BLOB_FILES);
enable_blob_garbage_collection_ =
IsFlagPresent(flags, ARG_ENABLE_BLOB_GARBAGE_COLLECTION);
config_options_.ignore_unknown_options =
IsFlagPresent(flags, ARG_IGNORE_UNKNOWN_OPTIONS);
}
@ -508,12 +524,61 @@ std::vector<std::string> LDBCommand::BuildCmdLineOptions(
ARG_FIX_PREFIX_LEN,
ARG_TRY_LOAD_OPTIONS,
ARG_DISABLE_CONSISTENCY_CHECKS,
ARG_ENABLE_BLOB_FILES,
ARG_MIN_BLOB_SIZE,
ARG_BLOB_FILE_SIZE,
ARG_BLOB_COMPRESSION_TYPE,
ARG_ENABLE_BLOB_GARBAGE_COLLECTION,
ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF,
ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD,
ARG_BLOB_COMPACTION_READAHEAD_SIZE,
ARG_IGNORE_UNKNOWN_OPTIONS,
ARG_CF_NAME};
ret.insert(ret.end(), options.begin(), options.end());
return ret;
}
/**
* Parses the specific double option and fills in the value.
* Returns true if the option is found.
* Returns false if the option is not found or if there is an error parsing the
* value. If there is an error, the specified exec_state is also
* updated.
*/
bool LDBCommand::ParseDoubleOption(
const std::map<std::string, std::string>& /*options*/,
const std::string& option, double& value,
LDBCommandExecuteResult& exec_state) {
auto itr = option_map_.find(option);
if (itr != option_map_.end()) {
#if defined(CYGWIN)
char* str_end = nullptr;
value = std::strtod(itr->second.c_str(), &str_end);
if (str_end == itr->second.c_str()) {
exec_state =
LDBCommandExecuteResult::Failed(option + " has an invalid value.");
} else if (errno == ERANGE) {
exec_state = LDBCommandExecuteResult::Failed(
option + " has a value out-of-range.");
} else {
return true;
}
#else
try {
value = std::stod(itr->second);
return true;
} catch (const std::invalid_argument&) {
exec_state =
LDBCommandExecuteResult::Failed(option + " has an invalid value.");
} catch (const std::out_of_range&) {
exec_state = LDBCommandExecuteResult::Failed(
option + " has a value out-of-range.");
}
#endif
}
return false;
}
/**
* Parses the specific integer option and fills in the value.
* Returns true if the option is found.
@ -525,15 +590,23 @@ bool LDBCommand::ParseIntOption(
const std::map<std::string, std::string>& /*options*/,
const std::string& option, int& value,
LDBCommandExecuteResult& exec_state) {
std::map<std::string, std::string>::const_iterator itr =
option_map_.find(option);
auto itr = option_map_.find(option);
if (itr != option_map_.end()) {
try {
#if defined(CYGWIN)
value = strtol(itr->second.c_str(), 0, 10);
char* str_end = nullptr;
value = strtol(itr->second.c_str(), &str_end, 10);
if (str_end == itr->second.c_str()) {
exec_state =
LDBCommandExecuteResult::Failed(option + " has an invalid value.");
} else if (errno == ERANGE) {
exec_state = LDBCommandExecuteResult::Failed(
option + " has a value out-of-range.");
} else {
return true;
}
#else
try {
value = std::stoi(itr->second);
#endif
return true;
} catch (const std::invalid_argument&) {
exec_state =
@ -542,6 +615,7 @@ bool LDBCommand::ParseIntOption(
exec_state = LDBCommandExecuteResult::Failed(
option + " has a value out-of-range.");
}
#endif
}
return false;
}
@ -562,6 +636,51 @@ bool LDBCommand::ParseStringOption(
return false;
}
/**
* Parses the specified compression type and fills in the value.
* Returns true if the compression type is found.
* Returns false otherwise.
*/
bool LDBCommand::ParseCompressionTypeOption(
const std::map<std::string, std::string>& /*options*/,
const std::string& option, CompressionType& value,
LDBCommandExecuteResult& exec_state) {
auto itr = option_map_.find(option);
if (itr != option_map_.end()) {
const std::string& comp = itr->second;
if (comp == "no") {
value = kNoCompression;
return true;
} else if (comp == "snappy") {
value = kSnappyCompression;
return true;
} else if (comp == "zlib") {
value = kZlibCompression;
return true;
} else if (comp == "bzip2") {
value = kBZip2Compression;
return true;
} else if (comp == "lz4") {
value = kLZ4Compression;
return true;
} else if (comp == "lz4hc") {
value = kLZ4HCCompression;
return true;
} else if (comp == "xpress") {
value = kXpressCompression;
return true;
} else if (comp == "zstd") {
value = kZSTD;
return true;
} else {
// Unknown compression.
exec_state = LDBCommandExecuteResult::Failed(
"Unknown compression algorithm: " + comp);
}
}
return false;
}
void LDBCommand::OverrideBaseOptions() {
options_.create_if_missing = false;
@ -614,35 +733,86 @@ void LDBCommand::OverrideBaseCFOptions(ColumnFamilyOptions* cf_opts) {
cf_opts->table_factory.reset(NewBlockBasedTableFactory(table_options));
}
cf_opts->enable_blob_files = enable_blob_files_;
int min_blob_size;
if (ParseIntOption(option_map_, ARG_MIN_BLOB_SIZE, min_blob_size,
exec_state_)) {
if (min_blob_size >= 0) {
cf_opts->min_blob_size = min_blob_size;
} else {
exec_state_ =
LDBCommandExecuteResult::Failed(ARG_MIN_BLOB_SIZE + " must be >= 0.");
}
}
int blob_file_size;
if (ParseIntOption(option_map_, ARG_BLOB_FILE_SIZE, blob_file_size,
exec_state_)) {
if (blob_file_size > 0) {
cf_opts->blob_file_size = blob_file_size;
} else {
exec_state_ =
LDBCommandExecuteResult::Failed(ARG_BLOB_FILE_SIZE + " must be > 0.");
}
}
cf_opts->enable_blob_garbage_collection = enable_blob_garbage_collection_;
double blob_garbage_collection_age_cutoff;
if (ParseDoubleOption(option_map_, ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF,
blob_garbage_collection_age_cutoff, exec_state_)) {
if (blob_garbage_collection_age_cutoff >= 0 &&
blob_garbage_collection_age_cutoff <= 1) {
cf_opts->blob_garbage_collection_age_cutoff =
blob_garbage_collection_age_cutoff;
} else {
exec_state_ = LDBCommandExecuteResult::Failed(
ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF + " must be >= 0 and <= 1.");
}
}
double blob_garbage_collection_force_threshold;
if (ParseDoubleOption(option_map_,
ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD,
blob_garbage_collection_force_threshold, exec_state_)) {
if (blob_garbage_collection_force_threshold >= 0 &&
blob_garbage_collection_force_threshold <= 1) {
cf_opts->blob_garbage_collection_force_threshold =
blob_garbage_collection_force_threshold;
} else {
exec_state_ = LDBCommandExecuteResult::Failed(
ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD +
" must be >= 0 and <= 1.");
}
}
int blob_compaction_readahead_size;
if (ParseIntOption(option_map_, ARG_BLOB_COMPACTION_READAHEAD_SIZE,
blob_compaction_readahead_size, exec_state_)) {
if (blob_compaction_readahead_size > 0) {
cf_opts->blob_compaction_readahead_size = blob_compaction_readahead_size;
} else {
exec_state_ = LDBCommandExecuteResult::Failed(
ARG_BLOB_COMPACTION_READAHEAD_SIZE + " must be > 0.");
}
}
auto itr = option_map_.find(ARG_AUTO_COMPACTION);
if (itr != option_map_.end()) {
cf_opts->disable_auto_compactions = !StringToBool(itr->second);
}
itr = option_map_.find(ARG_COMPRESSION_TYPE);
if (itr != option_map_.end()) {
std::string comp = itr->second;
if (comp == "no") {
cf_opts->compression = kNoCompression;
} else if (comp == "snappy") {
cf_opts->compression = kSnappyCompression;
} else if (comp == "zlib") {
cf_opts->compression = kZlibCompression;
} else if (comp == "bzip2") {
cf_opts->compression = kBZip2Compression;
} else if (comp == "lz4") {
cf_opts->compression = kLZ4Compression;
} else if (comp == "lz4hc") {
cf_opts->compression = kLZ4HCCompression;
} else if (comp == "xpress") {
cf_opts->compression = kXpressCompression;
} else if (comp == "zstd") {
cf_opts->compression = kZSTD;
} else {
// Unknown compression.
exec_state_ =
LDBCommandExecuteResult::Failed("Unknown compression level: " + comp);
CompressionType compression_type;
if (ParseCompressionTypeOption(option_map_, ARG_COMPRESSION_TYPE,
compression_type, exec_state_)) {
cf_opts->compression = compression_type;
}
CompressionType blob_compression_type;
if (ParseCompressionTypeOption(option_map_, ARG_BLOB_COMPRESSION_TYPE,
blob_compression_type, exec_state_)) {
cf_opts->blob_compression_type = blob_compression_type;
}
int compression_max_dict_bytes;
@ -793,9 +963,7 @@ bool LDBCommand::ParseKeyValue(const std::string& line, std::string* key,
* appropriate error msg to stderr.
*/
bool LDBCommand::ValidateCmdLineOptions() {
for (std::map<std::string, std::string>::const_iterator itr =
option_map_.begin();
itr != option_map_.end(); ++itr) {
for (auto itr = option_map_.begin(); itr != option_map_.end(); ++itr) {
if (std::find(valid_cmd_line_options_.begin(),
valid_cmd_line_options_.end(),
itr->first) == valid_cmd_line_options_.end()) {
@ -884,7 +1052,7 @@ bool LDBCommand::IsValueHex(const std::map<std::string, std::string>& options,
bool LDBCommand::ParseBooleanOption(
const std::map<std::string, std::string>& options,
const std::string& option, bool default_val) {
std::map<std::string, std::string>::const_iterator itr = options.find(option);
auto itr = options.find(option);
if (itr != options.end()) {
std::string option_val = itr->second;
return StringToBool(itr->second);
@ -914,8 +1082,7 @@ CompactorCommand::CompactorCommand(
ARG_VALUE_HEX, ARG_TTL})),
null_from_(true),
null_to_(true) {
std::map<std::string, std::string>::const_iterator itr =
options.find(ARG_FROM);
auto itr = options.find(ARG_FROM);
if (itr != options.end()) {
null_from_ = false;
from_ = itr->second;
@ -1122,8 +1289,7 @@ ManifestDumpCommand::ManifestDumpCommand(
verbose_ = IsFlagPresent(flags, ARG_VERBOSE);
json_ = IsFlagPresent(flags, ARG_JSON);
std::map<std::string, std::string>::const_iterator itr =
options.find(ARG_PATH);
auto itr = options.find(ARG_PATH);
if (itr != options.end()) {
path_ = itr->second;
if (path_.empty()) {
@ -1264,8 +1430,7 @@ FileChecksumDumpCommand::FileChecksumDumpCommand(
: LDBCommand(options, flags, false,
BuildCmdLineOptions({ARG_PATH, ARG_HEX})),
path_("") {
std::map<std::string, std::string>::const_iterator itr =
options.find(ARG_PATH);
auto itr = options.find(ARG_PATH);
if (itr != options.end()) {
path_ = itr->second;
if (path_.empty()) {
@ -1540,8 +1705,7 @@ InternalDumpCommand::InternalDumpCommand(
has_to_ = ParseStringOption(options, ARG_TO, &to_);
ParseIntOption(options, ARG_MAX_KEYS, max_keys_, exec_state_);
std::map<std::string, std::string>::const_iterator itr =
options.find(ARG_COUNT_DELIM);
auto itr = options.find(ARG_COUNT_DELIM);
if (itr != options.end()) {
delim_ = itr->second;
count_delim_ = true;
@ -1676,8 +1840,7 @@ DBDumperCommand::DBDumperCommand(
count_only_(false),
count_delim_(false),
print_stats_(false) {
std::map<std::string, std::string>::const_iterator itr =
options.find(ARG_FROM);
auto itr = options.find(ARG_FROM);
if (itr != options.end()) {
null_from_ = false;
from_ = itr->second;
@ -2442,8 +2605,7 @@ WALDumperCommand::WALDumperCommand(
is_write_committed_(false) {
wal_file_.clear();
std::map<std::string, std::string>::const_iterator itr =
options.find(ARG_WAL_FILE);
auto itr = options.find(ARG_WAL_FILE);
if (itr != options.end()) {
wal_file_ = itr->second;
}
@ -2662,8 +2824,7 @@ ScanCommand::ScanCommand(const std::vector<std::string>& /*params*/,
end_key_specified_(false),
max_keys_scanned_(-1),
no_value_(false) {
std::map<std::string, std::string>::const_iterator itr =
options.find(ARG_FROM);
auto itr = options.find(ARG_FROM);
if (itr != options.end()) {
start_key_ = itr->second;
if (is_key_hex_) {
@ -3750,8 +3911,7 @@ ListFileRangeDeletesCommand::ListFileRangeDeletesCommand(
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(options, flags, true, BuildCmdLineOptions({ARG_MAX_KEYS})) {
std::map<std::string, std::string>::const_iterator itr =
options.find(ARG_MAX_KEYS);
auto itr = options.find(ARG_MAX_KEYS);
if (itr != options.end()) {
try {
#if defined(CYGWIN)

View File

@ -165,6 +165,32 @@ class LDBTestCase(unittest.TestCase):
self.assertRunFAIL("batchput k1")
self.assertRunFAIL("batchput k1 v1 k2")
def testBlobBatchPut(self):
print("Running testBlobBatchPut...")
dbPath = os.path.join(self.TMP_DIR, self.DB_NAME)
self.assertRunOK("batchput x1 y1 --create_if_missing --enable_blob_files", "OK")
self.assertRunOK("scan", "x1 : y1")
self.assertRunOK("batchput --enable_blob_files x2 y2 x3 y3 \"x4 abc\" \"y4 xyz\"", "OK")
self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 abc : y4 xyz")
blob_files = self.getBlobFiles(dbPath)
self.assertTrue(len(blob_files) >= 1)
def testBlobPut(self):
print("Running testBlobPut...")
dbPath = os.path.join(self.TMP_DIR, self.DB_NAME)
self.assertRunOK("put --create_if_missing --enable_blob_files x1 y1", "OK")
self.assertRunOK("get x1", "y1")
self.assertRunOK("put --enable_blob_files x2 y2", "OK")
self.assertRunOK("get x1", "y1")
self.assertRunOK("get x2", "y2")
self.assertRunFAIL("get x3")
blob_files = self.getBlobFiles(dbPath)
self.assertTrue(len(blob_files) >= 1)
def testCountDelimDump(self):
print("Running testCountDelimDump...")
self.assertRunOK("batchput x.1 x1 --create_if_missing", "OK")
@ -340,6 +366,21 @@ class LDBTestCase(unittest.TestCase):
self.assertFalse(self.dumpDb(
"--db=%s --create_if_missing" % origDbPath, dumpFilePath))
# Dump and load with BlobDB enabled
blobParams = " ".join(["--enable_blob_files", "--min_blob_size=1",
"--blob_file_size=2097152"])
dumpFilePath = os.path.join(self.TMP_DIR, "dump9")
loadedDbPath = os.path.join(self.TMP_DIR, "loaded_from_dump9")
self.assertTrue(self.dumpDb(
"--db=%s" % (origDbPath), dumpFilePath))
self.assertTrue(self.loadDb(
"--db=%s %s --create_if_missing --disable_wal" % (loadedDbPath, blobParams),
dumpFilePath))
self.assertRunOKFull("scan --db=%s" % loadedDbPath,
"x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")
blob_files = self.getBlobFiles(loadedDbPath)
self.assertTrue(len(blob_files) >= 1)
def testIDumpBasics(self):
print("Running testIDumpBasics...")
self.assertRunOK("put a val --create_if_missing", "OK")
@ -549,6 +590,9 @@ class LDBTestCase(unittest.TestCase):
def getWALFiles(self, directory):
return glob.glob(directory + "/*.log")
def getBlobFiles(self, directory):
return glob.glob(directory + "/*.blob")
def copyManifests(self, src, dest):
return 0 == run_err_null("cp " + src + " " + dest)

View File

@ -68,6 +68,20 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
ret.append(" --" + LDBCommand::ARG_WRITE_BUFFER_SIZE +
"=<int,e.g.:4194304>\n");
ret.append(" --" + LDBCommand::ARG_FILE_SIZE + "=<int,e.g.:2097152>\n");
ret.append(" --" + LDBCommand::ARG_ENABLE_BLOB_FILES +
" : Enable key-value separation using BlobDB\n");
ret.append(" --" + LDBCommand::ARG_MIN_BLOB_SIZE + "=<int,e.g.:2097152>\n");
ret.append(" --" + LDBCommand::ARG_BLOB_FILE_SIZE + "=<int,e.g.:2097152>\n");
ret.append(" --" + LDBCommand::ARG_BLOB_COMPRESSION_TYPE +
"=<no|snappy|zlib|bzip2|lz4|lz4hc|xpress|zstd>\n");
ret.append(" --" + LDBCommand::ARG_ENABLE_BLOB_GARBAGE_COLLECTION +
" : Enable blob garbage collection\n");
ret.append(" --" + LDBCommand::ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF +
"=<double,e.g.:0.25>\n");
ret.append(" --" + LDBCommand::ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD +
"=<double,e.g.:0.25>\n");
ret.append(" --" + LDBCommand::ARG_BLOB_COMPACTION_READAHEAD_SIZE +
"=<int,e.g.:2097152>\n");
ret.append("\n\n");
ret.append("Data Access Commands:\n");