mirror of https://github.com/facebook/rocksdb.git
Add user timestamp support into interactive query command (#12716)
Summary: As titled. This PR also makes the interactive query tool more permissive by allowing the user to continue to try out a different command after the previous command received some allowed errors, such as `Status::NotFound`, `Status::InvalidArgument`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12716 Test Plan: Manually tested: ``` yuzhangyu@yuzhangyu-mbp rocksdb % ./ldb --db=$TEST_DB --key_hex --value_hex query get 0x0000000000000000 --read_timestamp=1115559245398440 0x0000000000000000|timestamp:1115559245398440 ==> 0x07000000000102030C0D0E0F08090A0B14151617101112131C1D1E1F18191A1B24252627202122232C2D2E2F28292A2B34353637303132333C3D3E3F38393A3B put 0x0000000000000000 0x0000 put 0x0000000000000000 => 0x0000 failed: Invalid argument: cannot call this method on column family default that enables timestamp put 0x0000000000000000 aha 0x0000 put gets invalid argument: Invalid argument: user provided timestamp is not a valid uint64 value. put 0x0000000000000000 1115559245398441 0x08000000000102030C0D0E0F08090A0B14151617101112131C1D1E1F18191A1B24252627202122232C2D2E2F28292A2B34353637303132333C3D3E3F38393A3B put 0x0000000000000000 write_ts: 1115559245398441 => 0x08000000000102030C0D0E0F08090A0B14151617101112131C1D1E1F18191A1B24252627202122232C2D2E2F28292A2B34353637303132333C3D3E3F38393A3B succeeded delete 0x0000000000000000 delete 0x0000000000000000 failed: Invalid argument: cannot call this method on column family default that enables timestamp delete 0x0000000000000000 1115559245398442 delete 0x0000000000000000 write_ts: 1115559245398442 succeeded get 0x0000000000000000 --read_timestamp=1115559245398442 get 0x0000000000000000 read_timestamp: 1115559245398442 status: NotFound: get 0x0000000000000000 --read_timestamp=1115559245398441 0x0000000000000000|timestamp:1115559245398441 ==> 0x08000000000102030C0D0E0F08090A0B14151617101112131C1D1E1F18191A1B24252627202122232C2D2E2F28292A2B34353637303132333C3D3E3F38393A3B count --from=0x0000000000000000 --to=0x0000000000000001 scan from 0x0000000000000000 to 0x0000000000000001failed: Invalid argument: cannot call this method on column family default that enables timestamp count --from=0x0000000000000000 --to=0x0000000000000001 --read_timestamp=1115559245398442 0 count --from=0x0000000000000000 --to=0x0000000000000001 --read_timestamp=1115559245398441 1 ``` Reviewed By: ltamasi Differential Revision: D57992183 Pulled By: jowlyzhang fbshipit-source-id: 720525de22412d16aa952870e088f2c371459ece
This commit is contained in:
parent
7127119ae9
commit
8a462eefae
188
tools/ldb_cmd.cc
188
tools/ldb_cmd.cc
|
@ -126,6 +126,9 @@ void DumpSstFile(Options options, std::string filename, bool output_hex,
|
|||
|
||||
void DumpBlobFile(const std::string& filename, bool is_key_hex,
|
||||
bool is_value_hex, bool dump_uncompressed_blobs);
|
||||
|
||||
Status EncodeUserProvidedTimestamp(const std::string& user_timestamp,
|
||||
std::string* ts_buf);
|
||||
} // namespace
|
||||
|
||||
LDBCommand* LDBCommand::InitFromCmdLineArgs(
|
||||
|
@ -784,14 +787,10 @@ Status LDBCommand::MaybePopulateReadTimestamp(ColumnFamilyHandle* cfh,
|
|||
"column family does not enable user-defined timestamps while "
|
||||
"--read_timestamp is provided.");
|
||||
}
|
||||
uint64_t int_timestamp;
|
||||
std::istringstream iss(iter->second);
|
||||
if (!(iss >> int_timestamp)) {
|
||||
return Status::InvalidArgument(
|
||||
"column family enables user-defined timestamp while --read_timestamp "
|
||||
"is not a valid uint64 value.");
|
||||
Status s = EncodeUserProvidedTimestamp(iter->second, &read_timestamp_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
EncodeU64Ts(int_timestamp, &read_timestamp_);
|
||||
*read_timestamp = read_timestamp_;
|
||||
ropts.timestamp = read_timestamp;
|
||||
return Status::OK();
|
||||
|
@ -3884,13 +3883,18 @@ void DBQuerierCommand::DoCommand() {
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
std::string line;
|
||||
std::string key;
|
||||
std::string value;
|
||||
Status s;
|
||||
std::stringstream oss;
|
||||
while (s.ok() && getline(std::cin, line, '\n')) {
|
||||
ColumnFamilyHandle* cfh = GetCfHandle();
|
||||
const Comparator* ucmp = cfh->GetComparator();
|
||||
while ((s.ok() || s.IsNotFound() || s.IsInvalidArgument()) &&
|
||||
getline(std::cin, line, '\n')) {
|
||||
std::string key;
|
||||
std::string timestamp;
|
||||
std::string value;
|
||||
// Reset to OK status before parsing and executing next user command.
|
||||
s = Status::OK();
|
||||
std::stringstream oss;
|
||||
// Parse line into std::vector<std::string>
|
||||
std::vector<std::string> tokens;
|
||||
ParsedParams parsed_params;
|
||||
|
@ -3914,49 +3918,111 @@ void DBQuerierCommand::DoCommand() {
|
|||
const std::string& cmd = tokens[0];
|
||||
ReadOptions read_options;
|
||||
WriteOptions write_options;
|
||||
Slice read_timestamp;
|
||||
|
||||
if (cmd == HELP_CMD) {
|
||||
fprintf(stdout,
|
||||
"get <key>\n"
|
||||
"put <key> <value>\n"
|
||||
"delete <key>\n"
|
||||
"count [--from=<start_key>] [--to=<end_key>]\n");
|
||||
} else if (cmd == DELETE_CMD && tokens.size() == 2 &&
|
||||
parsed_params.option_map.empty()) {
|
||||
"get <key> [--read_timestamp=<uint64_ts>]\n"
|
||||
"put <key> [<write_timestamp>] <value>\n"
|
||||
"delete <key> [<write_timestamp>]\n"
|
||||
"count [--from=<start_key>] [--to=<end_key>] "
|
||||
"[--read_timestamp=<uint64_ts>]\n");
|
||||
} else if (cmd == DELETE_CMD && parsed_params.option_map.empty()) {
|
||||
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
|
||||
s = db_->Delete(write_options, GetCfHandle(), Slice(key));
|
||||
if (s.ok()) {
|
||||
fprintf(stdout, "Successfully deleted %s\n", tokens[1].c_str());
|
||||
if (tokens.size() == 2) {
|
||||
s = db_->Delete(write_options, cfh, Slice(key));
|
||||
} else if (tokens.size() == 3) {
|
||||
Status encode_s = EncodeUserProvidedTimestamp(tokens[2], ×tamp);
|
||||
if (encode_s.ok()) {
|
||||
s = db_->Delete(write_options, cfh, Slice(key), Slice(timestamp));
|
||||
} else {
|
||||
fprintf(stdout, "delete gets invalid argument: %s\n",
|
||||
encode_s.ToString().c_str());
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
oss << "delete " << key << " failed: " << s.ToString();
|
||||
fprintf(stdout, "delete gets invalid arguments\n");
|
||||
continue;
|
||||
}
|
||||
oss << "delete " << (is_key_hex_ ? StringToHex(key) : key);
|
||||
if (!timestamp.empty()) {
|
||||
oss << " write_ts: " << ucmp->TimestampToString(timestamp);
|
||||
}
|
||||
} else if (cmd == PUT_CMD && tokens.size() == 3 &&
|
||||
parsed_params.option_map.empty()) {
|
||||
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
|
||||
value = (is_value_hex_ ? HexToString(tokens[2]) : tokens[2]);
|
||||
s = db_->Put(write_options, GetCfHandle(), Slice(key), Slice(value));
|
||||
if (s.ok()) {
|
||||
fprintf(stdout, "Successfully put %s %s\n", tokens[1].c_str(),
|
||||
tokens[2].c_str());
|
||||
oss << " succeeded";
|
||||
} else {
|
||||
oss << "put " << key << "=>" << value << " failed: " << s.ToString();
|
||||
oss << " failed: " << s.ToString();
|
||||
}
|
||||
} else if (cmd == GET_CMD && tokens.size() == 2 &&
|
||||
parsed_params.option_map.empty()) {
|
||||
fprintf(stdout, "%s\n", oss.str().c_str());
|
||||
} else if (cmd == PUT_CMD && parsed_params.option_map.empty()) {
|
||||
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
|
||||
s = db_->Get(read_options, GetCfHandle(), Slice(key), &value);
|
||||
if (tokens.size() == 3) {
|
||||
value = (is_value_hex_ ? HexToString(tokens[2]) : tokens[2]);
|
||||
s = db_->Put(write_options, cfh, Slice(key), Slice(value));
|
||||
} else if (tokens.size() == 4) {
|
||||
value = (is_value_hex_ ? HexToString(tokens[3]) : tokens[3]);
|
||||
Status encode_s = EncodeUserProvidedTimestamp(tokens[2], ×tamp);
|
||||
if (encode_s.ok()) {
|
||||
s = db_->Put(write_options, cfh, Slice(key), Slice(timestamp),
|
||||
Slice(value));
|
||||
} else {
|
||||
fprintf(stdout, "put gets invalid argument: %s\n",
|
||||
encode_s.ToString().c_str());
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
fprintf(stdout, "put gets invalid arguments\n");
|
||||
continue;
|
||||
}
|
||||
|
||||
oss << "put " << (is_key_hex_ ? StringToHex(key) : key);
|
||||
if (!timestamp.empty()) {
|
||||
oss << " write_ts: " << ucmp->TimestampToString(timestamp);
|
||||
}
|
||||
oss << " => " << (is_value_hex_ ? StringToHex(value) : value);
|
||||
if (s.ok()) {
|
||||
oss << " succeeded";
|
||||
} else {
|
||||
oss << " failed: " << s.ToString();
|
||||
}
|
||||
fprintf(stdout, "%s\n", oss.str().c_str());
|
||||
} else if (cmd == GET_CMD && tokens.size() == 2) {
|
||||
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
|
||||
bool bad_option = false;
|
||||
for (auto& option : parsed_params.option_map) {
|
||||
if (option.first == "read_timestamp") {
|
||||
Status encode_s =
|
||||
EncodeUserProvidedTimestamp(option.second, ×tamp);
|
||||
if (!encode_s.ok()) {
|
||||
fprintf(stdout, "get gets invalid argument: %s\n",
|
||||
encode_s.ToString().c_str());
|
||||
bad_option = true;
|
||||
break;
|
||||
}
|
||||
read_timestamp = timestamp;
|
||||
read_options.timestamp = &read_timestamp;
|
||||
} else {
|
||||
fprintf(stdout, "get gets invalid arguments\n");
|
||||
bad_option = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (bad_option) {
|
||||
continue;
|
||||
}
|
||||
s = db_->Get(read_options, cfh, Slice(key), &value);
|
||||
if (s.ok()) {
|
||||
// TODO: add read timestamp support in querier
|
||||
fprintf(stdout, "%s\n",
|
||||
PrintKeyValue(key, "", value, is_key_hex_, is_value_hex_,
|
||||
GetCfHandle()->GetComparator())
|
||||
PrintKeyValue(key, timestamp, value, is_key_hex_, is_value_hex_,
|
||||
ucmp)
|
||||
.c_str());
|
||||
} else {
|
||||
if (s.IsNotFound()) {
|
||||
fprintf(stdout, "Not found %s\n", tokens[1].c_str());
|
||||
} else {
|
||||
oss << "get " << key << " error: " << s.ToString();
|
||||
oss << "get " << (is_key_hex_ ? StringToHex(key) : key);
|
||||
if (!timestamp.empty()) {
|
||||
oss << " read_timestamp: " << ucmp->TimestampToString(timestamp);
|
||||
}
|
||||
oss << " status: " << s.ToString();
|
||||
fprintf(stdout, "%s\n", oss.str().c_str());
|
||||
}
|
||||
} else if (cmd == COUNT_CMD) {
|
||||
std::string start_key;
|
||||
|
@ -3968,8 +4034,19 @@ void DBQuerierCommand::DoCommand() {
|
|||
(is_key_hex_ ? HexToString(option.second) : option.second);
|
||||
} else if (option.first == "to") {
|
||||
end_key = (is_key_hex_ ? HexToString(option.second) : option.second);
|
||||
} else if (option.first == "read_timestamp") {
|
||||
Status encode_s =
|
||||
EncodeUserProvidedTimestamp(option.second, ×tamp);
|
||||
if (!encode_s.ok()) {
|
||||
bad_option = true;
|
||||
fprintf(stdout, "count gets invalid argument: %s\n",
|
||||
encode_s.ToString().c_str());
|
||||
break;
|
||||
}
|
||||
read_timestamp = timestamp;
|
||||
read_options.timestamp = &read_timestamp;
|
||||
} else {
|
||||
fprintf(stdout, "Unknown option %s\n", option.first.c_str());
|
||||
fprintf(stdout, "count gets invalid arguments\n");
|
||||
bad_option = true;
|
||||
break;
|
||||
}
|
||||
|
@ -3983,8 +4060,7 @@ void DBQuerierCommand::DoCommand() {
|
|||
if (!end_key.empty()) {
|
||||
read_options.iterate_upper_bound = &end_key_slice;
|
||||
}
|
||||
std::unique_ptr<Iterator> iter(
|
||||
db_->NewIterator(read_options, GetCfHandle()));
|
||||
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, cfh));
|
||||
if (start_key.empty()) {
|
||||
iter->SeekToFirst();
|
||||
} else {
|
||||
|
@ -3997,15 +4073,21 @@ void DBQuerierCommand::DoCommand() {
|
|||
if (iter->status().ok()) {
|
||||
fprintf(stdout, "%" PRIu64 "\n", count);
|
||||
} else {
|
||||
oss << "scan from " << start_key << " to " << end_key
|
||||
<< "failed: " << iter->status().ToString();
|
||||
oss << "scan from "
|
||||
<< (is_key_hex_ ? StringToHex(start_key) : start_key);
|
||||
if (!timestamp.empty()) {
|
||||
oss << " read_timestamp: " << ucmp->TimestampToString(timestamp);
|
||||
}
|
||||
oss << " to " << (is_key_hex_ ? StringToHex(end_key) : end_key)
|
||||
<< " failed: " << iter->status().ToString();
|
||||
fprintf(stdout, "%s\n", oss.str().c_str());
|
||||
}
|
||||
} else {
|
||||
fprintf(stdout, "Unknown command %s\n", line.c_str());
|
||||
}
|
||||
}
|
||||
if (!s.ok()) {
|
||||
exec_state_ = LDBCommandExecuteResult::Failed(oss.str());
|
||||
if (!(s.ok() || s.IsNotFound() || s.IsInvalidArgument())) {
|
||||
exec_state_ = LDBCommandExecuteResult::Failed(s.ToString());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4341,6 +4423,18 @@ void DumpBlobFile(const std::string& filename, bool is_key_hex,
|
|||
fprintf(stderr, "Failed: %s\n", s.ToString().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
Status EncodeUserProvidedTimestamp(const std::string& user_timestamp,
|
||||
std::string* ts_buf) {
|
||||
uint64_t int_timestamp;
|
||||
std::istringstream iss(user_timestamp);
|
||||
if (!(iss >> int_timestamp)) {
|
||||
return Status::InvalidArgument(
|
||||
"user provided timestamp is not a valid uint64 value.");
|
||||
}
|
||||
EncodeU64Ts(int_timestamp, ts_buf);
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace
|
||||
|
||||
DBFileDumperCommand::DBFileDumperCommand(
|
||||
|
|
Loading…
Reference in New Issue