Support read timestamp in ldb (#12641)

Summary:
As titled. Also updated sst_dump to print out user-defined timestamp separately and in human readable format.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12641

Test Plan:
manually tested:
Example success run:
./ldb --db=$TEST_DB multi_get_entity 0x00000000000000950000000000000120 --key_hex --column_family=7  --read_timestamp=1115613683797942 --value_hex
0x00000000000000950000000000000120 ==> :0x0E0000000A0B080906070405020300011E1F1C1D1A1B181916171415121310112E2F2C2D2A2B282926272425222320213E3F3C3D3A3B383936373435323330314E4F4C4D4A4B484946474445424340415E5F5C5D5A5B58595657545552535051
Example failed run:
Failed: GetEntity failed: Invalid argument: column family enables user-defined timestamp while --read_timestamp is not a valid uint64 value.

sst_dump print out:
'000000000000015D000000000000012B000000000000013B|timestamp:1113554493256671' seq:2330405, type:1 => 010000000504070609080B0A0D0C0F0E111013121514171619181B1A1D1C1F1E212023222524272629282B2A2D2C2F2E313033323534373639383B3A3D3C3F3E

Reviewed By: ltamasi

Differential Revision: D57297006

Pulled By: jowlyzhang

fbshipit-source-id: 8486d91468e4f6c0d42dca3c9629f1c45a92bf5a
This commit is contained in:
Yu Zhang 2024-05-13 15:43:12 -07:00 committed by Facebook GitHub Bot
parent 20213d01a3
commit c110091d36
10 changed files with 178 additions and 37 deletions

View File

@ -2992,9 +2992,9 @@ inline Status DBImpl::FailIfReadCollapsedHistory(const ColumnFamilyData* cfd,
if (!full_history_ts_low.empty() &&
ucmp->CompareTimestamp(ts, full_history_ts_low) < 0) {
std::stringstream oss;
oss << "Read timestamp: " << ts.ToString(true)
oss << "Read timestamp: " << ucmp->TimestampToString(ts)
<< " is smaller than full_history_ts_low: "
<< Slice(full_history_ts_low).ToString(true) << std::endl;
<< ucmp->TimestampToString(full_history_ts_low) << std::endl;
return Status::InvalidArgument(oss.str());
}
return Status::OK();

View File

@ -1094,7 +1094,12 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
assert(ucmp->timestamp_size() == ts_low.size() && !ts_low.empty());
if (!current_ts_low.empty() &&
ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
return Status::InvalidArgument("Cannot decrease full_history_ts_low");
std::stringstream oss;
oss << "Current full_history_ts_low: "
<< ucmp->TimestampToString(current_ts_low)
<< " is higher than provided ts: " << ucmp->TimestampToString(ts_low)
<< std::endl;
return Status::InvalidArgument(oss.str());
}
Status s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),

View File

@ -155,10 +155,23 @@ void ReplaceInternalKeyWithMinTimestamp(std::string* result, const Slice& key,
result->append(key.data() + key_sz - kNumInternalBytes, kNumInternalBytes);
}
std::string ParsedInternalKey::DebugString(bool log_err_key, bool hex) const {
std::string ParsedInternalKey::DebugString(bool log_err_key, bool hex,
const Comparator* ucmp) const {
std::string result = "'";
size_t ts_sz_for_debug = ucmp == nullptr ? 0 : ucmp->timestamp_size();
if (log_err_key) {
result += user_key.ToString(hex);
if (ts_sz_for_debug == 0) {
result += user_key.ToString(hex);
} else {
assert(user_key.size() >= ts_sz_for_debug);
Slice user_key_without_ts = user_key;
user_key_without_ts.remove_suffix(ts_sz_for_debug);
result += user_key_without_ts.ToString(hex);
Slice ts = Slice(user_key.data() + user_key.size() - ts_sz_for_debug,
ts_sz_for_debug);
result += "|timestamp:";
result += ucmp->TimestampToString(ts);
}
} else {
result += "<redacted>";
}
@ -171,11 +184,11 @@ std::string ParsedInternalKey::DebugString(bool log_err_key, bool hex) const {
return result;
}
std::string InternalKey::DebugString(bool hex) const {
std::string InternalKey::DebugString(bool hex, const Comparator* ucmp) const {
std::string result;
ParsedInternalKey parsed;
if (ParseInternalKey(rep_, &parsed, false /* log_err_key */).ok()) {
result = parsed.DebugString(true /* log_err_key */, hex); // TODO
result = parsed.DebugString(true /* log_err_key */, hex, ucmp); // TODO
} else {
result = "(bad)";
result.append(EscapeString(rep_));

View File

@ -148,7 +148,8 @@ struct ParsedInternalKey {
// u contains timestamp if user timestamp feature is enabled.
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
: user_key(u), sequence(seq), type(t) {}
std::string DebugString(bool log_err_key, bool hex) const;
std::string DebugString(bool log_err_key, bool hex,
const Comparator* ucmp = nullptr) const;
void clear() {
user_key.clear();
@ -503,7 +504,7 @@ class InternalKey {
AppendInternalKeyFooter(&rep_, s, t);
}
std::string DebugString(bool hex) const;
std::string DebugString(bool hex, const Comparator* ucmp = nullptr) const;
};
inline int InternalKeyComparator::Compare(const InternalKey& a,

View File

@ -142,6 +142,11 @@ class Comparator : public Customizable, public CompareInterface {
return Slice();
}
// Return a human readable user-defined timestamp for debugging.
virtual std::string TimestampToString(const Slice& /*timestamp*/) const {
return "";
}
int CompareWithoutTimestamp(const Slice& a, const Slice& b) const {
return CompareWithoutTimestamp(a, /*a_has_ts=*/true, b, /*b_has_ts=*/true);
}

View File

@ -72,6 +72,7 @@ class LDBCommand {
static const std::string ARG_PREPOPULATE_BLOB_CACHE;
static const std::string ARG_DECODE_BLOB_INDEX;
static const std::string ARG_DUMP_UNCOMPRESSED_BLOBS;
static const std::string ARG_READ_TIMESTAMP;
struct ParsedParams {
std::string cmd;
@ -190,6 +191,9 @@ class LDBCommand {
bool create_if_missing_;
/** Encoded user provided uint64_t read timestamp. */
std::string read_timestamp_;
/**
* Map of options passed on the command-line.
*/
@ -275,6 +279,10 @@ class LDBCommand {
bool ParseBooleanOption(const std::map<std::string, std::string>& options,
const std::string& option, bool default_val);
/* Populate `ropts.timestamp` from command line flag --read_timestamp */
Status MaybePopulateReadTimestamp(ColumnFamilyHandle* cfh, ReadOptions& ropts,
Slice* read_timestamp);
Options options_;
std::vector<ColumnFamilyDescriptor> column_families_;
ConfigOptions config_options_;

View File

@ -521,22 +521,22 @@ Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num_limit,
iter->value(), oss, output_hex_);
if (!s.ok()) {
fprintf(stderr, "%s => error deserializing wide columns\n",
ikey.DebugString(true, output_hex_).c_str());
ikey.DebugString(true, output_hex_, ucmp).c_str());
continue;
}
fprintf(stdout, "%s => %s\n",
ikey.DebugString(true, output_hex_).c_str(),
ikey.DebugString(true, output_hex_, ucmp).c_str(),
oss.str().c_str());
} else if (ikey.type == kTypeValuePreferredSeqno) {
auto [unpacked_value, preferred_seqno] =
ParsePackedValueWithSeqno(value);
fprintf(stdout, "%s => %s, %llu\n",
ikey.DebugString(true, output_hex_).c_str(),
ikey.DebugString(true, output_hex_, ucmp).c_str(),
unpacked_value.ToString(output_hex_).c_str(),
static_cast<unsigned long long>(preferred_seqno));
} else {
fprintf(stdout, "%s => %s\n",
ikey.DebugString(true, output_hex_).c_str(),
ikey.DebugString(true, output_hex_, ucmp).c_str(),
value.ToString(output_hex_).c_str());
}
} else {
@ -545,12 +545,12 @@ Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num_limit,
const Status s = blob_index.DecodeFrom(value);
if (!s.ok()) {
fprintf(stderr, "%s => error decoding blob index\n",
ikey.DebugString(true, output_hex_).c_str());
ikey.DebugString(true, output_hex_, ucmp).c_str());
continue;
}
fprintf(stdout, "%s => %s\n",
ikey.DebugString(true, output_hex_).c_str(),
ikey.DebugString(true, output_hex_, ucmp).c_str(),
blob_index.DebugString(output_hex_).c_str());
}
}

View File

@ -107,6 +107,7 @@ const std::string LDBCommand::ARG_PREPOPULATE_BLOB_CACHE =
const std::string LDBCommand::ARG_DECODE_BLOB_INDEX = "decode_blob_index";
const std::string LDBCommand::ARG_DUMP_UNCOMPRESSED_BLOBS =
"dump_uncompressed_blobs";
const std::string LDBCommand::ARG_READ_TIMESTAMP = "read_timestamp";
const char* LDBCommand::DELIM = " ==> ";
@ -722,6 +723,46 @@ bool LDBCommand::ParseCompressionTypeOption(
return false;
}
Status LDBCommand::MaybePopulateReadTimestamp(ColumnFamilyHandle* cfh,
ReadOptions& ropts,
Slice* read_timestamp) {
const size_t ts_sz = cfh->GetComparator()->timestamp_size();
auto iter = option_map_.find(ARG_READ_TIMESTAMP);
if (iter == option_map_.end()) {
if (ts_sz == 0) {
return Status::OK();
}
return Status::InvalidArgument(
"column family enables user-defined timestamp while --read_timestamp "
"is not provided.");
}
if (iter->second.empty()) {
if (ts_sz == 0) {
return Status::OK();
}
return Status::InvalidArgument(
"column family enables user-defined timestamp while --read_timestamp "
"is empty.");
}
if (ts_sz == 0) {
return Status::InvalidArgument(
"column family does not enable user-defined timestamps while "
"--read_timestamp is provided.");
}
uint64_t int_timestamp;
std::istringstream iss(iter->second);
if (!(iss >> int_timestamp)) {
return Status::InvalidArgument(
"column family enables user-defined timestamp while --read_timestamp "
"is not a valid uint64 value.");
}
EncodeU64Ts(int_timestamp, &read_timestamp_);
*read_timestamp = read_timestamp_;
ropts.timestamp = read_timestamp;
return Status::OK();
}
void LDBCommand::OverrideBaseOptions() {
options_.create_if_missing = false;
@ -769,7 +810,10 @@ void LDBCommand::OverrideBaseCFOptions(ColumnFamilyOptions* cf_opts) {
}
}
if (options_.comparator != nullptr) {
// Default comparator is BytewiseComparator, so only when it's not, it
// means user has a command line override.
if (options_.comparator != nullptr &&
options_.comparator != BytewiseComparator()) {
cf_opts->comparator = options_.comparator;
}
@ -2846,9 +2890,9 @@ void WALDumperCommand::DoCommand() {
GetCommand::GetCommand(const std::vector<std::string>& params,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(
options, flags, true,
BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX})) {
: LDBCommand(options, flags, true,
BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX,
ARG_VALUE_HEX, ARG_READ_TIMESTAMP})) {
if (params.size() != 1) {
exec_state_ = LDBCommandExecuteResult::Failed(
"<key> must be specified for the get command");
@ -2865,6 +2909,7 @@ void GetCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(GetCommand::Name());
ret.append(" <key>");
ret.append(" [--" + ARG_READ_TIMESTAMP + "=<uint64_ts>] ");
ret.append(" [--" + ARG_TTL + "]");
ret.append("\n");
}
@ -2874,8 +2919,18 @@ void GetCommand::DoCommand() {
assert(GetExecuteState().IsFailed());
return;
}
ReadOptions ropts;
Slice read_timestamp;
ColumnFamilyHandle* cfh = GetCfHandle();
Status st = MaybePopulateReadTimestamp(cfh, ropts, &read_timestamp);
if (!st.ok()) {
std::stringstream oss;
oss << "Get failed: " << st.ToString();
exec_state_ = LDBCommandExecuteResult::Failed(oss.str());
return;
}
std::string value;
Status st = db_->Get(ReadOptions(), GetCfHandle(), key_, &value);
st = db_->Get(ropts, cfh, key_, &value);
if (st.ok()) {
fprintf(stdout, "%s\n",
(is_value_hex_ ? StringToHex(value) : value).c_str());
@ -2895,7 +2950,8 @@ MultiGetCommand::MultiGetCommand(
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(options, flags, true,
BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX})) {
BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX,
ARG_READ_TIMESTAMP})) {
if (params.size() < 1) {
exec_state_ = LDBCommandExecuteResult::Failed(
"At least one <key> must be specified for multi_get.");
@ -2911,6 +2967,7 @@ void MultiGetCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(MultiGetCommand::Name());
ret.append(" <key_1> <key_2> <key_3> ...");
ret.append(" [--" + ARG_READ_TIMESTAMP + "=<uint64_ts>] ");
ret.append("\n");
}
@ -2919,6 +2976,16 @@ void MultiGetCommand::DoCommand() {
assert(GetExecuteState().IsFailed());
return;
}
ReadOptions ropts;
Slice read_timestamp;
ColumnFamilyHandle* cfh = GetCfHandle();
Status st = MaybePopulateReadTimestamp(cfh, ropts, &read_timestamp);
if (!st.ok()) {
std::stringstream oss;
oss << "MultiGet failed: " << st.ToString();
exec_state_ = LDBCommandExecuteResult::Failed(oss.str());
return;
}
size_t num_keys = keys_.size();
std::vector<Slice> key_slices;
std::vector<PinnableSlice> values(num_keys);
@ -2926,8 +2993,8 @@ void MultiGetCommand::DoCommand() {
for (const std::string& key : keys_) {
key_slices.emplace_back(key);
}
db_->MultiGet(ReadOptions(), GetCfHandle(), num_keys, key_slices.data(),
values.data(), statuses.data());
db_->MultiGet(ropts, cfh, num_keys, key_slices.data(), values.data(),
statuses.data());
bool failed = false;
for (size_t i = 0; i < num_keys; ++i) {
@ -2957,9 +3024,9 @@ GetEntityCommand::GetEntityCommand(
const std::vector<std::string>& params,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(
options, flags, true,
BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX})) {
: LDBCommand(options, flags, true,
BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX,
ARG_VALUE_HEX, ARG_READ_TIMESTAMP})) {
if (params.size() != 1) {
exec_state_ = LDBCommandExecuteResult::Failed(
"<key> must be specified for the get_entity command");
@ -2976,6 +3043,7 @@ void GetEntityCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(GetEntityCommand::Name());
ret.append(" <key>");
ret.append(" [--" + ARG_READ_TIMESTAMP + "=<uint64_ts>] ");
ret.append(" [--" + ARG_TTL + "]");
ret.append("\n");
}
@ -2985,9 +3053,18 @@ void GetEntityCommand::DoCommand() {
assert(GetExecuteState().IsFailed());
return;
}
ReadOptions ropt;
Slice read_timestamp;
ColumnFamilyHandle* cfh = GetCfHandle();
Status st = MaybePopulateReadTimestamp(cfh, ropt, &read_timestamp);
if (!st.ok()) {
std::stringstream oss;
oss << "GetEntity failed: " << st.ToString();
exec_state_ = LDBCommandExecuteResult::Failed(oss.str());
return;
}
PinnableWideColumns pinnable_wide_columns;
Status st = db_->GetEntity(ReadOptions(), GetCfHandle(), key_,
&pinnable_wide_columns);
st = db_->GetEntity(ropt, cfh, key_, &pinnable_wide_columns);
if (st.ok()) {
std::ostringstream oss;
WideColumnsHelper::DumpWideColumns(pinnable_wide_columns.columns(), oss,
@ -3007,7 +3084,8 @@ MultiGetEntityCommand::MultiGetEntityCommand(
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(options, flags, true /* is_read_only */,
BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX})) {
BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX,
ARG_READ_TIMESTAMP})) {
if (params.size() < 1) {
exec_state_ = LDBCommandExecuteResult::Failed(
"At least one <key> must be specified for the multi_get_entity "
@ -3024,6 +3102,7 @@ void MultiGetEntityCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(MultiGetEntityCommand::Name());
ret.append(" <key_1> <key_2> <key_3> ...");
ret.append(" [--" + ARG_READ_TIMESTAMP + "=<uint64_ts>] ");
ret.append("\n");
}
@ -3033,6 +3112,16 @@ void MultiGetEntityCommand::DoCommand() {
return;
}
ReadOptions ropt;
Slice read_timestamp;
ColumnFamilyHandle* cfh = GetCfHandle();
Status st = MaybePopulateReadTimestamp(cfh, ropt, &read_timestamp);
if (!st.ok()) {
std::stringstream oss;
oss << "MultiGetEntity failed: " << st.ToString();
exec_state_ = LDBCommandExecuteResult::Failed(oss.str());
return;
}
size_t num_keys = keys_.size();
std::vector<Slice> key_slices;
std::vector<PinnableWideColumns> results(num_keys);
@ -3041,8 +3130,8 @@ void MultiGetEntityCommand::DoCommand() {
key_slices.emplace_back(key);
}
db_->MultiGetEntity(ReadOptions(), GetCfHandle(), num_keys, key_slices.data(),
results.data(), statuses.data());
db_->MultiGetEntity(ropt, cfh, num_keys, key_slices.data(), results.data(),
statuses.data());
bool failed = false;
for (size_t i = 0; i < num_keys; ++i) {
@ -3200,11 +3289,11 @@ void BatchPutCommand::OverrideBaseOptions() {
ScanCommand::ScanCommand(const std::vector<std::string>& /*params*/,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(
options, flags, true,
BuildCmdLineOptions({ARG_TTL, ARG_NO_VALUE, ARG_HEX, ARG_KEY_HEX,
ARG_TO, ARG_VALUE_HEX, ARG_FROM, ARG_TIMESTAMP,
ARG_MAX_KEYS, ARG_TTL_START, ARG_TTL_END})),
: LDBCommand(options, flags, true,
BuildCmdLineOptions(
{ARG_TTL, ARG_NO_VALUE, ARG_HEX, ARG_KEY_HEX, ARG_TO,
ARG_VALUE_HEX, ARG_FROM, ARG_TIMESTAMP, ARG_MAX_KEYS,
ARG_TTL_START, ARG_TTL_END, ARG_READ_TIMESTAMP})),
start_key_specified_(false),
end_key_specified_(false),
max_keys_scanned_(-1),
@ -3260,6 +3349,7 @@ void ScanCommand::Help(std::string& ret) {
ret.append(" [--" + ARG_TTL_START + "=<N>:- is inclusive]");
ret.append(" [--" + ARG_TTL_END + "=<N>:- is exclusive]");
ret.append(" [--" + ARG_NO_VALUE + "]");
ret.append(" [--" + ARG_READ_TIMESTAMP + "=<uint64_ts>] ");
ret.append("\n");
}
@ -3271,8 +3361,17 @@ void ScanCommand::DoCommand() {
int num_keys_scanned = 0;
ReadOptions scan_read_opts;
ColumnFamilyHandle* cfh = GetCfHandle();
Slice read_timestamp;
Status st = MaybePopulateReadTimestamp(cfh, scan_read_opts, &read_timestamp);
if (!st.ok()) {
std::stringstream oss;
oss << "Scan failed: " << st.ToString();
exec_state_ = LDBCommandExecuteResult::Failed(oss.str());
return;
}
scan_read_opts.total_order_seek = true;
Iterator* it = db_->NewIterator(scan_read_opts, GetCfHandle());
Iterator* it = db_->NewIterator(scan_read_opts, cfh);
if (start_key_specified_) {
it->Seek(start_key_);
} else {

View File

@ -85,6 +85,9 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
"=<double,e.g.:0.25>\n");
ret.append(" --" + LDBCommand::ARG_BLOB_COMPACTION_READAHEAD_SIZE +
"=<int,e.g.:2097152>\n");
ret.append(" --" + LDBCommand::ARG_READ_TIMESTAMP +
"=<uint64_ts, e.g.:323> : read timestamp, required if column "
"family enables timestamp, otherwise invalid if provided.");
ret.append("\n\n");
ret.append("Data Access Commands:\n");

View File

@ -276,6 +276,13 @@ class ComparatorWithU64TsImpl : public Comparator {
Slice GetMinTimestamp() const override { return MinU64Ts(); }
std::string TimestampToString(const Slice& timestamp) const override {
assert(timestamp.size() == sizeof(uint64_t));
uint64_t ts = 0;
DecodeU64Ts(timestamp, &ts).PermitUncheckedError();
return std::to_string(ts);
}
using Comparator::CompareWithoutTimestamp;
int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
bool b_has_ts) const override {