Add timestamp support in dump_wal/dump/idump (#12690)

Summary:
As titled.  For dumping wal files, since a mapping from column family id to the user comparator object is needed to print the timestamp in human readable format, option `[--db=<db_path>]` is added to `dump_wal` command to allow the user to choose to optionally open the DB as read only instance and dump the wal file with better timestamp formatting.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12690

Test Plan:
Manually tested

dump_wal:
[dump a wal file specified with --walfile]
```
>> ./ldb --walfile=$TEST_DB/000004.log dump_wal  --print_value
>>1,1,28,13,PUT(0) : 0x666F6F0100000000000000 : 0x7631
(Column family id: [0] contained in WAL are not opened in DB. Applied default hex formatting for user key. Specify --db=<db_path> to open DB for better user key formatting if it contains timestamp.)
```

[dump with --db specified for better timestamp formatting]
```
>> ./ldb --walfile=$TEST_DB/000004.log dump_wal  --db=$TEST_DB --print_value
>> 1,1,28,13,PUT(0) : 0x666F6F|timestamp:1 : 0x7631
```

dump:
[dump a file specified with --path]
```
>>./ldb --path=/tmp/rocksdbtest-501/column_family_test_75359_17910784957761284041/000004.log dump
Sequence,Count,ByteSize,Physical Offset,Key(s) : value
1,1,28,13,PUT(0) : 0x666F6F0100000000000000 : 0x7631
(Column family id: [0] contained in WAL are not opened in DB. Applied default hex formatting for user key. Specify --db=<db_path> to open DB for better user key formatting if it contains timestamp.)
```

[dump db specified with --db]
```
>> ./ldb --db=/tmp/rocksdbtest-501/column_family_test_75359_17910784957761284041 dump
>> foo|timestamp:1 ==> v1
Keys in range: 1
```

idump
```
./ldb --db=$TEST_DB idump
'foo|timestamp:1' seq:1, type:1 => v1
Internal keys in range: 1
```

Reviewed By: ltamasi

Differential Revision: D57755382

Pulled By: jowlyzhang

fbshipit-source-id: a0a2ef80c92801cbf7bfccc64769c1191824362e
This commit is contained in:
Yu Zhang 2024-05-23 20:26:57 -07:00 committed by Facebook GitHub Bot
parent f044b6a6ad
commit 9a72cf1a61
8 changed files with 293 additions and 58 deletions

View File

@ -160,6 +160,7 @@ class LDBCommand {
DB* db_;
DBWithTTL* db_ttl_;
std::map<std::string, ColumnFamilyHandle*> cf_handles_;
std::map<uint32_t, const Comparator*> ucmps_;
/**
* true implies that this command can work if the db is opened in read-only
@ -224,17 +225,19 @@ class LDBCommand {
ColumnFamilyHandle* GetCfHandle();
static std::string PrintKeyValue(const std::string& key,
const std::string& timestamp,
const std::string& value, bool is_key_hex,
bool is_value_hex);
bool is_value_hex, const Comparator* ucmp);
static std::string PrintKeyValue(const std::string& key,
const std::string& value, bool is_hex);
const std::string& timestamp,
const std::string& value, bool is_hex,
const Comparator* ucmp);
static std::string PrintKeyValueOrWideColumns(const Slice& key,
const Slice& value,
const WideColumns& wide_columns,
bool is_key_hex,
bool is_value_hex);
static std::string PrintKeyValueOrWideColumns(
const Slice& key, const Slice& timestamp, const Slice& value,
const WideColumns& wide_columns, bool is_key_hex, bool is_value_hex,
const Comparator* ucmp);
/**
* Return true if the specified flag is present in the specified flags vector

View File

@ -45,6 +45,7 @@
#include "util/file_checksum_helper.h"
#include "util/stderr_logger.h"
#include "util/string_util.h"
#include "util/write_batch_util.h"
#include "utilities/blob_db/blob_dump_tool.h"
#include "utilities/merge_operators.h"
#include "utilities/ttl/db_ttl_impl.h"
@ -115,6 +116,7 @@ namespace {
void DumpWalFile(Options options, std::string wal_file, bool print_header,
bool print_values, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state);
void DumpSstFile(Options options, std::string filename, bool output_hex,
@ -503,6 +505,7 @@ void LDBCommand::OpenDB() {
bool found_cf_name = false;
for (size_t i = 0; i < handles_opened.size(); i++) {
cf_handles_[column_families_[i].name] = handles_opened[i];
ucmps_[handles_opened[i]->GetID()] = handles_opened[i]->GetComparator();
if (column_family_name_ == column_families_[i].name) {
found_cf_name = true;
}
@ -512,6 +515,8 @@ void LDBCommand::OpenDB() {
"Non-existing column family " + column_family_name_);
CloseDB();
}
ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
ucmps_[default_cf->GetID()] = default_cf->GetComparator();
} else {
// We successfully opened DB in single column family mode.
assert(column_families_.empty());
@ -520,6 +525,8 @@ void LDBCommand::OpenDB() {
"Non-existing column family " + column_family_name_);
CloseDB();
}
ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
ucmps_[default_cf->GetID()] = default_cf->GetComparator();
}
}
@ -1146,27 +1153,36 @@ std::string LDBCommand::StringToHex(const std::string& str) {
}
std::string LDBCommand::PrintKeyValue(const std::string& key,
const std::string& timestamp,
const std::string& value, bool is_key_hex,
bool is_value_hex) {
bool is_value_hex,
const Comparator* ucmp) {
std::string result;
result.append(is_key_hex ? StringToHex(key) : key);
if (!timestamp.empty()) {
result.append("|timestamp:");
result.append(ucmp->TimestampToString(timestamp));
}
result.append(DELIM);
result.append(is_value_hex ? StringToHex(value) : value);
return result;
}
std::string LDBCommand::PrintKeyValue(const std::string& key,
const std::string& value, bool is_hex) {
return PrintKeyValue(key, value, is_hex, is_hex);
const std::string& timestamp,
const std::string& value, bool is_hex,
const Comparator* ucmp) {
return PrintKeyValue(key, timestamp, value, is_hex, is_hex, ucmp);
}
std::string LDBCommand::PrintKeyValueOrWideColumns(
const Slice& key, const Slice& value, const WideColumns& wide_columns,
bool is_key_hex, bool is_value_hex) {
const Slice& key, const Slice& timestamp, const Slice& value,
const WideColumns& wide_columns, bool is_key_hex, bool is_value_hex,
const Comparator* ucmp) {
if (wide_columns.empty() ||
WideColumnsHelper::HasDefaultColumnOnly(wide_columns)) {
return PrintKeyValue(key.ToString(), value.ToString(), is_key_hex,
is_value_hex);
return PrintKeyValue(key.ToString(), timestamp.ToString(), value.ToString(),
is_key_hex, is_value_hex, ucmp);
}
/*
// Sample plaintext output (first column is kDefaultWideColumnName)
@ -1177,9 +1193,10 @@ std::string LDBCommand::PrintKeyValueOrWideColumns(
*/
std::ostringstream oss;
WideColumnsHelper::DumpWideColumns(wide_columns, oss, is_value_hex);
return PrintKeyValue(key.ToString(), oss.str().c_str(), is_key_hex,
false); // is_value_hex_ is already honored in oss.
// avoid double-hexing it.
return PrintKeyValue(key.ToString(), timestamp.ToString(), oss.str().c_str(),
is_key_hex, false,
ucmp); // is_value_hex_ is already honored in oss.
// avoid double-hexing it.
}
std::string LDBCommand::HelpRangeCmdArgs() {
@ -1929,10 +1946,12 @@ void InternalDumpCommand::DoCommand() {
assert(GetExecuteState().IsFailed());
return;
}
ColumnFamilyHandle* cfh = GetCfHandle();
const Comparator* ucmp = cfh->GetComparator();
size_t ts_sz = ucmp->timestamp_size();
if (print_stats_) {
std::string stats;
if (db_->GetProperty(GetCfHandle(), "rocksdb.stats", &stats)) {
if (db_->GetProperty(cfh, "rocksdb.stats", &stats)) {
fprintf(stdout, "%s\n", stats.c_str());
}
}
@ -1954,7 +1973,11 @@ void InternalDumpCommand::DoCommand() {
for (auto& key_version : key_versions) {
ValueType value_type = static_cast<ValueType>(key_version.type);
InternalKey ikey(key_version.user_key, key_version.sequence, value_type);
if (has_to_ && ikey.user_key() == to_) {
Slice user_key_without_ts = ikey.user_key();
if (ts_sz > 0) {
user_key_without_ts.remove_suffix(ts_sz);
}
if (has_to_ && ucmp->Compare(user_key_without_ts, to_) == 0) {
// GetAllKeyVersions() includes keys with user key `to_`, but idump has
// traditionally excluded such keys.
break;
@ -1990,7 +2013,7 @@ void InternalDumpCommand::DoCommand() {
}
if (!count_only_ && !count_delim_) {
std::string key = ikey.DebugString(is_key_hex_);
std::string key = ikey.DebugString(is_key_hex_, ucmp);
Slice value(key_version.value);
if (!decode_blob_index_ || value_type != kTypeBlobIndex) {
if (value_type == kTypeWideColumnEntity) {
@ -2166,7 +2189,7 @@ void DBDumperCommand::DoCommand() {
// TODO(myabandeh): allow configuring is_write_commited
DumpWalFile(options_, path_, /* print_header_ */ true,
/* print_values_ */ true, true /* is_write_commited */,
&exec_state_);
ucmps_, &exec_state_);
break;
case kTableFile:
DumpSstFile(options_, path_, is_key_hex_, /* show_properties */ true,
@ -2206,8 +2229,16 @@ void DBDumperCommand::DoDumpCommand() {
// Setup key iterator
ReadOptions scan_read_opts;
Slice read_timestamp;
ColumnFamilyHandle* cfh = GetCfHandle();
const Comparator* ucmp = cfh->GetComparator();
size_t ts_sz = ucmp->timestamp_size();
if (ucmp->timestamp_size() > 0) {
read_timestamp = ucmp->GetMaxTimestamp();
scan_read_opts.timestamp = &read_timestamp;
}
scan_read_opts.total_order_seek = true;
Iterator* iter = db_->NewIterator(scan_read_opts, GetCfHandle());
Iterator* iter = db_->NewIterator(scan_read_opts, cfh);
Status st = iter->status();
if (!st.ok()) {
exec_state_ =
@ -2262,7 +2293,7 @@ void DBDumperCommand::DoDumpCommand() {
for (; iter->Valid(); iter->Next()) {
int rawtime = 0;
// If end marker was specified, we stop before it
if (!null_to_ && (iter->key().ToString() >= to_)) {
if (!null_to_ && ucmp->Compare(iter->key(), to_) >= 0) {
break;
}
// Terminate if maximum number of keys have been dumped
@ -2316,11 +2347,14 @@ void DBDumperCommand::DoDumpCommand() {
// (TODO) TTL Iterator does not support wide columns yet.
std::string str =
is_db_ttl_
? PrintKeyValue(iter->key().ToString(), iter->value().ToString(),
is_key_hex_, is_value_hex_)
: PrintKeyValueOrWideColumns(iter->key(), iter->value(),
iter->columns(), is_key_hex_,
is_value_hex_);
? PrintKeyValue(iter->key().ToString(),
ts_sz == 0 ? "" : iter->timestamp().ToString(),
iter->value().ToString(), is_key_hex_,
is_value_hex_, ucmp)
: PrintKeyValueOrWideColumns(
iter->key(), ts_sz == 0 ? "" : iter->timestamp().ToString(),
iter->value(), iter->columns(), is_key_hex_, is_value_hex_,
ucmp);
fprintf(stdout, "%s\n", str.c_str());
}
}
@ -2641,14 +2675,16 @@ struct StdErrReporter : public log::Reader::Reporter {
class InMemoryHandler : public WriteBatch::Handler {
public:
InMemoryHandler(std::stringstream& row, bool print_values,
bool write_after_commit = false)
bool write_after_commit,
const std::map<uint32_t, const Comparator*>& ucmps)
: Handler(),
row_(row),
print_values_(print_values),
write_after_commit_(write_after_commit) {}
write_after_commit_(write_after_commit),
ucmps_(ucmps) {}
void commonPutMerge(const Slice& key, const Slice& value) {
std::string k = LDBCommand::StringToHex(key.ToString());
void commonPutMerge(uint32_t cf, const Slice& key, const Slice& value) {
std::string k = PrintKey(cf, key);
if (print_values_) {
std::string v = LDBCommand::StringToHex(value.ToString());
row_ << k << " : ";
@ -2660,15 +2696,13 @@ class InMemoryHandler : public WriteBatch::Handler {
Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
row_ << "PUT(" << cf << ") : ";
commonPutMerge(key, value);
commonPutMerge(cf, key, value);
return Status::OK();
}
Status PutEntityCF(uint32_t cf, const Slice& key,
const Slice& value) override {
row_ << "PUT_ENTITY(" << cf
<< ") : " << LDBCommand::StringToHex(key.ToString());
row_ << "PUT_ENTITY(" << cf << ") : " << PrintKey(cf, key);
if (print_values_) {
row_ << " : ";
const Status s =
@ -2684,7 +2718,7 @@ class InMemoryHandler : public WriteBatch::Handler {
Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
row_ << "MERGE(" << cf << ") : ";
commonPutMerge(key, value);
commonPutMerge(cf, key, value);
return Status::OK();
}
@ -2695,21 +2729,21 @@ class InMemoryHandler : public WriteBatch::Handler {
Status DeleteCF(uint32_t cf, const Slice& key) override {
row_ << "DELETE(" << cf << ") : ";
row_ << LDBCommand::StringToHex(key.ToString()) << " ";
row_ << PrintKey(cf, key) << " ";
return Status::OK();
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
row_ << "SINGLE_DELETE(" << cf << ") : ";
row_ << LDBCommand::StringToHex(key.ToString()) << " ";
row_ << PrintKey(cf, key) << " ";
return Status::OK();
}
Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
const Slice& end_key) override {
row_ << "DELETE_RANGE(" << cf << ") : ";
row_ << LDBCommand::StringToHex(begin_key.ToString()) << " ";
row_ << LDBCommand::StringToHex(end_key.ToString()) << " ";
row_ << PrintKey(cf, begin_key) << " ";
row_ << PrintKey(cf, end_key) << " ";
return Status::OK();
}
@ -2754,13 +2788,37 @@ class InMemoryHandler : public WriteBatch::Handler {
}
private:
std::string PrintKey(uint32_t cf, const Slice& key) {
auto ucmp_iter = ucmps_.find(cf);
if (ucmp_iter == ucmps_.end()) {
// Fallback to default print slice as hex
return LDBCommand::StringToHex(key.ToString());
}
size_t ts_sz = ucmp_iter->second->timestamp_size();
if (ts_sz == 0) {
return LDBCommand::StringToHex(key.ToString());
} else {
// This could happen if there is corruption or undetected comparator
// change.
if (key.size() < ts_sz) {
return "CORRUPT KEY";
}
Slice user_key_without_ts = key;
user_key_without_ts.remove_suffix(ts_sz);
Slice ts = Slice(key.data() + key.size() - ts_sz, ts_sz);
return LDBCommand::StringToHex(user_key_without_ts.ToString()) +
"|timestamp:" + ucmp_iter->second->TimestampToString(ts);
}
}
std::stringstream& row_;
bool print_values_;
bool write_after_commit_;
const std::map<uint32_t, const Comparator*> ucmps_;
};
void DumpWalFile(Options options, std::string wal_file, bool print_header,
bool print_values, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state) {
const auto& fs = options.env->GetFileSystem();
FileOptions soptions(options);
@ -2781,6 +2839,12 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
uint64_t log_number;
FileType type;
// Comparators are available and will be used for formatting user key if DB
// is opened for this dump wal operation.
UnorderedMap<uint32_t, size_t> running_ts_sz;
for (const auto& [cf_id, ucmp] : ucmps) {
running_ts_sz.emplace(cf_id, ucmp->timestamp_size());
}
// we need the log number, but ParseFilename expects dbname/NNN.log.
std::string sanitized = wal_file;
size_t lastslash = sanitized.rfind('/');
@ -2793,6 +2857,7 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
}
log::Reader reader(options.info_log, std::move(wal_file_reader), &reporter,
true /* checksum */, log_number);
std::unordered_set<uint32_t> encountered_cf_ids;
std::string scratch;
WriteBatch batch;
Slice record;
@ -2821,11 +2886,51 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
}
break;
}
const UnorderedMap<uint32_t, size_t> recorded_ts_sz =
reader.GetRecordedTimestampSize();
if (!running_ts_sz.empty()) {
status = HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, recorded_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency,
/*new_batch=*/nullptr);
if (!status.ok()) {
std::stringstream oss;
oss << "Format for user keys in WAL file is inconsistent with the "
"comparator used to open the DB. Timestamp size recorded in "
"WAL vs specified by "
"comparator: {";
bool first_cf = true;
for (const auto& [cf_id, ts_sz] : running_ts_sz) {
if (first_cf) {
first_cf = false;
} else {
oss << ", ";
}
auto record_ts_iter = recorded_ts_sz.find(cf_id);
size_t ts_sz_in_wal = (record_ts_iter == recorded_ts_sz.end())
? 0
: record_ts_iter->second;
oss << "(cf_id: " << cf_id << ", [recorded: " << ts_sz_in_wal
<< ", comparator: " << ts_sz << "])";
}
oss << "}";
if (exec_state) {
*exec_state = LDBCommandExecuteResult::Failed(oss.str());
} else {
std::cerr << oss.str() << std::endl;
}
break;
}
}
row << WriteBatchInternal::Sequence(&batch) << ",";
row << WriteBatchInternal::Count(&batch) << ",";
row << WriteBatchInternal::ByteSize(&batch) << ",";
row << reader.LastRecordOffset() << ",";
InMemoryHandler handler(row, print_values, is_write_committed);
ColumnFamilyCollector cf_collector;
status = batch.Iterate(&cf_collector);
auto cf_ids = cf_collector.column_families();
encountered_cf_ids.insert(cf_ids.begin(), cf_ids.end());
InMemoryHandler handler(row, print_values, is_write_committed, ucmps);
status = batch.Iterate(&handler);
if (!status.ok()) {
if (exec_state) {
@ -2840,6 +2945,29 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
}
std::cout << row.str();
}
std::stringstream cf_ids_oss;
bool empty_cfs = true;
for (uint32_t cf_id : encountered_cf_ids) {
if (ucmps.find(cf_id) == ucmps.end()) {
if (empty_cfs) {
cf_ids_oss << "[";
empty_cfs = false;
} else {
cf_ids_oss << ",";
}
cf_ids_oss << cf_id;
}
}
if (!empty_cfs) {
cf_ids_oss << "]";
std::cout
<< "(Column family id: " << cf_ids_oss.str()
<< " contained in WAL are not opened in DB. Applied default "
"hex formatting for user key. Specify --db=<db_path> to "
"open DB for better user key formatting if it contains timestamp.)"
<< std::endl;
}
}
}
@ -2855,7 +2983,7 @@ WALDumperCommand::WALDumperCommand(
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(options, flags, true,
BuildCmdLineOptions({ARG_WAL_FILE, ARG_WRITE_COMMITTED,
BuildCmdLineOptions({ARG_WAL_FILE, ARG_DB, ARG_WRITE_COMMITTED,
ARG_PRINT_HEADER, ARG_PRINT_VALUE})),
print_header_(false),
print_values_(false),
@ -2875,12 +3003,17 @@ WALDumperCommand::WALDumperCommand(
exec_state_ = LDBCommandExecuteResult::Failed("Argument " + ARG_WAL_FILE +
" must be specified.");
}
if (!db_path_.empty()) {
no_db_open_ = false;
}
}
void WALDumperCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(WALDumperCommand::Name());
ret.append(" --" + ARG_WAL_FILE + "=<write_ahead_log_file_path>");
ret.append(" [--" + ARG_DB + "=<db_path>]");
ret.append(" [--" + ARG_PRINT_HEADER + "] ");
ret.append(" [--" + ARG_PRINT_VALUE + "] ");
ret.append(" [--" + ARG_WRITE_COMMITTED + "=true|false] ");
@ -2890,7 +3023,7 @@ void WALDumperCommand::Help(std::string& ret) {
void WALDumperCommand::DoCommand() {
PrepareOptions();
DumpWalFile(options_, wal_file_, print_header_, print_values_,
is_write_committed_, &exec_state_);
is_write_committed_, ucmps_, &exec_state_);
}
// ----------------------------------------------------------------------------
@ -3370,6 +3503,8 @@ void ScanCommand::DoCommand() {
int num_keys_scanned = 0;
ReadOptions scan_read_opts;
ColumnFamilyHandle* cfh = GetCfHandle();
const Comparator* ucmp = cfh->GetComparator();
size_t ts_sz = ucmp->timestamp_size();
Slice read_timestamp;
Status st = MaybePopulateReadTimestamp(cfh, scan_read_opts, &read_timestamp);
if (!st.ok()) {
@ -3426,12 +3561,15 @@ void ScanCommand::DoCommand() {
}
fprintf(stdout, "%s\n", key_str.c_str());
} else {
std::string str = is_db_ttl_ ? PrintKeyValue(it->key().ToString(),
it->value().ToString(),
is_key_hex_, is_value_hex_)
: PrintKeyValueOrWideColumns(
it->key(), it->value(), it->columns(),
is_key_hex_, is_value_hex_);
std::string str =
is_db_ttl_
? PrintKeyValue(it->key().ToString(),
ts_sz == 0 ? "" : it->timestamp().ToString(),
it->value().ToString(), is_key_hex_,
is_value_hex_, ucmp)
: PrintKeyValueOrWideColumns(
it->key(), ts_sz == 0 ? "" : it->timestamp(), it->value(),
it->columns(), is_key_hex_, is_value_hex_, ucmp);
fprintf(stdout, "%s\n", str.c_str());
}
@ -3769,8 +3907,11 @@ void DBQuerierCommand::DoCommand() {
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
s = db_->Get(read_options, GetCfHandle(), Slice(key), &value);
if (s.ok()) {
// TODO: add read timestamp support in querier
fprintf(stdout, "%s\n",
PrintKeyValue(key, value, is_key_hex_, is_value_hex_).c_str());
PrintKeyValue(key, "", value, is_key_hex_, is_value_hex_,
GetCfHandle()->GetComparator())
.c_str());
} else {
if (s.IsNotFound()) {
fprintf(stdout, "Not found %s\n", tokens[1].c_str());
@ -4230,7 +4371,7 @@ void DBFileDumperCommand::DoCommand() {
std::cout << filename << std::endl;
// TODO(myabandeh): allow configuring is_write_commited
DumpWalFile(options_, filename, true, true, true /* is_write_commited */,
&exec_state_);
ucmps_, &exec_state_);
}
}
}

View File

@ -369,7 +369,7 @@ class WALDumperCommand : public LDBCommand {
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags);
bool NoDBOpen() override { return true; }
bool NoDBOpen() override { return no_db_open_; }
static void Help(std::string& ret);
@ -380,6 +380,7 @@ class WALDumperCommand : public LDBCommand {
std::string wal_file_;
bool print_values_;
bool is_write_committed_; // default will be set to true
bool no_db_open_ = true;
static const std::string ARG_WAL_FILE;
static const std::string ARG_WRITE_COMMITTED;

View File

@ -158,6 +158,39 @@ Status TimestampRecoveryHandler::PutCF(uint32_t cf, const Slice& key,
return WriteBatchInternal::Put(new_batch_.get(), cf, new_key, value);
}
Status TimestampRecoveryHandler::PutEntityCF(uint32_t cf, const Slice& key,
const Slice& entity) {
std::string new_key_buf;
Slice new_key;
Status status = TimestampRecoveryHandler::ReconcileTimestampDiscrepancy(
cf, key, &new_key_buf, &new_key);
if (!status.ok()) {
return status;
}
Slice entity_copy = entity;
WideColumns columns;
if (!WideColumnSerialization::Deserialize(entity_copy, columns).ok()) {
return Status::Corruption("Unable to deserialize entity",
entity.ToString(/* hex */ true));
}
return WriteBatchInternal::PutEntity(new_batch_.get(), cf, new_key, columns);
}
Status TimestampRecoveryHandler::TimedPutCF(uint32_t cf, const Slice& key,
const Slice& value,
uint64_t write_time) {
std::string new_key_buf;
Slice new_key;
Status status =
ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
if (!status.ok()) {
return status;
}
return WriteBatchInternal::TimedPut(new_batch_.get(), cf, new_key, value,
write_time);
}
Status TimestampRecoveryHandler::DeleteCF(uint32_t cf, const Slice& key) {
std::string new_key_buf;
Slice new_key;

View File

@ -11,6 +11,7 @@
#include <unordered_map>
#include <vector>
#include "db/wide/wide_column_serialization.h"
#include "db/write_batch_internal.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
@ -116,6 +117,12 @@ class TimestampRecoveryHandler : public WriteBatch::Handler {
Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override;
Status PutEntityCF(uint32_t cf, const Slice& key,
const Slice& entity) override;
Status TimedPutCF(uint32_t cf, const Slice& key, const Slice& value,
uint64_t write_time) override;
Status DeleteCF(uint32_t cf, const Slice& key) override;
Status SingleDeleteCF(uint32_t cf, const Slice& key) override;

View File

@ -16,6 +16,7 @@ namespace ROCKSDB_NAMESPACE {
namespace {
static const std::string kTestKeyWithoutTs = "key";
static const std::string kValuePlaceHolder = "value";
static const uint64_t kWriteUnixTime = 100;
} // namespace
class HandleTimestampSizeDifferenceTest : public testing::Test {
@ -38,6 +39,34 @@ class HandleTimestampSizeDifferenceTest : public testing::Test {
return AddKey(cf, key);
}
Status TimedPutCF(uint32_t cf, const Slice& key, const Slice& value,
uint64_t write_unix_time) override {
if (value.compare(kValuePlaceHolder) != 0) {
return Status::InvalidArgument();
}
if (write_unix_time != kWriteUnixTime) {
return Status::InvalidArgument();
}
return AddKey(cf, key);
}
Status PutEntityCF(uint32_t cf, const Slice& key,
const Slice& entity) override {
Slice entity_copy = entity;
WideColumns columns;
Status s = WideColumnSerialization::Deserialize(entity_copy, columns);
if (!s.ok()) {
return s;
}
if (columns.size() != 1) {
return Status::InvalidArgument();
}
if (columns[0].value().compare(kValuePlaceHolder) != 0) {
return Status::InvalidArgument();
}
return AddKey(cf, key);
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
return AddKey(cf, key);
}
@ -117,6 +146,10 @@ class HandleTimestampSizeDifferenceTest : public testing::Test {
WriteBatchInternal::Merge(batch, cf_id, key, kValuePlaceHolder));
ASSERT_OK(WriteBatchInternal::PutBlobIndex(batch, cf_id, key,
kValuePlaceHolder));
ASSERT_OK(WriteBatchInternal::TimedPut(
batch, cf_id, key, kValuePlaceHolder, kWriteUnixTime));
WideColumns columns{{kDefaultWideColumnName, kValuePlaceHolder}};
ASSERT_OK(WriteBatchInternal::PutEntity(batch, cf_id, key, columns));
}
}

View File

@ -32,6 +32,11 @@ class ColumnFamilyCollector : public WriteBatch::Handler {
return AddColumnFamilyId(column_family_id);
}
Status PutEntityCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status TimedPutCF(uint32_t column_family_id, const Slice&, const Slice&,
uint64_t) override {
return AddColumnFamilyId(column_family_id);

View File

@ -85,9 +85,21 @@ Status GetAllKeyVersions(DB* db, ColumnFamilyHandle* cfh, Slice begin_key,
ScopedArenaPtr<InternalIterator> iter(
idb->NewInternalIterator(read_options, &arena, kMaxSequenceNumber, cfh));
if (!begin_key.empty()) {
const Comparator* ucmp = icmp.user_comparator();
size_t ts_sz = ucmp->timestamp_size();
Slice from_slice = begin_key;
bool has_begin = !begin_key.empty();
Slice end_slice = end_key;
bool has_end = !end_key.empty();
std::string begin_key_buf, end_key_buf;
auto [from, end] = MaybeAddTimestampsToRange(
has_begin ? &from_slice : nullptr, has_end ? &end_slice : nullptr, ts_sz,
&begin_key_buf, &end_key_buf);
if (has_begin) {
assert(from.has_value());
InternalKey ikey;
ikey.SetMinPossibleForUserKey(begin_key);
ikey.SetMinPossibleForUserKey(from.value());
iter->Seek(ikey.Encode());
} else {
iter->SeekToFirst();
@ -102,8 +114,8 @@ Status GetAllKeyVersions(DB* db, ColumnFamilyHandle* cfh, Slice begin_key,
return pik_status;
}
if (!end_key.empty() &&
icmp.user_comparator()->Compare(ikey.user_key, end_key) > 0) {
if (has_end && end.has_value() &&
icmp.user_comparator()->Compare(ikey.user_key, end.value()) > 0) {
break;
}