Add LDB command and option for follower instances (#12682)

Summary:
Add the `--leader_path` option to specify the directory path of the leader for a follower RocksDB instance. This PR also adds a `count` command to the repl shell. While not specific to followers, it is useful for testing purposes.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12682

Reviewed By: jowlyzhang

Differential Revision: D57642296

Pulled By: anand1976

fbshipit-source-id: 53767d496ecadc363ff92cd958b8e15a7bf3b151
This commit is contained in:
anand76 2024-05-28 23:21:32 -07:00 committed by Facebook GitHub Bot
parent 5cec4bbcab
commit 9cc6168c98
7 changed files with 131 additions and 38 deletions

View File

@ -691,6 +691,7 @@ set(SOURCES
db/db_impl/db_impl_write.cc db/db_impl/db_impl_write.cc
db/db_impl/db_impl_compaction_flush.cc db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_files.cc db/db_impl/db_impl_files.cc
db/db_impl/db_impl_follower.cc
db/db_impl/db_impl_open.cc db/db_impl/db_impl_open.cc
db/db_impl/db_impl_debug.cc db/db_impl/db_impl_debug.cc
db/db_impl/db_impl_experimental.cc db/db_impl/db_impl_experimental.cc
@ -748,6 +749,7 @@ set(SOURCES
env/env_encryption.cc env/env_encryption.cc
env/file_system.cc env/file_system.cc
env/file_system_tracer.cc env/file_system_tracer.cc
env/fs_on_demand.cc
env/fs_remap.cc env/fs_remap.cc
env/mock_env.cc env/mock_env.cc
env/unique_id_gen.cc env/unique_id_gen.cc
@ -1037,10 +1039,8 @@ endif()
else() else()
list(APPEND SOURCES list(APPEND SOURCES
db/db_impl/db_impl_follower.cc
port/port_posix.cc port/port_posix.cc
env/env_posix.cc env/env_posix.cc
env/fs_on_demand.cc
env/fs_posix.cc env/fs_posix.cc
env/io_posix.cc) env/io_posix.cc)
endif() endif()

View File

@ -5,6 +5,7 @@
#include "db/db_impl/db_impl_follower.h" #include "db/db_impl/db_impl_follower.h"
#include <algorithm>
#include <cinttypes> #include <cinttypes>
#include "db/arena_wrapped_db_iter.h" #include "db/arena_wrapped_db_iter.h"

View File

@ -35,6 +35,7 @@ class LDBCommand {
static const std::string ARG_DB; static const std::string ARG_DB;
static const std::string ARG_PATH; static const std::string ARG_PATH;
static const std::string ARG_SECONDARY_PATH; static const std::string ARG_SECONDARY_PATH;
static const std::string ARG_LEADER_PATH;
static const std::string ARG_HEX; static const std::string ARG_HEX;
static const std::string ARG_KEY_HEX; static const std::string ARG_KEY_HEX;
static const std::string ARG_VALUE_HEX; static const std::string ARG_VALUE_HEX;
@ -83,6 +84,10 @@ class LDBCommand {
static LDBCommand* SelectCommand(const ParsedParams& parsed_parms); static LDBCommand* SelectCommand(const ParsedParams& parsed_parms);
static void ParseSingleParam(const std::string& param,
ParsedParams& parsed_params,
std::vector<std::string>& cmd_tokens);
static LDBCommand* InitFromCmdLineArgs( static LDBCommand* InitFromCmdLineArgs(
const std::vector<std::string>& args, const Options& options, const std::vector<std::string>& args, const Options& options,
const LDBOptions& ldb_options, const LDBOptions& ldb_options,
@ -156,6 +161,7 @@ class LDBCommand {
// with this secondary path. When running against a database opened by // with this secondary path. When running against a database opened by
// another process, ldb will leave the source directory completely intact. // another process, ldb will leave the source directory completely intact.
std::string secondary_path_; std::string secondary_path_;
std::string leader_path_;
std::string column_family_name_; std::string column_family_name_;
DB* db_; DB* db_;
DBWithTTL* db_ttl_; DBWithTTL* db_ttl_;

View File

@ -60,6 +60,7 @@ const std::string LDBCommand::ARG_FS_URI = "fs_uri";
const std::string LDBCommand::ARG_DB = "db"; const std::string LDBCommand::ARG_DB = "db";
const std::string LDBCommand::ARG_PATH = "path"; const std::string LDBCommand::ARG_PATH = "path";
const std::string LDBCommand::ARG_SECONDARY_PATH = "secondary_path"; const std::string LDBCommand::ARG_SECONDARY_PATH = "secondary_path";
const std::string LDBCommand::ARG_LEADER_PATH = "leader_path";
const std::string LDBCommand::ARG_HEX = "hex"; const std::string LDBCommand::ARG_HEX = "hex";
const std::string LDBCommand::ARG_KEY_HEX = "key_hex"; const std::string LDBCommand::ARG_KEY_HEX = "key_hex";
const std::string LDBCommand::ARG_VALUE_HEX = "value_hex"; const std::string LDBCommand::ARG_VALUE_HEX = "value_hex";
@ -139,6 +140,32 @@ LDBCommand* LDBCommand::InitFromCmdLineArgs(
SelectCommand); SelectCommand);
} }
/**
 * Classify a single command-line / repl token and record it.
 *
 * A token that starts with "--" is an option:
 *   --name=value  stores value in parsed_params.option_map under "name".
 *                 The value is everything after the FIRST '=', taken verbatim
 *                 from the token, so it may itself contain '=' characters,
 *                 including a trailing one.
 *   --name        appends "name" to parsed_params.flags. A token with nothing
 *                 after the '=' ("--name=") is recorded as a flag as well.
 * Any other token is appended to cmd_tokens unchanged.
 *
 * @param param          the raw token to classify
 * @param parsed_params  receives the option or flag, if the token is one
 * @param cmd_tokens     receives the token if it is not an option
 */
void LDBCommand::ParseSingleParam(const std::string& param,
                                  ParsedParams& parsed_params,
                                  std::vector<std::string>& cmd_tokens) {
  const std::string OPTION_PREFIX = "--";
  if (param.compare(0, OPTION_PREFIX.size(), OPTION_PREFIX) != 0) {
    // Not an option: part of the command itself (e.g. "put", a key, a value).
    cmd_tokens.push_back(param);
    return;
  }

  // Locate only the first '=' after the prefix. Cutting the token once here,
  // rather than splitting on every '=' and counting pieces, keeps the value
  // byte-for-byte identical to what the user typed.
  const size_t eq_pos = param.find('=', OPTION_PREFIX.size());
  const bool has_eq = (eq_pos != std::string::npos);
  const size_t key_end = has_eq ? eq_pos : param.size();
  const std::string option_key =
      param.substr(OPTION_PREFIX.size(), key_end - OPTION_PREFIX.size());

  if (!has_eq || eq_pos + 1 == param.size()) {
    // --flag_name (or --flag_name= with an empty value)
    parsed_params.flags.push_back(option_key);
  } else {
    // --option_name=option_value; option_value may contain '='
    parsed_params.option_map[option_key] = param.substr(eq_pos + 1);
  }
}
/** /**
* Parse the command-line arguments and create the appropriate LDBCommand2 * Parse the command-line arguments and create the appropriate LDBCommand2
* instance. * instance.
@ -165,28 +192,8 @@ LDBCommand* LDBCommand::InitFromCmdLineArgs(
// and their parameters. For eg: put key1 value1 go into this vector. // and their parameters. For eg: put key1 value1 go into this vector.
std::vector<std::string> cmdTokens; std::vector<std::string> cmdTokens;
const std::string OPTION_PREFIX = "--";
for (const auto& arg : args) { for (const auto& arg : args) {
if (arg[0] == '-' && arg[1] == '-') { ParseSingleParam(arg, parsed_params, cmdTokens);
std::vector<std::string> splits = StringSplit(arg, '=');
// --option_name=option_value
if (splits.size() == 2) {
std::string optionKey = splits[0].substr(OPTION_PREFIX.size());
parsed_params.option_map[optionKey] = splits[1];
} else if (splits.size() == 1) {
// --flag_name
std::string optionKey = splits[0].substr(OPTION_PREFIX.size());
parsed_params.flags.push_back(optionKey);
} else {
// --option_name=option_value, option_value contains '='
std::string optionKey = splits[0].substr(OPTION_PREFIX.size());
parsed_params.option_map[optionKey] =
arg.substr(splits[0].length() + 1);
}
} else {
cmdTokens.push_back(arg);
}
} }
if (cmdTokens.size() < 1) { if (cmdTokens.size() < 1) {
@ -429,6 +436,12 @@ LDBCommand::LDBCommand(const std::map<std::string, std::string>& options,
secondary_path_ = itr->second; secondary_path_ = itr->second;
} }
itr = options.find(ARG_LEADER_PATH);
leader_path_ = "";
if (itr != options.end()) {
leader_path_ = itr->second;
}
is_key_hex_ = IsKeyHex(options, flags); is_key_hex_ = IsKeyHex(options, flags);
is_value_hex_ = IsValueHex(options, flags); is_value_hex_ = IsValueHex(options, flags);
is_db_ttl_ = IsFlagPresent(flags, ARG_TTL); is_db_ttl_ = IsFlagPresent(flags, ARG_TTL);
@ -461,9 +474,9 @@ void LDBCommand::OpenDB() {
exec_state_ = LDBCommandExecuteResult::Failed( exec_state_ = LDBCommandExecuteResult::Failed(
"ldb doesn't support TTL DB with multiple column families"); "ldb doesn't support TTL DB with multiple column families");
} }
if (!secondary_path_.empty()) { if (!secondary_path_.empty() || !leader_path_.empty()) {
exec_state_ = LDBCommandExecuteResult::Failed( exec_state_ = LDBCommandExecuteResult::Failed(
"Open as secondary is not supported for TTL DB yet."); "Open as secondary or follower is not supported for TTL DB yet.");
} }
if (is_read_only_) { if (is_read_only_) {
st = DBWithTTL::Open(options_, db_path_, &db_ttl_, 0, true); st = DBWithTTL::Open(options_, db_path_, &db_ttl_, 0, true);
@ -472,7 +485,11 @@ void LDBCommand::OpenDB() {
} }
db_ = db_ttl_; db_ = db_ttl_;
} else { } else {
if (is_read_only_ && secondary_path_.empty()) { if (!secondary_path_.empty() && !leader_path_.empty()) {
exec_state_ = LDBCommandExecuteResult::Failed(
"Cannot provide both secondary and leader paths");
}
if (is_read_only_ && secondary_path_.empty() && leader_path_.empty()) {
if (column_families_.empty()) { if (column_families_.empty()) {
st = DB::OpenForReadOnly(options_, db_path_, &db_); st = DB::OpenForReadOnly(options_, db_path_, &db_);
} else { } else {
@ -481,18 +498,27 @@ void LDBCommand::OpenDB() {
} }
} else { } else {
if (column_families_.empty()) { if (column_families_.empty()) {
if (secondary_path_.empty()) { if (secondary_path_.empty() && leader_path_.empty()) {
st = DB::Open(options_, db_path_, &db_); st = DB::Open(options_, db_path_, &db_);
} else { } else if (!secondary_path_.empty()) {
st = DB::OpenAsSecondary(options_, db_path_, secondary_path_, &db_); st = DB::OpenAsSecondary(options_, db_path_, secondary_path_, &db_);
} else {
std::unique_ptr<DB> dbptr;
st = DB::OpenAsFollower(options_, db_path_, leader_path_, &dbptr);
db_ = dbptr.release();
} }
} else { } else {
if (secondary_path_.empty()) { if (secondary_path_.empty() && leader_path_.empty()) {
st = DB::Open(options_, db_path_, column_families_, &handles_opened, st = DB::Open(options_, db_path_, column_families_, &handles_opened,
&db_); &db_);
} else { } else if (!secondary_path_.empty()) {
st = DB::OpenAsSecondary(options_, db_path_, secondary_path_, st = DB::OpenAsSecondary(options_, db_path_, secondary_path_,
column_families_, &handles_opened, &db_); column_families_, &handles_opened, &db_);
} else {
std::unique_ptr<DB> dbptr;
st = DB::OpenAsFollower(options_, db_path_, leader_path_,
column_families_, &handles_opened, &dbptr);
db_ = dbptr.release();
} }
} }
} }
@ -561,6 +587,7 @@ std::vector<std::string> LDBCommand::BuildCmdLineOptions(
ARG_FS_URI, ARG_FS_URI,
ARG_DB, ARG_DB,
ARG_SECONDARY_PATH, ARG_SECONDARY_PATH,
ARG_LEADER_PATH,
ARG_BLOOM_BITS, ARG_BLOOM_BITS,
ARG_BLOCK_SIZE, ARG_BLOCK_SIZE,
ARG_AUTO_COMPACTION, ARG_AUTO_COMPACTION,
@ -3828,6 +3855,7 @@ const char* DBQuerierCommand::HELP_CMD = "help";
const char* DBQuerierCommand::GET_CMD = "get"; const char* DBQuerierCommand::GET_CMD = "get";
const char* DBQuerierCommand::PUT_CMD = "put"; const char* DBQuerierCommand::PUT_CMD = "put";
const char* DBQuerierCommand::DELETE_CMD = "delete"; const char* DBQuerierCommand::DELETE_CMD = "delete";
const char* DBQuerierCommand::COUNT_CMD = "count";
DBQuerierCommand::DBQuerierCommand( DBQuerierCommand::DBQuerierCommand(
const std::vector<std::string>& /*params*/, const std::vector<std::string>& /*params*/,
@ -3856,8 +3884,6 @@ void DBQuerierCommand::DoCommand() {
return; return;
} }
ReadOptions read_options;
WriteOptions write_options;
std::string line; std::string line;
std::string key; std::string key;
@ -3867,25 +3893,36 @@ void DBQuerierCommand::DoCommand() {
while (s.ok() && getline(std::cin, line, '\n')) { while (s.ok() && getline(std::cin, line, '\n')) {
// Parse line into std::vector<std::string> // Parse line into std::vector<std::string>
std::vector<std::string> tokens; std::vector<std::string> tokens;
ParsedParams parsed_params;
size_t pos = 0; size_t pos = 0;
while (true) { while (true) {
size_t pos2 = line.find(' ', pos); size_t pos2 = line.find(' ', pos);
std::string token =
line.substr(pos, (pos2 == std::string::npos) ? pos2 : (pos2 - pos));
ParseSingleParam(token, parsed_params, tokens);
if (pos2 == std::string::npos) { if (pos2 == std::string::npos) {
break; break;
} }
tokens.push_back(line.substr(pos, pos2 - pos));
pos = pos2 + 1; pos = pos2 + 1;
} }
tokens.push_back(line.substr(pos));
if (tokens.empty() || !parsed_params.flags.empty()) {
fprintf(stdout, "Bad command\n");
continue;
}
const std::string& cmd = tokens[0]; const std::string& cmd = tokens[0];
ReadOptions read_options;
WriteOptions write_options;
if (cmd == HELP_CMD) { if (cmd == HELP_CMD) {
fprintf(stdout, fprintf(stdout,
"get <key>\n" "get <key>\n"
"put <key> <value>\n" "put <key> <value>\n"
"delete <key>\n"); "delete <key>\n"
} else if (cmd == DELETE_CMD && tokens.size() == 2) { "count [--from=<start_key>] [--to=<end_key>]\n");
} else if (cmd == DELETE_CMD && tokens.size() == 2 &&
parsed_params.option_map.empty()) {
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]); key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
s = db_->Delete(write_options, GetCfHandle(), Slice(key)); s = db_->Delete(write_options, GetCfHandle(), Slice(key));
if (s.ok()) { if (s.ok()) {
@ -3893,7 +3930,8 @@ void DBQuerierCommand::DoCommand() {
} else { } else {
oss << "delete " << key << " failed: " << s.ToString(); oss << "delete " << key << " failed: " << s.ToString();
} }
} else if (cmd == PUT_CMD && tokens.size() == 3) { } else if (cmd == PUT_CMD && tokens.size() == 3 &&
parsed_params.option_map.empty()) {
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]); key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
value = (is_value_hex_ ? HexToString(tokens[2]) : tokens[2]); value = (is_value_hex_ ? HexToString(tokens[2]) : tokens[2]);
s = db_->Put(write_options, GetCfHandle(), Slice(key), Slice(value)); s = db_->Put(write_options, GetCfHandle(), Slice(key), Slice(value));
@ -3903,7 +3941,8 @@ void DBQuerierCommand::DoCommand() {
} else { } else {
oss << "put " << key << "=>" << value << " failed: " << s.ToString(); oss << "put " << key << "=>" << value << " failed: " << s.ToString();
} }
} else if (cmd == GET_CMD && tokens.size() == 2) { } else if (cmd == GET_CMD && tokens.size() == 2 &&
parsed_params.option_map.empty()) {
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]); key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
s = db_->Get(read_options, GetCfHandle(), Slice(key), &value); s = db_->Get(read_options, GetCfHandle(), Slice(key), &value);
if (s.ok()) { if (s.ok()) {
@ -3919,6 +3958,48 @@ void DBQuerierCommand::DoCommand() {
oss << "get " << key << " error: " << s.ToString(); oss << "get " << key << " error: " << s.ToString();
} }
} }
} else if (cmd == COUNT_CMD) {
std::string start_key;
std::string end_key;
bool bad_option = false;
for (auto& option : parsed_params.option_map) {
if (option.first == "from") {
start_key =
(is_key_hex_ ? HexToString(option.second) : option.second);
} else if (option.first == "to") {
end_key = (is_key_hex_ ? HexToString(option.second) : option.second);
} else {
fprintf(stdout, "Unknown option %s\n", option.first.c_str());
bad_option = true;
break;
}
}
if (bad_option) {
continue;
}
Slice end_key_slice(end_key);
uint64_t count = 0;
if (!end_key.empty()) {
read_options.iterate_upper_bound = &end_key_slice;
}
std::unique_ptr<Iterator> iter(
db_->NewIterator(read_options, GetCfHandle()));
if (start_key.empty()) {
iter->SeekToFirst();
} else {
iter->Seek(start_key);
}
while (iter->status().ok() && iter->Valid()) {
count++;
iter->Next();
}
if (iter->status().ok()) {
fprintf(stdout, "%" PRIu64 "\n", count);
} else {
oss << "scan from " << start_key << " to " << end_key
<< "failed: " << iter->status().ToString();
}
} else { } else {
fprintf(stdout, "Unknown command %s\n", line.c_str()); fprintf(stdout, "Unknown command %s\n", line.c_str());
} }

View File

@ -620,6 +620,7 @@ class DBQuerierCommand : public LDBCommand {
static const char* GET_CMD; static const char* GET_CMD;
static const char* PUT_CMD; static const char* PUT_CMD;
static const char* DELETE_CMD; static const char* DELETE_CMD;
static const char* COUNT_CMD;
}; };
class CheckConsistencyCommand : public LDBCommand { class CheckConsistencyCommand : public LDBCommand {

View File

@ -28,6 +28,9 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
ret.append(" --" + LDBCommand::ARG_SECONDARY_PATH + ret.append(" --" + LDBCommand::ARG_SECONDARY_PATH +
"=<secondary_path> to open DB as secondary instance. Operations " "=<secondary_path> to open DB as secondary instance. Operations "
"not supported in secondary instance will fail.\n\n"); "not supported in secondary instance will fail.\n\n");
ret.append(" --" + LDBCommand::ARG_LEADER_PATH +
"=<leader_path> to open DB as a follower instance. Operations "
"not supported in follower instance will fail.\n\n");
ret.append( ret.append(
"The following optional parameters control if keys/values are " "The following optional parameters control if keys/values are "
"input/output as hex or as plain strings:\n"); "input/output as hex or as plain strings:\n");

View File

@ -0,0 +1 @@
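Added a new "count" command to the ldb repl shell. By default, it prints a count of keys in the database from start to end. The options --from=<key> and/or --to=<key> can be specified to limit the range; counting starts at the first key at or after --from and stops before --to (the end key is exclusive).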