mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-26 16:30:56 +00:00
ff8953380f
Summary: Trace file V2 added lower/upper bounds to `Iterator::Seek()` and `Iterator::SeekForPrev()`. They were not used anywhere during the execution of a `TraceRecord`. Now they are added to be used by `ReadOptions` during `Iterator::Seek()` and `Iterator::SeekForPrev()` if they are set. Added test cases in `DBTest2.TraceAndManualReplay`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8677 Reviewed By: zhichao-cao Differential Revision: D30438255 Pulled By: autopear fbshipit-source-id: 82563006be0b69155990e506a74951c18af8d288
182 lines
5 KiB
C++
182 lines
5 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "trace_replay/trace_record_handler.h"
|
|
|
|
#include "rocksdb/iterator.h"
|
|
#include "rocksdb/trace_record_result.h"
|
|
#include "rocksdb/write_batch.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// TraceExecutionHandler
|
|
// Creates a handler that executes trace records against `db`.
//
// `handles` lists the column families a record may refer to. Each handle is
// indexed by ColumnFamilyHandle::GetID() so that the Handle() overloads can
// resolve the column family ID stored in a record. Writes and reads use
// default-constructed WriteOptions / ReadOptions, and timestamps are taken
// from SystemClock::Default().
//
// Preconditions (checked by assert() only): `db` is non-null, `handles` is
// non-empty and contains no null entries.
TraceExecutionHandler::TraceExecutionHandler(
    DB* db, const std::vector<ColumnFamilyHandle*>& handles)
    : TraceRecord::Handler(),
      db_(db),
      write_opts_(WriteOptions()),
      read_opts_(ReadOptions()),
      clock_(SystemClock::Default()) {
  assert(db != nullptr);
  assert(!handles.empty());

  // Build the ID -> handle lookup table once, up front.
  cf_map_.reserve(handles.size());
  for (ColumnFamilyHandle* cf_handle : handles) {
    assert(cf_handle != nullptr);
    cf_map_.emplace(cf_handle->GetID(), cf_handle);
  }
}
|
|
|
|
// No explicit cleanup is required: cf_map_ holds raw ColumnFamilyHandle
// pointers that this class never deletes, and the map releases its own
// storage when it is destroyed.
TraceExecutionHandler::~TraceExecutionHandler() = default;
|
|
|
|
// Replays a traced write: rebuilds a WriteBatch from the record's serialized
// representation and applies it with DB::Write() using write_opts_.
//
// `result` is optional. When supplied it is cleared first and, only if the
// write succeeds, set to a StatusOnlyTraceExecutionResult carrying the write
// status, the start/end timestamps and the record's trace type. The timed
// interval covers both rebuilding the batch and the write itself.
//
// Returns the status of the write.
Status TraceExecutionHandler::Handle(
    const WriteQueryTraceRecord& record,
    std::unique_ptr<TraceRecordResult>* result) {
  if (result != nullptr) {
    result->reset();
  }

  const uint64_t begin_us = clock_->NowMicros();
  WriteBatch replay_batch(record.GetWriteBatchRep().ToString());
  Status write_status = db_->Write(write_opts_, &replay_batch);
  const uint64_t finish_us = clock_->NowMicros();

  // A result is reported only for a successful write.
  const bool report = write_status.ok() && result != nullptr;
  if (report) {
    result->reset(new StatusOnlyTraceExecutionResult(
        write_status, begin_us, finish_us, record.GetTraceType()));
  }
  return write_status;
}
|
|
|
|
// Replays a traced Get() on the column family named by the record.
//
// `result` is optional and is cleared on entry. Outcomes:
//   - Unknown column family ID: returns Corruption, `result` stays empty.
//   - Lookup succeeded or the key was not found: returns OK; if `result` was
//     supplied it receives a SingleValueTraceExecutionResult holding the
//     actual lookup status, the value, the timestamps and the trace type.
//   - Any other lookup error: that error is returned, `result` stays empty.
Status TraceExecutionHandler::Handle(
    const GetQueryTraceRecord& record,
    std::unique_ptr<TraceRecordResult>* result) {
  if (result != nullptr) {
    result->reset();
  }

  const auto cf_entry = cf_map_.find(record.GetColumnFamilyID());
  if (cf_entry == cf_map_.end()) {
    return Status::Corruption("Invalid Column Family ID.");
  }

  const uint64_t begin_us = clock_->NowMicros();
  std::string fetched;
  Status get_status =
      db_->Get(read_opts_, cf_entry->second, record.GetKey(), &fetched);
  const uint64_t finish_us = clock_->NowMicros();

  // NotFound is an acceptable outcome of a replayed Get(); everything else
  // that is not OK is propagated to the caller.
  if (!(get_status.ok() || get_status.IsNotFound())) {
    return get_status;
  }

  if (result != nullptr) {
    // Report the actual operation status in the TraceExecutionResult.
    result->reset(new SingleValueTraceExecutionResult(
        std::move(get_status), std::move(fetched), begin_us, finish_us,
        record.GetTraceType()));
  }
  return Status::OK();
}
|
|
|
|
// Replays a traced Iterator::Seek() / Iterator::SeekForPrev() on the column
// family named by the record.
//
// The iterator is created from a copy of read_opts_. A lower/upper bound in
// the record is applied to that copy only when it is non-empty; an empty
// bound is treated as "no bound". A seek type of kSeekForPrev runs
// SeekForPrev(); every other seek type runs Seek().
//
// `result` is optional and is cleared on entry. It is set to a
// StatusOnlyTraceExecutionResult (status, timestamps, trace type) only when
// the iterator reports an OK status after the seek. Only the seek itself is
// timed; creating the iterator is not.
//
// Returns Corruption for an unknown column family ID, otherwise the
// iterator's status after the seek.
Status TraceExecutionHandler::Handle(
    const IteratorSeekQueryTraceRecord& record,
    std::unique_ptr<TraceRecordResult>* result) {
  if (result != nullptr) {
    result->reset(nullptr);
  }
  auto it = cf_map_.find(record.GetColumnFamilyID());
  if (it == cf_map_.end()) {
    return Status::Corruption("Invalid Column Family ID.");
  }

  // r_opts only stores the addresses of `lower` and `upper`, so both slices
  // are declared before the iterator and therefore stay alive for as long as
  // the iterator exists.
  ReadOptions r_opts = read_opts_;
  Slice lower = record.GetLowerBound();
  if (!lower.empty()) {
    r_opts.iterate_lower_bound = &lower;
  }
  Slice upper = record.GetUpperBound();
  if (!upper.empty()) {
    r_opts.iterate_upper_bound = &upper;
  }

  // Own the iterator through a smart pointer so it is released on every
  // path out of this function without a manual delete.
  std::unique_ptr<Iterator> single_iter(db_->NewIterator(r_opts, it->second));

  uint64_t start = clock_->NowMicros();

  switch (record.GetSeekType()) {
    case IteratorSeekQueryTraceRecord::kSeekForPrev: {
      single_iter->SeekForPrev(record.GetKey());
      break;
    }
    default: {
      single_iter->Seek(record.GetKey());
      break;
    }
  }

  uint64_t end = clock_->NowMicros();

  Status s = single_iter->status();
  // The iterator is not needed past this point; release it right away, as
  // before, rather than holding it until the function returns.
  single_iter.reset();

  if (s.ok() && result != nullptr) {
    result->reset(new StatusOnlyTraceExecutionResult(s, start, end,
                                                     record.GetTraceType()));
  }

  return s;
}
|
|
|
|
// Replays a traced MultiGet().
//
// Every column family ID in the record is resolved to a handle first; the
// record's keys are then looked up with DB::MultiGet() using read_opts_.
//
// `result` is optional and is cleared on entry. Outcomes:
//   - Any unknown column family ID: returns Corruption.
//   - No column family IDs or no keys, or their counts differ: returns
//     InvalidArgument.
//   - Any per-key status that is neither OK nor NotFound: the first such
//     status is returned.
//   - Otherwise: returns OK; if `result` was supplied it receives a
//     MultiValuesTraceExecutionResult holding the per-key statuses, the
//     values, the timestamps and the trace type.
// `result` stays empty on every error path.
Status TraceExecutionHandler::Handle(
    const MultiGetQueryTraceRecord& record,
    std::unique_ptr<TraceRecordResult>* result) {
  if (result != nullptr) {
    result->reset();
  }

  // Translate column family IDs into handles, rejecting unknown IDs.
  std::vector<ColumnFamilyHandle*> cf_handles;
  cf_handles.reserve(record.GetColumnFamilyIDs().size());
  for (uint32_t id : record.GetColumnFamilyIDs()) {
    const auto found = cf_map_.find(id);
    if (found == cf_map_.end()) {
      return Status::Corruption("Invalid Column Family ID.");
    }
    cf_handles.push_back(found->second);
  }

  std::vector<Slice> lookup_keys = record.GetKeys();

  if (cf_handles.empty() || lookup_keys.empty()) {
    return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
  }
  if (cf_handles.size() != lookup_keys.size()) {
    return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
  }

  const uint64_t begin_us = clock_->NowMicros();
  std::vector<std::string> fetched;
  std::vector<Status> statuses =
      db_->MultiGet(read_opts_, cf_handles, lookup_keys, &fetched);
  const uint64_t finish_us = clock_->NowMicros();

  // NotFound is an acceptable per-key outcome; the first status that is
  // neither OK nor NotFound aborts the replay of this record.
  for (const Status& key_status : statuses) {
    if (!(key_status.ok() || key_status.IsNotFound())) {
      return key_status;
    }
  }

  if (result != nullptr) {
    // Report the actual operation statuses in the TraceExecutionResult.
    result->reset(new MultiValuesTraceExecutionResult(
        std::move(statuses), std::move(fetched), begin_us, finish_us,
        record.GetTraceType()));
  }

  return Status::OK();
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|