diff --git a/CMakeLists.txt b/CMakeLists.txt index a14d2dde7f..1d78743d95 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -819,6 +819,7 @@ set(SOURCES trace_replay/block_cache_tracer.cc trace_replay/io_tracer.cc trace_replay/trace_record_handler.cc + trace_replay/trace_record_result.cc trace_replay/trace_record.cc trace_replay/trace_replay.cc util/coding.cc diff --git a/HISTORY.md b/HISTORY.md index 4de77d6f7a..48c9f1e015 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -22,7 +22,7 @@ * The integrated BlobDB implementation now supports the tickers `BLOB_DB_BLOB_FILE_BYTES_READ`, `BLOB_DB_GC_NUM_KEYS_RELOCATED`, and `BLOB_DB_GC_BYTES_RELOCATED`, as well as the histograms `BLOB_DB_COMPRESSION_MICROS` and `BLOB_DB_DECOMPRESSION_MICROS`. ## Public API change -* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Added `TraceReader::Reset()` to restart reading a trace file. Created trace_record.h and utilities/replayer.h files to access decoded Trace records and replay them. +* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Added `TraceReader::Reset()` to restart reading a trace file. Created trace_record.h, trace_record_result.h and utilities/replayer.h files to access the decoded Trace records, replay them, and query the actual operation results. ### Performance Improvements * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value. diff --git a/TARGETS b/TARGETS index 0f50b4fa85..b5b992b6b6 100644 --- a/TARGETS +++ b/TARGETS @@ -337,6 +337,7 @@ cpp_library( "trace_replay/io_tracer.cc", "trace_replay/trace_record.cc", "trace_replay/trace_record_handler.cc", + "trace_replay/trace_record_result.cc", "trace_replay/trace_replay.cc", "util/build_version.cc", "util/coding.cc", @@ -655,6 +656,7 @@ cpp_library( "trace_replay/io_tracer.cc", "trace_replay/trace_record.cc", "trace_replay/trace_record_handler.cc", + "trace_replay/trace_record_result.cc", "trace_replay/trace_replay.cc", "util/build_version.cc", "util/coding.cc", diff --git a/db/db_test2.cc b/db/db_test2.cc index c6e807c1ea..c09ea4c71c 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -6,6 +6,7 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. + #include #include #include @@ -17,6 +18,8 @@ #include "port/port.h" #include "port/stack_trace.h" #include "rocksdb/persistent_cache.h" +#include "rocksdb/trace_record.h" +#include "rocksdb/trace_record_result.h" #include "rocksdb/utilities/replayer.h" #include "rocksdb/wal_filter.h" #include "util/random.h" @@ -4236,6 +4239,106 @@ TEST_F(DBTest2, TestNumPread) { ASSERT_EQ(0, env_->random_file_open_counter_.load()); } +class TraceExecutionResultHandler : public TraceRecordResult::Handler { + public: + TraceExecutionResultHandler() {} + ~TraceExecutionResultHandler() override {} + + virtual Status Handle(const StatusOnlyTraceExecutionResult& result) override { + if (result.GetStartTimestamp() > result.GetEndTimestamp()) { + return Status::InvalidArgument("Invalid timestamps."); + } + result.GetStatus().PermitUncheckedError(); + switch (result.GetTraceType()) { + case kTraceWrite: { + total_latency_ += result.GetLatency(); + cnt_++; + writes_++; + break; + } + case kTraceIteratorSeek: + case kTraceIteratorSeekForPrev: { + total_latency_ += result.GetLatency(); + cnt_++; + seeks_++; + break; + } + default: + return Status::Corruption("Type mismatch."); + } + return Status::OK(); + } + + virtual Status Handle( + const SingleValueTraceExecutionResult& result) override { + if (result.GetStartTimestamp() > result.GetEndTimestamp()) { + return Status::InvalidArgument("Invalid timestamps."); + } + result.GetStatus().PermitUncheckedError(); + switch (result.GetTraceType()) { + case kTraceGet: { + total_latency_ += result.GetLatency(); + cnt_++; + gets_++; + break; + } + default: + return Status::Corruption("Type mismatch."); + } + return Status::OK(); + } + + virtual Status Handle( + const MultiValuesTraceExecutionResult& result) override { + if (result.GetStartTimestamp() > result.GetEndTimestamp()) { + return Status::InvalidArgument("Invalid timestamps."); + } + for (const Status& s : result.GetMultiStatus()) { + s.PermitUncheckedError(); + } + switch (result.GetTraceType()) { + case kTraceMultiGet: { + total_latency_ += result.GetLatency(); + cnt_++; + multigets_++; + break; + } + default: + return Status::Corruption("Type mismatch."); + } + return Status::OK(); + } + + void Reset() { + total_latency_ = 0; + cnt_ = 0; + writes_ = 0; + gets_ = 0; + seeks_ = 0; + multigets_ = 0; + } + + double GetAvgLatency() const { + return cnt_ == 0 ? 0.0 : 1.0 * total_latency_ / cnt_; + } + + int GetNumWrites() const { return writes_; } + + int GetNumGets() const { return gets_; } + + int GetNumIterSeeks() const { return seeks_; } + + int GetNumMultiGets() const { return multigets_; } + + private: + std::atomic total_latency_{0}; + std::atomic cnt_{0}; + std::atomic writes_{0}; + std::atomic gets_{0}; + std::atomic seeks_{0}; + std::atomic multigets_{0}; +}; + TEST_F(DBTest2, TraceAndReplay) { Options options = CurrentOptions(); options.merge_operator = MergeOperators::CreatePutOperator(); @@ -4254,12 +4357,14 @@ TEST_F(DBTest2, TraceAndReplay) { ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer)); ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer))); + // 5 Writes ASSERT_OK(Put(0, "a", "1")); ASSERT_OK(Merge(0, "b", "2")); ASSERT_OK(Delete(0, "c")); ASSERT_OK(SingleDelete(0, "d")); ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f")); + // 6th Write WriteBatch batch; ASSERT_OK(batch.Put("f", "11")); ASSERT_OK(batch.Merge("g", "12")); @@ -4268,19 +4373,23 @@ TEST_F(DBTest2, TraceAndReplay) { ASSERT_OK(batch.DeleteRange("j", "k")); ASSERT_OK(db_->Write(wo, &batch)); + // 2 Seek(ForPrev)s single_iter = db_->NewIterator(ro); - single_iter->Seek("f"); + single_iter->Seek("f"); // Seek 1 single_iter->SeekForPrev("g"); ASSERT_OK(single_iter->status()); delete single_iter; + // 2 Gets ASSERT_EQ("1", Get(0, "a")); ASSERT_EQ("12", Get(0, "g")); + // 7th and 8th Write, 3rd Get ASSERT_OK(Put(1, "foo", "bar")); ASSERT_OK(Put(1, "rocksdb", "rocks")); ASSERT_EQ("NOT_FOUND", Get(1, "leveldb")); + // Total Write x 8, Get x 3, Seek x 2. ASSERT_OK(db_->EndTrace()); // These should not get into the trace file as it is after EndTrace. ASSERT_OK(Put("hello", "world")); @@ -4324,13 +4433,30 @@ TEST_F(DBTest2, TraceAndReplay) { std::unique_ptr replayer; ASSERT_OK( db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); + + TraceExecutionResultHandler res_handler; + std::function &&)> res_cb = + [&res_handler](Status exec_s, std::unique_ptr&& res) { + ASSERT_TRUE(exec_s.ok() || exec_s.IsNotSupported()); + if (res != nullptr) { + ASSERT_OK(res->Accept(&res_handler)); + res.reset(); + } + }; + // Unprepared replay should fail with Status::Incomplete() - ASSERT_TRUE(replayer->Replay().IsIncomplete()); + ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete()); ASSERT_OK(replayer->Prepare()); // Ok to repeatedly Prepare(). ASSERT_OK(replayer->Prepare()); // Replay using 1 thread, 1x speed. - ASSERT_OK(replayer->Replay()); + ASSERT_OK(replayer->Replay(ReplayOptions(1, 1.0), res_cb)); + ASSERT_GT(res_handler.GetAvgLatency(), 0.0); + ASSERT_EQ(res_handler.GetNumWrites(), 8); + ASSERT_EQ(res_handler.GetNumGets(), 3); + ASSERT_EQ(res_handler.GetNumIterSeeks(), 2); + ASSERT_EQ(res_handler.GetNumMultiGets(), 0); + res_handler.Reset(); ASSERT_OK(db2->Get(ro, handles[0], "a", &value)); ASSERT_EQ("1", value); @@ -4346,15 +4472,28 @@ TEST_F(DBTest2, TraceAndReplay) { // Re-replay should fail with Status::Incomplete() if Prepare() was not // called. Currently we don't distinguish between unprepared and trace end. - ASSERT_TRUE(replayer->Replay().IsIncomplete()); + ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete()); // Re-replay using 2 threads, 2x speed. ASSERT_OK(replayer->Prepare()); - ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0))); + ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0), res_cb)); + ASSERT_GT(res_handler.GetAvgLatency(), 0.0); + ASSERT_EQ(res_handler.GetNumWrites(), 8); + ASSERT_EQ(res_handler.GetNumGets(), 3); + ASSERT_EQ(res_handler.GetNumIterSeeks(), 2); + ASSERT_EQ(res_handler.GetNumMultiGets(), 0); + res_handler.Reset(); // Re-replay using 2 threads, 1/2 speed. ASSERT_OK(replayer->Prepare()); - ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5))); + ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5), res_cb)); + ASSERT_GT(res_handler.GetAvgLatency(), 0.0); + ASSERT_EQ(res_handler.GetNumWrites(), 8); + ASSERT_EQ(res_handler.GetNumGets(), 3); + ASSERT_EQ(res_handler.GetNumIterSeeks(), 2); + ASSERT_EQ(res_handler.GetNumMultiGets(), 0); + res_handler.Reset(); + replayer.reset(); for (auto handle : handles) { @@ -4408,6 +4547,7 @@ TEST_F(DBTest2, TraceAndManualReplay) { ASSERT_OK(Put(1, "rocksdb", "rocks")); ASSERT_EQ("NOT_FOUND", Get(1, "leveldb")); + // Same as TraceAndReplay, Write x 8, Get x 3, Seek x 2. ASSERT_OK(db_->EndTrace()); // These should not get into the trace file as it is after EndTrace. ASSERT_OK(Put("hello", "world")); @@ -4452,8 +4592,11 @@ TEST_F(DBTest2, TraceAndManualReplay) { ASSERT_OK( db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); + TraceExecutionResultHandler res_handler; + // Manual replay for 2 times. The 2nd checks if the replay can restart. std::unique_ptr record; + std::unique_ptr result; for (int i = 0; i < 2; i++) { // Next should fail if unprepared. ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete()); @@ -4467,13 +4610,23 @@ TEST_F(DBTest2, TraceAndManualReplay) { continue; } if (s.ok()) { - ASSERT_OK(replayer->Execute(std::move(record))); + ASSERT_OK(replayer->Execute(record, &result)); + if (result != nullptr) { + ASSERT_OK(result->Accept(&res_handler)); + result.reset(); + } } } // Status::Incomplete() will be returned when manually reading the trace // end, or Prepare() was not called. ASSERT_TRUE(s.IsIncomplete()); ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete()); + ASSERT_GT(res_handler.GetAvgLatency(), 0.0); + ASSERT_EQ(res_handler.GetNumWrites(), 8); + ASSERT_EQ(res_handler.GetNumGets(), 3); + ASSERT_EQ(res_handler.GetNumIterSeeks(), 2); + ASSERT_EQ(res_handler.GetNumMultiGets(), 0); + res_handler.Reset(); } ASSERT_OK(db2->Get(ro, handles[0], "a", &value)); @@ -4495,25 +4648,44 @@ TEST_F(DBTest2, TraceAndManualReplay) { ASSERT_OK(batch.Put("trace-record-write1", "write1")); ASSERT_OK(batch.Put("trace-record-write2", "write2")); record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++)); - ASSERT_OK(replayer->Execute(std::move(record))); + ASSERT_OK(replayer->Execute(record, &result)); + ASSERT_TRUE(result != nullptr); + ASSERT_OK(result->Accept(&res_handler)); // Write x 1 ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value)); ASSERT_EQ("write1", value); ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value)); ASSERT_EQ("write2", value); + ASSERT_GT(res_handler.GetAvgLatency(), 0.0); + ASSERT_EQ(res_handler.GetNumWrites(), 1); + ASSERT_EQ(res_handler.GetNumGets(), 0); + ASSERT_EQ(res_handler.GetNumIterSeeks(), 0); + ASSERT_EQ(res_handler.GetNumMultiGets(), 0); + res_handler.Reset(); // Get related // Get an existing key. record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-write1", fake_ts++)); - ASSERT_OK(replayer->Execute(std::move(record))); + ASSERT_OK(replayer->Execute(record, &result)); + ASSERT_TRUE(result != nullptr); + ASSERT_OK(result->Accept(&res_handler)); // Get x 1 // Get an non-existing key, should still return Status::OK(). record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get", fake_ts++)); - ASSERT_OK(replayer->Execute(std::move(record))); + ASSERT_OK(replayer->Execute(record, &result)); + ASSERT_TRUE(result != nullptr); + ASSERT_OK(result->Accept(&res_handler)); // Get x 2 // Get from an invalid (non-existing) cf_id. uint32_t invalid_cf_id = handles[1]->GetID() + 1; record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++)); - ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption()); + ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption()); + ASSERT_TRUE(result == nullptr); + ASSERT_GT(res_handler.GetAvgLatency(), 0.0); + ASSERT_EQ(res_handler.GetNumWrites(), 0); + ASSERT_EQ(res_handler.GetNumGets(), 2); + ASSERT_EQ(res_handler.GetNumIterSeeks(), 0); + ASSERT_EQ(res_handler.GetNumMultiGets(), 0); + res_handler.Reset(); // Iteration related for (IteratorSeekQueryTraceRecord::SeekType seekType : @@ -4522,48 +4694,82 @@ TEST_F(DBTest2, TraceAndManualReplay) { // Seek to an existing key. record.reset(new IteratorSeekQueryTraceRecord( seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++)); - ASSERT_OK(replayer->Execute(std::move(record))); + ASSERT_OK(replayer->Execute(record, &result)); + ASSERT_TRUE(result != nullptr); + ASSERT_OK(result->Accept(&res_handler)); // Seek x 1 in one iteration // Seek to an non-existing key, should still return Status::OK(). record.reset(new IteratorSeekQueryTraceRecord( seekType, handles[0]->GetID(), "trace-record-get", fake_ts++)); - ASSERT_OK(replayer->Execute(std::move(record))); + ASSERT_OK(replayer->Execute(record, &result)); + ASSERT_TRUE(result != nullptr); + ASSERT_OK(result->Accept(&res_handler)); // Seek x 2 in one iteration // Seek from an invalid cf_id. record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id, "whatever", fake_ts++)); - ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption()); + ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption()); + ASSERT_TRUE(result == nullptr); } + ASSERT_GT(res_handler.GetAvgLatency(), 0.0); + ASSERT_EQ(res_handler.GetNumWrites(), 0); + ASSERT_EQ(res_handler.GetNumGets(), 0); + ASSERT_EQ(res_handler.GetNumIterSeeks(), 4); // Seek x 2 in two iterations + ASSERT_EQ(res_handler.GetNumMultiGets(), 0); + res_handler.Reset(); // MultiGet related // Get existing keys. record.reset(new MultiGetQueryTraceRecord( std::vector({handles[0]->GetID(), handles[1]->GetID()}), std::vector({"a", "foo"}), fake_ts++)); - ASSERT_OK(replayer->Execute(std::move(record))); + ASSERT_OK(replayer->Execute(record, &result)); + ASSERT_TRUE(result != nullptr); + ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 1 // Get all non-existing keys, should still return Status::OK(). record.reset(new MultiGetQueryTraceRecord( std::vector({handles[0]->GetID(), handles[1]->GetID()}), std::vector({"no1", "no2"}), fake_ts++)); + ASSERT_OK(replayer->Execute(record, &result)); + ASSERT_TRUE(result != nullptr); + ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 2 // Get mixed of existing and non-existing keys, should still return // Status::OK(). record.reset(new MultiGetQueryTraceRecord( std::vector({handles[0]->GetID(), handles[1]->GetID()}), std::vector({"a", "no2"}), fake_ts++)); - ASSERT_OK(replayer->Execute(std::move(record))); + ASSERT_OK(replayer->Execute(record, &result)); + ASSERT_TRUE(result != nullptr); + MultiValuesTraceExecutionResult* mvr = + dynamic_cast(result.get()); + ASSERT_TRUE(mvr != nullptr); + ASSERT_OK(mvr->GetMultiStatus()[0]); + ASSERT_TRUE(mvr->GetMultiStatus()[1].IsNotFound()); + ASSERT_EQ(mvr->GetValues()[0], "1"); + ASSERT_EQ(mvr->GetValues()[1], ""); + ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 3 // Get from an invalid (non-existing) cf_id. record.reset(new MultiGetQueryTraceRecord( std::vector( {handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}), std::vector({"a", "foo", "whatever"}), fake_ts++)); - ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption()); + ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption()); + ASSERT_TRUE(result == nullptr); // Empty MultiGet record.reset(new MultiGetQueryTraceRecord( std::vector(), std::vector(), fake_ts++)); - ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument()); + ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument()); + ASSERT_TRUE(result == nullptr); // MultiGet size mismatch record.reset(new MultiGetQueryTraceRecord( std::vector({handles[0]->GetID(), handles[1]->GetID()}), std::vector({"a"}), fake_ts++)); - ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument()); + ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument()); + ASSERT_TRUE(result == nullptr); + ASSERT_GT(res_handler.GetAvgLatency(), 0.0); + ASSERT_EQ(res_handler.GetNumWrites(), 0); + ASSERT_EQ(res_handler.GetNumGets(), 0); + ASSERT_EQ(res_handler.GetNumIterSeeks(), 0); + ASSERT_EQ(res_handler.GetNumMultiGets(), 3); + res_handler.Reset(); replayer.reset(); @@ -4634,7 +4840,7 @@ TEST_F(DBTest2, TraceWithLimit) { ASSERT_OK( db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); ASSERT_OK(replayer->Prepare()); - ASSERT_OK(replayer->Replay()); + ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr)); replayer.reset(); ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); @@ -4709,7 +4915,7 @@ TEST_F(DBTest2, TraceWithSampling) { ASSERT_OK( db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); ASSERT_OK(replayer->Prepare()); - ASSERT_OK(replayer->Replay()); + ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr)); replayer.reset(); ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); @@ -4813,7 +5019,7 @@ TEST_F(DBTest2, TraceWithFilter) { ASSERT_OK( db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); ASSERT_OK(replayer->Prepare()); - ASSERT_OK(replayer->Replay()); + ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr)); replayer.reset(); // All the key-values should not present since we filter out the WRITE ops. diff --git a/include/rocksdb/trace_record.h b/include/rocksdb/trace_record.h index f715a4396d..3f591d3d15 100644 --- a/include/rocksdb/trace_record.h +++ b/include/rocksdb/trace_record.h @@ -5,17 +5,18 @@ #pragma once +#include #include #include #include "rocksdb/rocksdb_namespace.h" #include "rocksdb/slice.h" +#include "rocksdb/status.h" namespace ROCKSDB_NAMESPACE { class ColumnFamilyHandle; class DB; -class Status; // Supported trace record types. enum TraceType : char { @@ -41,40 +42,55 @@ enum TraceType : char { kTraceMax, }; -class WriteQueryTraceRecord; class GetQueryTraceRecord; class IteratorSeekQueryTraceRecord; class MultiGetQueryTraceRecord; +class TraceRecordResult; +class WriteQueryTraceRecord; // Base class for all types of trace records. class TraceRecord { public: - TraceRecord(); explicit TraceRecord(uint64_t timestamp); - virtual ~TraceRecord(); + virtual ~TraceRecord() = default; + + // Type of the trace record. virtual TraceType GetTraceType() const = 0; + // Timestamp (in microseconds) of this trace. virtual uint64_t GetTimestamp() const; class Handler { public: - virtual ~Handler() {} + virtual ~Handler() = default; - virtual Status Handle(const WriteQueryTraceRecord& record) = 0; - virtual Status Handle(const GetQueryTraceRecord& record) = 0; - virtual Status Handle(const IteratorSeekQueryTraceRecord& record) = 0; - virtual Status Handle(const MultiGetQueryTraceRecord& record) = 0; + // Handle WriteQueryTraceRecord + virtual Status Handle(const WriteQueryTraceRecord& record, + std::unique_ptr* result) = 0; + + // Handle GetQueryTraceRecord + virtual Status Handle(const GetQueryTraceRecord& record, + std::unique_ptr* result) = 0; + + // Handle IteratorSeekQueryTraceRecord + virtual Status Handle(const IteratorSeekQueryTraceRecord& record, + std::unique_ptr* result) = 0; + + // Handle MultiGetQueryTraceRecord + virtual Status Handle(const MultiGetQueryTraceRecord& record, + std::unique_ptr* result) = 0; }; - virtual Status Accept(Handler* handler) = 0; + // Accept the handler and report the corresponding result in `result`. + virtual Status Accept(Handler* handler, + std::unique_ptr* result) = 0; // Create a handler for the exeution of TraceRecord. static Handler* NewExecutionHandler( DB* db, const std::vector& handles); private: - // Timestamp (in microseconds) of this trace. uint64_t timestamp_; }; @@ -82,8 +98,6 @@ class TraceRecord { class QueryTraceRecord : public TraceRecord { public: explicit QueryTraceRecord(uint64_t timestamp); - - virtual ~QueryTraceRecord() override; }; // Trace record for DB::Write() operation. @@ -97,9 +111,11 @@ class WriteQueryTraceRecord : public QueryTraceRecord { TraceType GetTraceType() const override { return kTraceWrite; } + // rep string for the WriteBatch. virtual Slice GetWriteBatchRep() const; - virtual Status Accept(Handler* handler) override; + Status Accept(Handler* handler, + std::unique_ptr* result) override; private: PinnableSlice rep_; @@ -118,16 +134,17 @@ class GetQueryTraceRecord : public QueryTraceRecord { TraceType GetTraceType() const override { return kTraceGet; } + // Column family ID. virtual uint32_t GetColumnFamilyID() const; + // Key to get. virtual Slice GetKey() const; - virtual Status Accept(Handler* handler) override; + Status Accept(Handler* handler, + std::unique_ptr* result) override; private: - // Column family ID. uint32_t cf_id_; - // Key to get. PinnableSlice key_; }; @@ -135,8 +152,6 @@ class GetQueryTraceRecord : public QueryTraceRecord { class IteratorQueryTraceRecord : public QueryTraceRecord { public: explicit IteratorQueryTraceRecord(uint64_t timestamp); - - virtual ~IteratorQueryTraceRecord() override; }; // Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation. @@ -156,21 +171,24 @@ class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord { virtual ~IteratorSeekQueryTraceRecord() override; + // Trace type matches the seek type. TraceType GetTraceType() const override; + // Type of seek, Seek or SeekForPrev. virtual SeekType GetSeekType() const; + // Column family ID. virtual uint32_t GetColumnFamilyID() const; + // Key to seek to. virtual Slice GetKey() const; - virtual Status Accept(Handler* handler) override; + Status Accept(Handler* handler, + std::unique_ptr* result) override; private: SeekType type_; - // Column family ID. uint32_t cf_id_; - // Key to seek to. PinnableSlice key_; }; @@ -189,16 +207,17 @@ class MultiGetQueryTraceRecord : public QueryTraceRecord { TraceType GetTraceType() const override { return kTraceMultiGet; } + // Column familiy IDs. virtual std::vector GetColumnFamilyIDs() const; + // Keys to get. virtual std::vector GetKeys() const; - virtual Status Accept(Handler* handler) override; + Status Accept(Handler* handler, + std::unique_ptr* result) override; private: - // Column familiy IDs. std::vector cf_ids_; - // Keys to get. std::vector keys_; }; diff --git a/include/rocksdb/trace_record_result.h b/include/rocksdb/trace_record_result.h new file mode 100644 index 0000000000..7924c6de39 --- /dev/null +++ b/include/rocksdb/trace_record_result.h @@ -0,0 +1,178 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_record.h" + +namespace ROCKSDB_NAMESPACE { + +class MultiValuesTraceExecutionResult; +class SingleValueTraceExecutionResult; +class StatusOnlyTraceExecutionResult; + +// Base class for the results of all types of trace records. +// Theses classes can be used to report the execution result of +// TraceRecord::Handler::Handle() or TraceRecord::Accept(). +class TraceRecordResult { + public: + explicit TraceRecordResult(TraceType trace_type); + + virtual ~TraceRecordResult() = default; + + // Trace type of the corresponding TraceRecord. + virtual TraceType GetTraceType() const; + + class Handler { + public: + virtual ~Handler() = default; + + // Handle StatusOnlyTraceExecutionResult + virtual Status Handle(const StatusOnlyTraceExecutionResult& result) = 0; + + // Handle SingleValueTraceExecutionResult + virtual Status Handle(const SingleValueTraceExecutionResult& result) = 0; + + // Handle MultiValuesTraceExecutionResult + virtual Status Handle(const MultiValuesTraceExecutionResult& result) = 0; + }; + + /* + * Example handler to just print the trace record execution results. + * + * class ResultPrintHandler : public TraceRecordResult::Handler { + * public: + * ResultPrintHandler(); + * ~ResultPrintHandler() override {} + * + * Status Handle(const StatusOnlyTraceExecutionResult& result) override { + * std::cout << "Status: " << result.GetStatus().ToString() << std::endl; + * } + * + * Status Handle(const SingleValueTraceExecutionResult& result) override { + * std::cout << "Status: " << result.GetStatus().ToString() + * << ", value: " << result.GetValue() << std::endl; + * } + * + * Status Handle(const MultiValuesTraceExecutionResult& result) override { + * size_t size = result.GetMultiStatus().size(); + * for (size_t i = 0; i < size; i++) { + * std::cout << "Status: " << result.GetMultiStatus()[i].ToString() + * << ", value: " << result.GetValues()[i] << std::endl; + * } + * } + * }; + * */ + + // Accept the handler. + virtual Status Accept(Handler* handler) = 0; + + private: + TraceType trace_type_; +}; + +// Base class for the results from the trace record execution handler (created +// by TraceRecord::NewExecutionHandler()). +// +// The actual execution status or returned values may be hidden from +// TraceRecord::Handler::Handle and TraceRecord::Accept. For example, a +// GetQueryTraceRecord's execution calls DB::Get() internally. DB::Get() may +// return Status::NotFound() but TraceRecord::Handler::Handle() or +// TraceRecord::Accept() will still return Status::OK(). The actual status from +// DB::Get() and the returned value string may be saved in a +// SingleValueTraceExecutionResult. +class TraceExecutionResult : public TraceRecordResult { + public: + TraceExecutionResult(uint64_t start_timestamp, uint64_t end_timestamp, + TraceType trace_type); + + // Execution start/end timestamps and request latency in microseconds. + virtual uint64_t GetStartTimestamp() const; + virtual uint64_t GetEndTimestamp() const; + inline uint64_t GetLatency() const { + return GetEndTimestamp() - GetStartTimestamp(); + } + + private: + uint64_t ts_start_; + uint64_t ts_end_; +}; + +// Result for operations that only return a single Status. +// Example operations: DB::Write(), Iterator::Seek() and +// Iterator::SeekForPrev(). +class StatusOnlyTraceExecutionResult : public TraceExecutionResult { + public: + StatusOnlyTraceExecutionResult(Status status, uint64_t start_timestamp, + uint64_t end_timestamp, TraceType trace_type); + + virtual ~StatusOnlyTraceExecutionResult() override = default; + + // Return value of DB::Write(), etc. + virtual const Status& GetStatus() const; + + virtual Status Accept(Handler* handler) override; + + private: + Status status_; +}; + +// Result for operations that return a Status and a value. +// Example operation: DB::Get() +class SingleValueTraceExecutionResult : public TraceExecutionResult { + public: + SingleValueTraceExecutionResult(Status status, const std::string& value, + uint64_t start_timestamp, + uint64_t end_timestamp, TraceType trace_type); + + SingleValueTraceExecutionResult(Status status, std::string&& value, + uint64_t start_timestamp, + uint64_t end_timestamp, TraceType trace_type); + + virtual ~SingleValueTraceExecutionResult() override; + + // Return status of DB::Get(), etc. + virtual const Status& GetStatus() const; + + // Value for the searched key. + virtual const std::string& GetValue() const; + + virtual Status Accept(Handler* handler) override; + + private: + Status status_; + std::string value_; +}; + +// Result for operations that return multiple Status(es) and values. +// Example operation: DB::MultiGet() +class MultiValuesTraceExecutionResult : public TraceExecutionResult { + public: + MultiValuesTraceExecutionResult(std::vector multi_status, + std::vector values, + uint64_t start_timestamp, + uint64_t end_timestamp, TraceType trace_type); + + virtual ~MultiValuesTraceExecutionResult() override; + + // Returned Status(es) of DB::MultiGet(), etc. + virtual const std::vector& GetMultiStatus() const; + + // Returned values for the searched keys. + virtual const std::vector& GetValues() const; + + virtual Status Accept(Handler* handler) override; + + private: + std::vector multi_status_; + std::vector values_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/utilities/replayer.h b/include/rocksdb/utilities/replayer.h index 976fadb689..4fdd8d73a7 100644 --- a/include/rocksdb/utilities/replayer.h +++ b/include/rocksdb/utilities/replayer.h @@ -6,14 +6,17 @@ #pragma once #ifndef ROCKSDB_LITE +#include #include #include "rocksdb/rocksdb_namespace.h" #include "rocksdb/status.h" -#include "rocksdb/trace_record.h" namespace ROCKSDB_NAMESPACE { +class TraceRecord; +class TraceRecordResult; + struct ReplayOptions { // Number of threads used for replaying. If 0 or 1, replay using // single thread. @@ -27,6 +30,7 @@ struct ReplayOptions { double fast_forward; ReplayOptions() : num_threads(1), fast_forward(1.0) {} + ReplayOptions(uint32_t num_of_threads, double fast_forward_ratio) : num_threads(num_of_threads), fast_forward(fast_forward_ratio) {} }; @@ -36,7 +40,7 @@ struct ReplayOptions { // instantiated via db_bench today, on using "replay" benchmark. class Replayer { public: - virtual ~Replayer() {} + virtual ~Replayer() = default; // Make some preparation before replaying the trace. This will also reset the // replayer in order to restart replaying. @@ -61,13 +65,22 @@ class Replayer { // trace; // Status::NotSupported() if the operation is not supported; // Otherwise, return the corresponding error status. - virtual Status Execute(const std::unique_ptr& record) = 0; - virtual Status Execute(std::unique_ptr&& record) = 0; + // + // The actual operation execution status and result(s) will be saved in + // result. For example, a GetQueryTraceRecord will have its DB::Get() status + // and the returned value saved in a SingleValueTraceExecutionResult. + virtual Status Execute(const std::unique_ptr& record, + std::unique_ptr* result) = 0; // Replay all the traces from the provided trace stream, taking the delay // between the traces into consideration. - virtual Status Replay(const ReplayOptions& options) = 0; - virtual Status Replay() { return Replay(ReplayOptions()); } + // + // result_callback reports the status of executing a trace record, and the + // actual operation execution result (See the description for Execute()). + virtual Status Replay( + const ReplayOptions& options, + const std::function&&)>& + result_callback) = 0; }; } // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index b2c83f048e..28d8380ad5 100644 --- a/src.mk +++ b/src.mk @@ -199,6 +199,7 @@ LIB_SOURCES = \ test_util/transaction_test_util.cc \ tools/dump/db_dump_tool.cc \ trace_replay/trace_record_handler.cc \ + trace_replay/trace_record_result.cc \ trace_replay/trace_record.cc \ trace_replay/trace_replay.cc \ trace_replay/block_cache_tracer.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 89d7505362..bf6e0c1c06 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -8027,7 +8027,8 @@ class Benchmark { } s = replayer->Replay( ReplayOptions(static_cast(FLAGS_trace_replay_threads), - FLAGS_trace_replay_fast_forward)); + FLAGS_trace_replay_fast_forward), + nullptr); replayer.reset(); if (s.ok()) { fprintf(stdout, "Replay completed from trace_file: %s\n", diff --git a/trace_replay/trace_record.cc b/trace_replay/trace_record.cc index 75afcf37ed..e0ce020904 100644 --- a/trace_replay/trace_record.cc +++ b/trace_replay/trace_record.cc @@ -11,6 +11,7 @@ #include "rocksdb/iterator.h" #include "rocksdb/options.h" #include "rocksdb/status.h" +#include "rocksdb/trace_record_result.h" #include "trace_replay/trace_record_handler.h" namespace ROCKSDB_NAMESPACE { @@ -18,8 +19,6 @@ namespace ROCKSDB_NAMESPACE { // TraceRecord TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {} -TraceRecord::~TraceRecord() {} - uint64_t TraceRecord::GetTimestamp() const { return timestamp_; } TraceRecord::Handler* TraceRecord::NewExecutionHandler( @@ -31,8 +30,6 @@ TraceRecord::Handler* TraceRecord::NewExecutionHandler( QueryTraceRecord::QueryTraceRecord(uint64_t timestamp) : TraceRecord(timestamp) {} -QueryTraceRecord::~QueryTraceRecord() {} - // WriteQueryTraceRecord WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep, uint64_t timestamp) @@ -44,13 +41,14 @@ WriteQueryTraceRecord::WriteQueryTraceRecord(const std::string& write_batch_rep, rep_.PinSelf(write_batch_rep); } -WriteQueryTraceRecord::~WriteQueryTraceRecord() {} +WriteQueryTraceRecord::~WriteQueryTraceRecord() { rep_.clear(); } Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); } -Status WriteQueryTraceRecord::Accept(Handler* handler) { +Status WriteQueryTraceRecord::Accept( + Handler* handler, std::unique_ptr* result) { assert(handler != nullptr); - return handler->Handle(*this); + return handler->Handle(*this, result); } // GetQueryTraceRecord @@ -68,23 +66,22 @@ GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id, key_.PinSelf(key); } -GetQueryTraceRecord::~GetQueryTraceRecord() {} +GetQueryTraceRecord::~GetQueryTraceRecord() { key_.clear(); } uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; } Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); } -Status GetQueryTraceRecord::Accept(Handler* handler) { +Status GetQueryTraceRecord::Accept(Handler* handler, + std::unique_ptr* result) { assert(handler != nullptr); - return handler->Handle(*this); + return handler->Handle(*this, result); } // IteratorQueryTraceRecord IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp) : QueryTraceRecord(timestamp) {} -IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {} - // IteratorSeekQueryTraceRecord IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, @@ -103,7 +100,7 @@ IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( key_.PinSelf(key); } -IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() {} +IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); } TraceType IteratorSeekQueryTraceRecord::GetTraceType() const { return static_cast(type_); @@ -120,9 +117,10 @@ uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const { Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); } -Status IteratorSeekQueryTraceRecord::Accept(Handler* handler) { +Status IteratorSeekQueryTraceRecord::Accept( + Handler* handler, std::unique_ptr* result) { assert(handler != nullptr); - return handler->Handle(*this); + return handler->Handle(*this, result); } // MultiGetQueryTraceRecord @@ -145,7 +143,10 @@ MultiGetQueryTraceRecord::MultiGetQueryTraceRecord( } } -MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {} +MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() { + cf_ids_.clear(); + keys_.clear(); +} std::vector MultiGetQueryTraceRecord::GetColumnFamilyIDs() const { return cf_ids_; @@ -155,9 +156,10 @@ std::vector MultiGetQueryTraceRecord::GetKeys() const { return std::vector(keys_.begin(), keys_.end()); } -Status MultiGetQueryTraceRecord::Accept(Handler* handler) { +Status MultiGetQueryTraceRecord::Accept( + Handler* handler, std::unique_ptr* result) { assert(handler != nullptr); - return handler->Handle(*this); + return handler->Handle(*this, result); } } // namespace ROCKSDB_NAMESPACE diff --git a/trace_replay/trace_record_handler.cc b/trace_replay/trace_record_handler.cc index 3651d0fe20..6343d2ed35 100644 --- a/trace_replay/trace_record_handler.cc +++ b/trace_replay/trace_record_handler.cc @@ -6,6 +6,7 @@ #include "trace_replay/trace_record_handler.h" #include "rocksdb/iterator.h" +#include "rocksdb/trace_record_result.h" #include "rocksdb/write_batch.h" namespace ROCKSDB_NAMESPACE { @@ -16,7 +17,8 @@ TraceExecutionHandler::TraceExecutionHandler( : TraceRecord::Handler(), db_(db), write_opts_(WriteOptions()), - read_opts_(ReadOptions()) { + read_opts_(ReadOptions()), + clock_(SystemClock::Default()) { assert(db != nullptr); assert(!handles.empty()); cf_map_.reserve(handles.size()); @@ -28,26 +30,64 @@ TraceExecutionHandler::TraceExecutionHandler( TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); } -Status TraceExecutionHandler::Handle(const WriteQueryTraceRecord& record) { +Status TraceExecutionHandler::Handle( + const WriteQueryTraceRecord& record, + std::unique_ptr* result) { + if (result != nullptr) { + result->reset(nullptr); + } + uint64_t start = clock_->NowMicros(); + WriteBatch batch(record.GetWriteBatchRep().ToString()); - return db_->Write(write_opts_, &batch); + Status s = db_->Write(write_opts_, &batch); + + uint64_t end = clock_->NowMicros(); + + if (s.ok() && result != nullptr) { + result->reset(new StatusOnlyTraceExecutionResult(s, start, end, + record.GetTraceType())); + } + + return s; } -Status TraceExecutionHandler::Handle(const GetQueryTraceRecord& record) { +Status TraceExecutionHandler::Handle( + const GetQueryTraceRecord& record, + std::unique_ptr* result) { + if (result != nullptr) { + result->reset(nullptr); + } auto it = cf_map_.find(record.GetColumnFamilyID()); if (it == cf_map_.end()) { return Status::Corruption("Invalid Column Family ID."); } + uint64_t start = clock_->NowMicros(); + std::string value; Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value); - // Treat not found as ok and return other errors. - return s.IsNotFound() ? Status::OK() : s; + uint64_t end = clock_->NowMicros(); + + // Treat not found as ok, return other errors. + if (!s.ok() && !s.IsNotFound()) { + return s; + } + + if (result != nullptr) { + // Report the actual opetation status in TraceExecutionResult + result->reset(new SingleValueTraceExecutionResult( + std::move(s), std::move(value), start, end, record.GetTraceType())); + } + return Status::OK(); } Status TraceExecutionHandler::Handle( - const IteratorSeekQueryTraceRecord& record) { + const IteratorSeekQueryTraceRecord& record, + std::unique_ptr* result) { + if (result != nullptr) { + result->reset(nullptr); + } auto it = cf_map_.find(record.GetColumnFamilyID()); if (it == cf_map_.end()) { return Status::Corruption("Invalid Column Family ID."); @@ -55,6 +95,8 @@ Status TraceExecutionHandler::Handle( Iterator* single_iter = db_->NewIterator(read_opts_, it->second); + uint64_t start = clock_->NowMicros(); + switch (record.GetSeekType()) { case IteratorSeekQueryTraceRecord::kSeekForPrev: { single_iter->SeekForPrev(record.GetKey()); @@ -65,12 +107,26 @@ Status TraceExecutionHandler::Handle( break; } } + + uint64_t end = clock_->NowMicros(); + Status s = single_iter->status(); delete single_iter; + + if (s.ok() && result != nullptr) { + result->reset(new StatusOnlyTraceExecutionResult(s, start, end, + record.GetTraceType())); + } + return s; } -Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) { +Status TraceExecutionHandler::Handle( + const MultiGetQueryTraceRecord& record, + std::unique_ptr* result) { + if (result != nullptr) { + result->reset(nullptr); + } std::vector handles; handles.reserve(record.GetColumnFamilyIDs().size()); for (uint32_t cf_id : record.GetColumnFamilyIDs()) { @@ -90,15 +146,26 @@ Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) { return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch."); } + uint64_t start = clock_->NowMicros(); + std::vector values; std::vector ss = db_->MultiGet(read_opts_, handles, keys, &values); + uint64_t end = clock_->NowMicros(); + // Treat not found as ok, return other errors. - for (Status s : ss) { + for (const Status& s : ss) { if (!s.ok() && !s.IsNotFound()) { return s; } } + + if (result != nullptr) { + // Report the actual opetation status in TraceExecutionResult + result->reset(new MultiValuesTraceExecutionResult( + std::move(ss), std::move(values), start, end, record.GetTraceType())); + } + return Status::OK(); } diff --git a/trace_replay/trace_record_handler.h b/trace_replay/trace_record_handler.h index fbc5a839f3..0d9f1d629f 100644 --- a/trace_replay/trace_record_handler.h +++ b/trace_replay/trace_record_handler.h @@ -5,12 +5,14 @@ #pragma once +#include #include #include #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/status.h" +#include "rocksdb/system_clock.h" #include "rocksdb/trace_record.h" namespace ROCKSDB_NAMESPACE { @@ -22,16 +24,21 @@ class TraceExecutionHandler : public TraceRecord::Handler { const std::vector& handles); virtual ~TraceExecutionHandler() override; - virtual Status Handle(const WriteQueryTraceRecord& record) override; - virtual Status Handle(const GetQueryTraceRecord& record) override; - virtual Status Handle(const IteratorSeekQueryTraceRecord& record) override; - virtual Status Handle(const MultiGetQueryTraceRecord& record) override; + virtual Status Handle(const WriteQueryTraceRecord& record, + std::unique_ptr* result) override; + virtual Status Handle(const GetQueryTraceRecord& record, + std::unique_ptr* result) override; + virtual Status Handle(const IteratorSeekQueryTraceRecord& record, + std::unique_ptr* result) override; + virtual Status Handle(const MultiGetQueryTraceRecord& record, + std::unique_ptr* result) override; private: DB* db_; std::unordered_map cf_map_; WriteOptions write_opts_; ReadOptions read_opts_; + std::shared_ptr clock_; }; // To do: Handler for trace_analyzer. diff --git a/trace_replay/trace_record_result.cc b/trace_replay/trace_record_result.cc new file mode 100644 index 0000000000..b22b57e432 --- /dev/null +++ b/trace_replay/trace_record_result.cc @@ -0,0 +1,106 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/trace_record_result.h" + +namespace ROCKSDB_NAMESPACE { + +// TraceRecordResult +TraceRecordResult::TraceRecordResult(TraceType trace_type) + : trace_type_(trace_type) {} + +TraceType TraceRecordResult::GetTraceType() const { return trace_type_; } + +// TraceExecutionResult +TraceExecutionResult::TraceExecutionResult(uint64_t start_timestamp, + uint64_t end_timestamp, + TraceType trace_type) + : TraceRecordResult(trace_type), + ts_start_(start_timestamp), + ts_end_(end_timestamp) { + assert(ts_start_ <= ts_end_); +} + +uint64_t TraceExecutionResult::GetStartTimestamp() const { return ts_start_; } + +uint64_t TraceExecutionResult::GetEndTimestamp() const { return ts_end_; } + +// StatusOnlyTraceExecutionResult +StatusOnlyTraceExecutionResult::StatusOnlyTraceExecutionResult( + Status status, uint64_t start_timestamp, uint64_t end_timestamp, + TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + status_(std::move(status)) {} + +const Status& StatusOnlyTraceExecutionResult::GetStatus() const { + return status_; +} + +Status StatusOnlyTraceExecutionResult::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +// SingleValueTraceExecutionResult +SingleValueTraceExecutionResult::SingleValueTraceExecutionResult( + Status status, const std::string& value, uint64_t start_timestamp, + uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + status_(std::move(status)), + value_(value) {} + +SingleValueTraceExecutionResult::SingleValueTraceExecutionResult( + Status status, std::string&& value, uint64_t start_timestamp, + uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + status_(std::move(status)), + value_(std::move(value)) {} + +SingleValueTraceExecutionResult::~SingleValueTraceExecutionResult() { + value_.clear(); +} + +const Status& SingleValueTraceExecutionResult::GetStatus() const { + return status_; +} + +const std::string& SingleValueTraceExecutionResult::GetValue() const { + return value_; +} + +Status SingleValueTraceExecutionResult::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +// MultiValuesTraceExecutionResult +MultiValuesTraceExecutionResult::MultiValuesTraceExecutionResult( + std::vector multi_status, std::vector values, + uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + multi_status_(std::move(multi_status)), + values_(std::move(values)) {} + +MultiValuesTraceExecutionResult::~MultiValuesTraceExecutionResult() { + multi_status_.clear(); + values_.clear(); +} + +const std::vector& MultiValuesTraceExecutionResult::GetMultiStatus() + const { + return multi_status_; +} + +const std::vector& MultiValuesTraceExecutionResult::GetValues() + const { + return values_; +} + +Status MultiValuesTraceExecutionResult::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/trace_replay/trace_replay.cc b/trace_replay/trace_replay.cc index b3063c1431..af2e76500d 100644 --- a/trace_replay/trace_replay.cc +++ b/trace_replay/trace_replay.cc @@ -131,6 +131,10 @@ Status TracerHelper::DecodeWriteRecord(Trace* trace, int trace_file_version, assert(trace != nullptr); assert(trace->type == kTraceWrite); + if (record != nullptr) { + record->reset(nullptr); + } + PinnableSlice rep; if (trace_file_version < 2) { rep.PinSelf(trace->payload); @@ -168,6 +172,10 @@ Status TracerHelper::DecodeGetRecord(Trace* trace, int trace_file_version, assert(trace != nullptr); assert(trace->type == kTraceGet); + if (record != nullptr) { + record->reset(nullptr); + } + uint32_t cf_id = 0; Slice get_key; @@ -211,6 +219,10 @@ Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version, assert(trace->type == kTraceIteratorSeek || trace->type == kTraceIteratorSeekForPrev); + if (record != nullptr) { + record->reset(nullptr); + } + uint32_t cf_id = 0; Slice iter_key; @@ -265,6 +277,11 @@ Status TracerHelper::DecodeMultiGetRecord( std::unique_ptr* record) { assert(trace != nullptr); assert(trace->type == kTraceMultiGet); + + if (record != nullptr) { + record->reset(nullptr); + } + if (trace_file_version < 2) { return Status::Corruption("MultiGet is not supported."); } diff --git a/utilities/trace/replayer_impl.cc b/utilities/trace/replayer_impl.cc index 2462db468f..c98155d534 100644 --- a/utilities/trace/replayer_impl.cc +++ b/utilities/trace/replayer_impl.cc @@ -10,13 +10,9 @@ #include #include -#include "rocksdb/db.h" -#include "rocksdb/env.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" -#include "rocksdb/status.h" #include "rocksdb/system_clock.h" -#include "rocksdb/trace_reader_writer.h" #include "util/threadpool_imp.h" namespace ROCKSDB_NAMESPACE { @@ -77,17 +73,15 @@ Status ReplayerImpl::Next(std::unique_ptr* record) { return DecodeTraceRecord(&trace, trace_file_version_, record); } -Status ReplayerImpl::Execute(const std::unique_ptr& record) { - return record->Accept(exec_handler_.get()); +Status ReplayerImpl::Execute(const std::unique_ptr& record, + std::unique_ptr* result) { + return record->Accept(exec_handler_.get(), result); } -Status ReplayerImpl::Execute(std::unique_ptr&& record) { - Status s = record->Accept(exec_handler_.get()); - record.reset(); - return s; -} - -Status ReplayerImpl::Replay(const ReplayOptions& options) { +Status ReplayerImpl::Replay( + const ReplayOptions& options, + const std::function&&)>& + result_callback) { if (options.fast_forward <= 0.0) { return Status::InvalidArgument("Wrong fast forward speed!"); } @@ -124,19 +118,34 @@ Status ReplayerImpl::Replay(const ReplayOptions& options) { // In single-threaded replay, decode first then sleep. std::unique_ptr record; s = DecodeTraceRecord(&trace, trace_file_version_, &record); - // Skip unsupported traces, stop for other errors. - if (s.IsNotSupported()) { - continue; - } else if (!s.ok()) { + if (!s.ok() && !s.IsNotSupported()) { break; } - std::this_thread::sleep_until( + std::chrono::system_clock::time_point sleep_to = replay_epoch + std::chrono::microseconds(static_cast(std::llround( - 1.0 * (trace.ts - header_ts_) / options.fast_forward)))); + 1.0 * (trace.ts - header_ts_) / options.fast_forward))); + if (sleep_to > std::chrono::system_clock::now()) { + std::this_thread::sleep_until(sleep_to); + } - s = Execute(std::move(record)); + // Skip unsupported traces, stop for other errors. + if (s.IsNotSupported()) { + if (result_callback != nullptr) { + result_callback(s, nullptr); + } + s = Status::OK(); + continue; + } + + if (result_callback == nullptr) { + s = Execute(record, nullptr); + } else { + std::unique_ptr res; + s = Execute(record, &res); + result_callback(s, std::move(res)); + } } } else { // Multi-threaded replay. @@ -181,10 +190,13 @@ Status ReplayerImpl::Replay(const ReplayOptions& options) { // In multi-threaded replay, sleep first thatn start decoding and // execution in a thread. - std::this_thread::sleep_until( + std::chrono::system_clock::time_point sleep_to = replay_epoch + std::chrono::microseconds(static_cast(std::llround( - 1.0 * (trace.ts - header_ts_) / options.fast_forward)))); + 1.0 * (trace.ts - header_ts_) / options.fast_forward))); + if (sleep_to > std::chrono::system_clock::now()) { + std::this_thread::sleep_until(sleep_to); + } if (trace_type == kTraceWrite || trace_type == kTraceGet || trace_type == kTraceIteratorSeek || @@ -195,10 +207,16 @@ Status ReplayerImpl::Replay(const ReplayOptions& options) { ra->handler = exec_handler_.get(); ra->trace_file_version = trace_file_version_; ra->error_cb = error_cb; + ra->result_cb = result_callback; thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(), nullptr, nullptr); + } else { + // Skip unsupported traces. + if (result_callback != nullptr) { + result_callback(Status::NotSupported("Unsupported trace type."), + nullptr); + } } - // Skip unsupported traces. } thread_pool.WaitForJobsAndJoinAllThreads(); @@ -293,13 +311,26 @@ void ReplayerImpl::BackgroundWork(void* arg) { std::unique_ptr record; Status s = DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record); - if (s.ok()) { - s = record->Accept(ra->handler); - record.reset(); + if (!s.ok()) { + // Stop the replay + if (ra->error_cb != nullptr) { + ra->error_cb(s, ra->trace_entry.ts); + } + // Report the result + if (ra->result_cb != nullptr) { + ra->result_cb(s, nullptr); + } + return; } - if (!s.ok() && ra->error_cb) { - ra->error_cb(s, ra->trace_entry.ts); + + if (ra->result_cb == nullptr) { + s = record->Accept(ra->handler, nullptr); + } else { + std::unique_ptr res; + s = record->Accept(ra->handler, &res); + ra->result_cb(s, std::move(res)); } + record.reset(); } } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/trace/replayer_impl.h b/utilities/trace/replayer_impl.h index 6cf4455e95..9cf1829606 100644 --- a/utilities/trace/replayer_impl.h +++ b/utilities/trace/replayer_impl.h @@ -12,22 +12,17 @@ #include #include -#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_reader_writer.h" #include "rocksdb/trace_record.h" +#include "rocksdb/trace_record_result.h" #include "rocksdb/utilities/replayer.h" #include "trace_replay/trace_replay.h" namespace ROCKSDB_NAMESPACE { -class ColumnFamilyHandle; -class DB; -class Env; -class TraceReader; -class TraceRecord; -class Status; - -struct ReplayOptions; - class ReplayerImpl : public Replayer { public: ReplayerImpl(DB* db, const std::vector& handles, @@ -41,11 +36,14 @@ class ReplayerImpl : public Replayer { Status Next(std::unique_ptr* record) override; using Replayer::Execute; - Status Execute(const std::unique_ptr& record) override; - Status Execute(std::unique_ptr&& record) override; + Status Execute(const std::unique_ptr& record, + std::unique_ptr* result) override; using Replayer::Replay; - Status Replay(const ReplayOptions& options) override; + Status Replay( + const ReplayOptions& options, + const std::function&&)>& + result_callback) override; using Replayer::GetHeaderTimestamp; uint64_t GetHeaderTimestamp() const override; @@ -84,6 +82,9 @@ struct ReplayerWorkerArg { // Callback function to report the error status and the timestamp of the // TraceRecord. std::function error_cb; + // Callback function to report the trace execution status and operation + // execution status/result(s). + std::function&&)> result_cb; }; } // namespace ROCKSDB_NAMESPACE