mirror of https://github.com/facebook/rocksdb.git
Add support in SstFileReader to get a raw table iterator (#12385)
Summary: This PR adds support to programmatically iterate a raw table file with an iterator returned by `SstFileReader::NewTableIterator`. For third party tools to use to observe SST files created by RocksDB. The original feature request was from this merge request: https://github.com/facebook/rocksdb/pull/12370 Since keys returned by raw table iterators are internal keys, this PR also adds a struct `ParsedEntryInfo` and util method `ParseEntry` to support user to parse internal key. `GetInternalKeyForSeek`, and `GetInternalKeyForSeekForPrev` to support users to create internal keys for seek operations with this raw table iterator. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12385 Test Plan: Added unit tests Reviewed By: cbi42 Differential Revision: D55662855 Pulled By: jowlyzhang fbshipit-source-id: 0716a173ee95924fbd4e1f9b6cccf06525c40049
This commit is contained in:
parent
8e6e8957fb
commit
74d419be4d
|
@ -954,6 +954,7 @@ set(SOURCES
|
||||||
utilities/transactions/write_prepared_txn_db.cc
|
utilities/transactions/write_prepared_txn_db.cc
|
||||||
utilities/transactions/write_unprepared_txn.cc
|
utilities/transactions/write_unprepared_txn.cc
|
||||||
utilities/transactions/write_unprepared_txn_db.cc
|
utilities/transactions/write_unprepared_txn_db.cc
|
||||||
|
utilities/types_util.cc
|
||||||
utilities/ttl/db_ttl_impl.cc
|
utilities/ttl/db_ttl_impl.cc
|
||||||
utilities/wal_filter.cc
|
utilities/wal_filter.cc
|
||||||
utilities/write_batch_with_index/write_batch_with_index.cc
|
utilities/write_batch_with_index/write_batch_with_index.cc
|
||||||
|
@ -1484,6 +1485,7 @@ if(WITH_TESTS)
|
||||||
utilities/transactions/lock/range/range_locking_test.cc
|
utilities/transactions/lock/range/range_locking_test.cc
|
||||||
utilities/transactions/timestamped_snapshot_test.cc
|
utilities/transactions/timestamped_snapshot_test.cc
|
||||||
utilities/ttl/ttl_test.cc
|
utilities/ttl/ttl_test.cc
|
||||||
|
utilities/types_util_test.cc
|
||||||
utilities/util_merge_operators_test.cc
|
utilities/util_merge_operators_test.cc
|
||||||
utilities/write_batch_with_index/write_batch_with_index_test.cc
|
utilities/write_batch_with_index/write_batch_with_index_test.cc
|
||||||
${PLUGIN_TESTS}
|
${PLUGIN_TESTS}
|
||||||
|
|
3
Makefile
3
Makefile
|
@ -1610,6 +1610,9 @@ object_registry_test: $(OBJ_DIR)/utilities/object_registry_test.o $(TEST_LIBRARY
|
||||||
ttl_test: $(OBJ_DIR)/utilities/ttl/ttl_test.o $(TEST_LIBRARY) $(LIBRARY)
|
ttl_test: $(OBJ_DIR)/utilities/ttl/ttl_test.o $(TEST_LIBRARY) $(LIBRARY)
|
||||||
$(AM_LINK)
|
$(AM_LINK)
|
||||||
|
|
||||||
|
types_util_test: $(OBJ_DIR)/utilities/types_util_test.o $(TEST_LIBRARY) $(LIBRARY)
|
||||||
|
$(AM_LINK)
|
||||||
|
|
||||||
write_batch_with_index_test: $(OBJ_DIR)/utilities/write_batch_with_index/write_batch_with_index_test.o $(TEST_LIBRARY) $(LIBRARY)
|
write_batch_with_index_test: $(OBJ_DIR)/utilities/write_batch_with_index/write_batch_with_index_test.o $(TEST_LIBRARY) $(LIBRARY)
|
||||||
$(AM_LINK)
|
$(AM_LINK)
|
||||||
|
|
||||||
|
|
7
TARGETS
7
TARGETS
|
@ -346,6 +346,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
|
||||||
"utilities/transactions/write_unprepared_txn.cc",
|
"utilities/transactions/write_unprepared_txn.cc",
|
||||||
"utilities/transactions/write_unprepared_txn_db.cc",
|
"utilities/transactions/write_unprepared_txn_db.cc",
|
||||||
"utilities/ttl/db_ttl_impl.cc",
|
"utilities/ttl/db_ttl_impl.cc",
|
||||||
|
"utilities/types_util.cc",
|
||||||
"utilities/wal_filter.cc",
|
"utilities/wal_filter.cc",
|
||||||
"utilities/write_batch_with_index/write_batch_with_index.cc",
|
"utilities/write_batch_with_index/write_batch_with_index.cc",
|
||||||
"utilities/write_batch_with_index/write_batch_with_index_internal.cc",
|
"utilities/write_batch_with_index/write_batch_with_index_internal.cc",
|
||||||
|
@ -5527,6 +5528,12 @@ cpp_unittest_wrapper(name="ttl_test",
|
||||||
extra_compiler_flags=[])
|
extra_compiler_flags=[])
|
||||||
|
|
||||||
|
|
||||||
|
cpp_unittest_wrapper(name="types_util_test",
|
||||||
|
srcs=["utilities/types_util_test.cc"],
|
||||||
|
deps=[":rocksdb_test_lib"],
|
||||||
|
extra_compiler_flags=[])
|
||||||
|
|
||||||
|
|
||||||
cpp_unittest_wrapper(name="udt_util_test",
|
cpp_unittest_wrapper(name="udt_util_test",
|
||||||
srcs=["util/udt_util_test.cc"],
|
srcs=["util/udt_util_test.cc"],
|
||||||
deps=[":rocksdb_test_lib"],
|
deps=[":rocksdb_test_lib"],
|
||||||
|
|
|
@ -120,6 +120,28 @@ class Comparator : public Customizable, public CompareInterface {
|
||||||
|
|
||||||
inline size_t timestamp_size() const { return timestamp_size_; }
|
inline size_t timestamp_size() const { return timestamp_size_; }
|
||||||
|
|
||||||
|
// Return what this Comparator considers as the maximum timestamp.
|
||||||
|
// The default implementation only works for when `timestamp_size_` is 0,
|
||||||
|
// subclasses for which this is not the case needs to override this function.
|
||||||
|
virtual Slice GetMaxTimestamp() const {
|
||||||
|
if (timestamp_size_ == 0) {
|
||||||
|
return Slice();
|
||||||
|
}
|
||||||
|
assert(false);
|
||||||
|
return Slice();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return what this Comparator considers as the min timestamp.
|
||||||
|
// The default implementation only works for when `timestamp_size_` is 0,
|
||||||
|
// subclasses for which this is not the case needs to override this function.
|
||||||
|
virtual Slice GetMinTimestamp() const {
|
||||||
|
if (timestamp_size_ == 0) {
|
||||||
|
return Slice();
|
||||||
|
}
|
||||||
|
assert(false);
|
||||||
|
return Slice();
|
||||||
|
}
|
||||||
|
|
||||||
int CompareWithoutTimestamp(const Slice& a, const Slice& b) const {
|
int CompareWithoutTimestamp(const Slice& a, const Slice& b) const {
|
||||||
return CompareWithoutTimestamp(a, /*a_has_ts=*/true, b, /*b_has_ts=*/true);
|
return CompareWithoutTimestamp(a, /*a_has_ts=*/true, b, /*b_has_ts=*/true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,11 +24,21 @@ class SstFileReader {
|
||||||
// Prepares to read from the file located at "file_path".
|
// Prepares to read from the file located at "file_path".
|
||||||
Status Open(const std::string& file_path);
|
Status Open(const std::string& file_path);
|
||||||
|
|
||||||
// Returns a new iterator over the table contents.
|
// Returns a new iterator over the table contents as a DB iterator, a.k.a
|
||||||
|
// a `DBIter` that iterates logically visible entries, for example, a delete
|
||||||
|
// entry is not logically visible.
|
||||||
// Most read options provide the same control as we read from DB.
|
// Most read options provide the same control as we read from DB.
|
||||||
// If "snapshot" is nullptr, the iterator returns only the latest keys.
|
// If "snapshot" is nullptr, the iterator returns only the latest keys.
|
||||||
Iterator* NewIterator(const ReadOptions& options);
|
Iterator* NewIterator(const ReadOptions& options);
|
||||||
|
|
||||||
|
// Returns a new iterator over the table contents as a raw table iterator,
|
||||||
|
// a.k.a a `TableIterator`that iterates all point data entries in the table
|
||||||
|
// including logically invisible entries like delete entries.
|
||||||
|
// This API is intended to provide a programmatic way to observe SST files
|
||||||
|
// created by a DB, to be used by third party tools. DB optimization
|
||||||
|
// capabilities like filling cache, read ahead are disabled.
|
||||||
|
std::unique_ptr<Iterator> NewTableIterator();
|
||||||
|
|
||||||
std::shared_ptr<const TableProperties> GetTableProperties() const;
|
std::shared_ptr<const TableProperties> GetTableProperties() const;
|
||||||
|
|
||||||
// Verifies whether there is corruption in this table.
|
// Verifies whether there is corruption in this table.
|
||||||
|
|
|
@ -70,6 +70,17 @@ enum EntryType {
|
||||||
kEntryOther,
|
kEntryOther,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Structured user-oriented representation of an internal key. It includes user
|
||||||
|
// key, sequence number, and type.
|
||||||
|
// If user-defined timestamp is enabled, `timestamp` contains the user-defined
|
||||||
|
// timestamp, it's otherwise an empty Slice.
|
||||||
|
struct ParsedEntryInfo {
|
||||||
|
Slice user_key;
|
||||||
|
Slice timestamp;
|
||||||
|
SequenceNumber sequence;
|
||||||
|
EntryType type;
|
||||||
|
};
|
||||||
|
|
||||||
enum class WriteStallCause {
|
enum class WriteStallCause {
|
||||||
// Beginning of CF-scope write stall causes
|
// Beginning of CF-scope write stall causes
|
||||||
//
|
//
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||||
|
//
|
||||||
|
// This source code is licensed under both the GPLv2 (found in the
|
||||||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||||||
|
// (found in the LICENSE.Apache file in the root directory).
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "rocksdb/comparator.h"
|
||||||
|
#include "rocksdb/slice.h"
|
||||||
|
#include "rocksdb/status.h"
|
||||||
|
#include "rocksdb/types.h"
|
||||||
|
|
||||||
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
|
||||||
|
// Given a user key, creates the internal key used for `Seek` operation for a
|
||||||
|
// raw table iterator. The internal key is stored in `buf`.
|
||||||
|
// `comparator` should be the same as the `Options.comparator` used to create
|
||||||
|
// the column family or the `SstFileWriter`.
|
||||||
|
Status GetInternalKeyForSeek(const Slice& user_key,
|
||||||
|
const Comparator* comparator, std::string* buf);
|
||||||
|
|
||||||
|
// Given a user key, creates the internal key used for `SeekForPrev` operation
|
||||||
|
// for a raw table iterator. The internal key is stored in `buf`.
|
||||||
|
// `comparator`: see doc for `GetInternalKeyForSeek`.
|
||||||
|
Status GetInternalKeyForSeekForPrev(const Slice& user_key,
|
||||||
|
const Comparator* comparator,
|
||||||
|
std::string* buf);
|
||||||
|
|
||||||
|
// Util method that takes an internal key and parse it to get `ParsedEntryInfo`.
|
||||||
|
// Such an internal key usually comes from a table iterator.
|
||||||
|
// `comparator`: see doc for `GetInternalKeyForSeek`.
|
||||||
|
Status ParseEntry(const Slice& internal_key, const Comparator* comparator,
|
||||||
|
ParsedEntryInfo* parsed_entry);
|
||||||
|
|
||||||
|
} // namespace ROCKSDB_NAMESPACE
|
2
src.mk
2
src.mk
|
@ -320,6 +320,7 @@ LIB_SOURCES = \
|
||||||
utilities/transactions/write_unprepared_txn.cc \
|
utilities/transactions/write_unprepared_txn.cc \
|
||||||
utilities/transactions/write_unprepared_txn_db.cc \
|
utilities/transactions/write_unprepared_txn_db.cc \
|
||||||
utilities/ttl/db_ttl_impl.cc \
|
utilities/ttl/db_ttl_impl.cc \
|
||||||
|
utilities/types_util.cc \
|
||||||
utilities/wal_filter.cc \
|
utilities/wal_filter.cc \
|
||||||
utilities/write_batch_with_index/write_batch_with_index.cc \
|
utilities/write_batch_with_index/write_batch_with_index.cc \
|
||||||
utilities/write_batch_with_index/write_batch_with_index_internal.cc \
|
utilities/write_batch_with_index/write_batch_with_index_internal.cc \
|
||||||
|
@ -633,6 +634,7 @@ TEST_MAIN_SOURCES = \
|
||||||
utilities/transactions/write_committed_transaction_ts_test.cc \
|
utilities/transactions/write_committed_transaction_ts_test.cc \
|
||||||
utilities/transactions/timestamped_snapshot_test.cc \
|
utilities/transactions/timestamped_snapshot_test.cc \
|
||||||
utilities/ttl/ttl_test.cc \
|
utilities/ttl/ttl_test.cc \
|
||||||
|
utilities/types_util_test.cc \
|
||||||
utilities/util_merge_operators_test.cc \
|
utilities/util_merge_operators_test.cc \
|
||||||
utilities/write_batch_with_index/write_batch_with_index_test.cc \
|
utilities/write_batch_with_index/write_batch_with_index_test.cc \
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
#include "rocksdb/file_system.h"
|
#include "rocksdb/file_system.h"
|
||||||
#include "table/get_context.h"
|
#include "table/get_context.h"
|
||||||
#include "table/table_builder.h"
|
#include "table/table_builder.h"
|
||||||
|
#include "table/table_iterator.h"
|
||||||
#include "table/table_reader.h"
|
#include "table/table_reader.h"
|
||||||
|
|
||||||
namespace ROCKSDB_NAMESPACE {
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
@ -24,6 +25,9 @@ struct SstFileReader::Rep {
|
||||||
EnvOptions soptions;
|
EnvOptions soptions;
|
||||||
ImmutableOptions ioptions;
|
ImmutableOptions ioptions;
|
||||||
MutableCFOptions moptions;
|
MutableCFOptions moptions;
|
||||||
|
// Keep a member variable for this, since `NewIterator()` uses a const
|
||||||
|
// reference of `ReadOptions`.
|
||||||
|
ReadOptions roptions_for_table_iter;
|
||||||
|
|
||||||
std::unique_ptr<TableReader> table_reader;
|
std::unique_ptr<TableReader> table_reader;
|
||||||
|
|
||||||
|
@ -31,7 +35,10 @@ struct SstFileReader::Rep {
|
||||||
: options(opts),
|
: options(opts),
|
||||||
soptions(options),
|
soptions(options),
|
||||||
ioptions(options),
|
ioptions(options),
|
||||||
moptions(ColumnFamilyOptions(options)) {}
|
moptions(ColumnFamilyOptions(options)) {
|
||||||
|
roptions_for_table_iter =
|
||||||
|
ReadOptions(/*_verify_checksums=*/true, /*_fill_cache=*/false);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
SstFileReader::SstFileReader(const Options& options) : rep_(new Rep(options)) {}
|
SstFileReader::SstFileReader(const Options& options) : rep_(new Rep(options)) {}
|
||||||
|
@ -94,6 +101,21 @@ Iterator* SstFileReader::NewIterator(const ReadOptions& roptions) {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<Iterator> SstFileReader::NewTableIterator() {
|
||||||
|
auto r = rep_.get();
|
||||||
|
InternalIterator* internal_iter = r->table_reader->NewIterator(
|
||||||
|
r->roptions_for_table_iter, r->moptions.prefix_extractor.get(),
|
||||||
|
/*arena*/ nullptr, false /* skip_filters */,
|
||||||
|
TableReaderCaller::kSSTFileReader);
|
||||||
|
assert(internal_iter);
|
||||||
|
if (internal_iter == nullptr) {
|
||||||
|
// Do not attempt to create a TableIterator if we cannot get a valid
|
||||||
|
// InternalIterator.
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
return std::make_unique<TableIterator>(internal_iter);
|
||||||
|
}
|
||||||
|
|
||||||
std::shared_ptr<const TableProperties> SstFileReader::GetTableProperties()
|
std::shared_ptr<const TableProperties> SstFileReader::GetTableProperties()
|
||||||
const {
|
const {
|
||||||
return rep_->table_reader->GetTableProperties();
|
return rep_->table_reader->GetTableProperties();
|
||||||
|
|
|
@ -8,10 +8,12 @@
|
||||||
|
|
||||||
#include <cinttypes>
|
#include <cinttypes>
|
||||||
|
|
||||||
|
#include "db/db_test_util.h"
|
||||||
#include "port/stack_trace.h"
|
#include "port/stack_trace.h"
|
||||||
#include "rocksdb/convenience.h"
|
#include "rocksdb/convenience.h"
|
||||||
#include "rocksdb/db.h"
|
#include "rocksdb/db.h"
|
||||||
#include "rocksdb/sst_file_writer.h"
|
#include "rocksdb/sst_file_writer.h"
|
||||||
|
#include "rocksdb/utilities/types_util.h"
|
||||||
#include "table/sst_file_writer_collectors.h"
|
#include "table/sst_file_writer_collectors.h"
|
||||||
#include "test_util/testharness.h"
|
#include "test_util/testharness.h"
|
||||||
#include "test_util/testutil.h"
|
#include "test_util/testutil.h"
|
||||||
|
@ -578,6 +580,195 @@ TEST_F(SstFileReaderTest, VerifyNumEntriesCorruption) {
|
||||||
ASSERT_TRUE(std::strstr(oss.str().c_str(), s.getState()));
|
ASSERT_TRUE(std::strstr(oss.str().c_str(), s.getState()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class SstFileReaderTableIteratorTest : public DBTestBase {
|
||||||
|
public:
|
||||||
|
SstFileReaderTableIteratorTest()
|
||||||
|
: DBTestBase("sst_file_reader_table_iterator_test",
|
||||||
|
/*env_do_fsync=*/false) {}
|
||||||
|
|
||||||
|
void VerifyTableEntry(Iterator* iter, const std::string& user_key,
|
||||||
|
ValueType value_type,
|
||||||
|
std::optional<std::string> expected_value,
|
||||||
|
bool backward_iteration = false) {
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_TRUE(iter->status().ok());
|
||||||
|
ParsedInternalKey pikey;
|
||||||
|
ASSERT_OK(ParseInternalKey(iter->key(), &pikey, /*log_err_key=*/false));
|
||||||
|
ASSERT_EQ(pikey.user_key, user_key);
|
||||||
|
ASSERT_EQ(pikey.type, value_type);
|
||||||
|
if (expected_value.has_value()) {
|
||||||
|
ASSERT_EQ(iter->value(), expected_value.value());
|
||||||
|
}
|
||||||
|
if (!backward_iteration) {
|
||||||
|
iter->Next();
|
||||||
|
} else {
|
||||||
|
iter->Prev();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(SstFileReaderTableIteratorTest, Basic) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
const Comparator* ucmp = BytewiseComparator();
|
||||||
|
options.comparator = ucmp;
|
||||||
|
options.disable_auto_compactions = true;
|
||||||
|
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
// Create a L0 sst file with 4 entries, two for each user key.
|
||||||
|
// The file should have these entries in ascending internal key order:
|
||||||
|
// 'bar, seq: 4, type: kTypeValue => val2'
|
||||||
|
// 'bar, seq: 3, type: kTypeDeletion'
|
||||||
|
// 'foo, seq: 2, type: kTypeDeletion'
|
||||||
|
// 'foo, seq: 1, type: kTypeValue => val1'
|
||||||
|
ASSERT_OK(Put("foo", "val1"));
|
||||||
|
const Snapshot* snapshot1 = dbfull()->GetSnapshot();
|
||||||
|
ASSERT_OK(Delete("foo"));
|
||||||
|
ASSERT_OK(Delete("bar"));
|
||||||
|
const Snapshot* snapshot2 = dbfull()->GetSnapshot();
|
||||||
|
ASSERT_OK(Put("bar", "val2"));
|
||||||
|
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
|
||||||
|
std::vector<LiveFileMetaData> files;
|
||||||
|
dbfull()->GetLiveFilesMetaData(&files);
|
||||||
|
ASSERT_TRUE(files.size() == 1);
|
||||||
|
ASSERT_TRUE(files[0].level == 0);
|
||||||
|
std::string file_name = files[0].directory + "/" + files[0].relative_filename;
|
||||||
|
|
||||||
|
SstFileReader reader(options);
|
||||||
|
ASSERT_OK(reader.Open(file_name));
|
||||||
|
ASSERT_OK(reader.VerifyChecksum());
|
||||||
|
|
||||||
|
// When iterating the file as a DB iterator, only one data entry for "bar" is
|
||||||
|
// visible.
|
||||||
|
std::unique_ptr<Iterator> db_iter(reader.NewIterator(ReadOptions()));
|
||||||
|
db_iter->SeekToFirst();
|
||||||
|
ASSERT_TRUE(db_iter->Valid());
|
||||||
|
ASSERT_EQ(db_iter->key(), "bar");
|
||||||
|
ASSERT_EQ(db_iter->value(), "val2");
|
||||||
|
db_iter->Next();
|
||||||
|
ASSERT_FALSE(db_iter->Valid());
|
||||||
|
db_iter.reset();
|
||||||
|
|
||||||
|
// When iterating the file with a raw table iterator, all the data entries are
|
||||||
|
// surfaced in ascending internal key order.
|
||||||
|
std::unique_ptr<Iterator> table_iter = reader.NewTableIterator();
|
||||||
|
|
||||||
|
table_iter->SeekToFirst();
|
||||||
|
VerifyTableEntry(table_iter.get(), "bar", kTypeValue, "val2");
|
||||||
|
VerifyTableEntry(table_iter.get(), "bar", kTypeDeletion, std::nullopt);
|
||||||
|
VerifyTableEntry(table_iter.get(), "foo", kTypeDeletion, std::nullopt);
|
||||||
|
VerifyTableEntry(table_iter.get(), "foo", kTypeValue, "val1");
|
||||||
|
ASSERT_FALSE(table_iter->Valid());
|
||||||
|
|
||||||
|
std::string seek_key_buf;
|
||||||
|
ASSERT_OK(GetInternalKeyForSeek("foo", ucmp, &seek_key_buf));
|
||||||
|
Slice seek_target = seek_key_buf;
|
||||||
|
table_iter->Seek(seek_target);
|
||||||
|
VerifyTableEntry(table_iter.get(), "foo", kTypeDeletion, std::nullopt);
|
||||||
|
VerifyTableEntry(table_iter.get(), "foo", kTypeValue, "val1");
|
||||||
|
ASSERT_FALSE(table_iter->Valid());
|
||||||
|
|
||||||
|
ASSERT_OK(GetInternalKeyForSeekForPrev("bar", ucmp, &seek_key_buf));
|
||||||
|
Slice seek_for_prev_target = seek_key_buf;
|
||||||
|
table_iter->SeekForPrev(seek_for_prev_target);
|
||||||
|
VerifyTableEntry(table_iter.get(), "bar", kTypeDeletion, std::nullopt,
|
||||||
|
/*backward_iteration=*/true);
|
||||||
|
VerifyTableEntry(table_iter.get(), "bar", kTypeValue, "val2",
|
||||||
|
/*backward_iteration=*/true);
|
||||||
|
ASSERT_FALSE(table_iter->Valid());
|
||||||
|
|
||||||
|
dbfull()->ReleaseSnapshot(snapshot1);
|
||||||
|
dbfull()->ReleaseSnapshot(snapshot2);
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(SstFileReaderTableIteratorTest, UserDefinedTimestampsEnabled) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
const Comparator* ucmp = test::BytewiseComparatorWithU64TsWrapper();
|
||||||
|
options.comparator = ucmp;
|
||||||
|
options.disable_auto_compactions = true;
|
||||||
|
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
// Create a L0 sst file with 4 entries, two for each user key.
|
||||||
|
// The file should have these entries in ascending internal key order:
|
||||||
|
// 'bar, ts=3, seq: 4, type: kTypeValue => val2'
|
||||||
|
// 'bar, ts=2, seq: 3, type: kTypeDeletionWithTimestamp'
|
||||||
|
// 'foo, ts=4, seq: 2, type: kTypeDeletionWithTimestamp'
|
||||||
|
// 'foo, ts=3, seq: 1, type: kTypeValue => val1'
|
||||||
|
WriteOptions wopt;
|
||||||
|
ColumnFamilyHandle* cfd = db_->DefaultColumnFamily();
|
||||||
|
ASSERT_OK(db_->Put(wopt, cfd, "foo", EncodeAsUint64(3), "val1"));
|
||||||
|
ASSERT_OK(db_->Delete(wopt, cfd, "foo", EncodeAsUint64(4)));
|
||||||
|
ASSERT_OK(db_->Delete(wopt, cfd, "bar", EncodeAsUint64(2)));
|
||||||
|
ASSERT_OK(db_->Put(wopt, cfd, "bar", EncodeAsUint64(3), "val2"));
|
||||||
|
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
|
||||||
|
std::vector<LiveFileMetaData> files;
|
||||||
|
dbfull()->GetLiveFilesMetaData(&files);
|
||||||
|
ASSERT_TRUE(files.size() == 1);
|
||||||
|
ASSERT_TRUE(files[0].level == 0);
|
||||||
|
std::string file_name = files[0].directory + "/" + files[0].relative_filename;
|
||||||
|
|
||||||
|
SstFileReader reader(options);
|
||||||
|
ASSERT_OK(reader.Open(file_name));
|
||||||
|
ASSERT_OK(reader.VerifyChecksum());
|
||||||
|
|
||||||
|
// When iterating the file as a DB iterator, only one data entry for "bar" is
|
||||||
|
// visible.
|
||||||
|
ReadOptions ropts;
|
||||||
|
std::string read_ts = EncodeAsUint64(4);
|
||||||
|
Slice read_ts_slice = read_ts;
|
||||||
|
ropts.timestamp = &read_ts_slice;
|
||||||
|
std::unique_ptr<Iterator> db_iter(reader.NewIterator(ropts));
|
||||||
|
db_iter->SeekToFirst();
|
||||||
|
ASSERT_TRUE(db_iter->Valid());
|
||||||
|
ASSERT_EQ(db_iter->key(), "bar");
|
||||||
|
ASSERT_EQ(db_iter->value(), "val2");
|
||||||
|
ASSERT_EQ(db_iter->timestamp(), EncodeAsUint64(3));
|
||||||
|
db_iter->Next();
|
||||||
|
ASSERT_FALSE(db_iter->Valid());
|
||||||
|
db_iter.reset();
|
||||||
|
|
||||||
|
std::unique_ptr<Iterator> table_iter = reader.NewTableIterator();
|
||||||
|
|
||||||
|
table_iter->SeekToFirst();
|
||||||
|
VerifyTableEntry(table_iter.get(), "bar" + EncodeAsUint64(3), kTypeValue,
|
||||||
|
"val2");
|
||||||
|
VerifyTableEntry(table_iter.get(), "bar" + EncodeAsUint64(2),
|
||||||
|
kTypeDeletionWithTimestamp, std::nullopt);
|
||||||
|
VerifyTableEntry(table_iter.get(), "foo" + EncodeAsUint64(4),
|
||||||
|
kTypeDeletionWithTimestamp, std::nullopt);
|
||||||
|
VerifyTableEntry(table_iter.get(), "foo" + EncodeAsUint64(3), kTypeValue,
|
||||||
|
"val1");
|
||||||
|
ASSERT_FALSE(table_iter->Valid());
|
||||||
|
|
||||||
|
std::string seek_key_buf;
|
||||||
|
ASSERT_OK(GetInternalKeyForSeek("foo", ucmp, &seek_key_buf));
|
||||||
|
Slice seek_target = seek_key_buf;
|
||||||
|
table_iter->Seek(seek_target);
|
||||||
|
VerifyTableEntry(table_iter.get(), "foo" + EncodeAsUint64(4),
|
||||||
|
kTypeDeletionWithTimestamp, std::nullopt);
|
||||||
|
VerifyTableEntry(table_iter.get(), "foo" + EncodeAsUint64(3), kTypeValue,
|
||||||
|
"val1");
|
||||||
|
ASSERT_FALSE(table_iter->Valid());
|
||||||
|
|
||||||
|
ASSERT_OK(GetInternalKeyForSeekForPrev("bar", ucmp, &seek_key_buf));
|
||||||
|
Slice seek_for_prev_target = seek_key_buf;
|
||||||
|
table_iter->SeekForPrev(seek_for_prev_target);
|
||||||
|
VerifyTableEntry(table_iter.get(), "bar" + EncodeAsUint64(2),
|
||||||
|
kTypeDeletionWithTimestamp, std::nullopt,
|
||||||
|
/*backward_iteration=*/true);
|
||||||
|
VerifyTableEntry(table_iter.get(), "bar" + EncodeAsUint64(3), kTypeValue,
|
||||||
|
"val2", /*backward_iteration=*/true);
|
||||||
|
ASSERT_FALSE(table_iter->Valid());
|
||||||
|
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||||
|
//
|
||||||
|
// This source code is licensed under both the GPLv2 (found in the
|
||||||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||||||
|
// (found in the LICENSE.Apache file in the root directory).
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "rocksdb/iterator.h"
|
||||||
|
#include "table/internal_iterator.h"
|
||||||
|
|
||||||
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
// An iterator wrapper class used to wrap an `InternalIterator` created by API
|
||||||
|
// `TableReader::NewIterator`. The `InternalIterator` should be allocated with
|
||||||
|
// the default allocator, not on an arena.
|
||||||
|
// NOTE: Callers should ensure the wrapped `InternalIterator*` is a valid
|
||||||
|
// pointer before constructing a `TableIterator` with it.
|
||||||
|
class TableIterator : public Iterator {
|
||||||
|
void reset(InternalIterator* iter) noexcept {
|
||||||
|
if (iter_ != nullptr) {
|
||||||
|
delete iter_;
|
||||||
|
}
|
||||||
|
iter_ = iter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit TableIterator(InternalIterator* iter) : iter_(iter) {}
|
||||||
|
|
||||||
|
TableIterator(const TableIterator&) = delete;
|
||||||
|
TableIterator& operator=(const TableIterator&) = delete;
|
||||||
|
|
||||||
|
TableIterator(TableIterator&& o) noexcept {
|
||||||
|
iter_ = o.iter_;
|
||||||
|
o.iter_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
TableIterator& operator=(TableIterator&& o) noexcept {
|
||||||
|
reset(o.iter_);
|
||||||
|
o.iter_ = nullptr;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
InternalIterator* operator->() { return iter_; }
|
||||||
|
InternalIterator* get() { return iter_; }
|
||||||
|
|
||||||
|
~TableIterator() override { reset(nullptr); }
|
||||||
|
|
||||||
|
bool Valid() const override { return iter_->Valid(); }
|
||||||
|
void SeekToFirst() override { return iter_->SeekToFirst(); }
|
||||||
|
void SeekToLast() override { return iter_->SeekToLast(); }
|
||||||
|
void Seek(const Slice& target) override { return iter_->Seek(target); }
|
||||||
|
void SeekForPrev(const Slice& target) override {
|
||||||
|
return iter_->SeekForPrev(target);
|
||||||
|
}
|
||||||
|
void Next() override { return iter_->Next(); }
|
||||||
|
void Prev() override { return iter_->Prev(); }
|
||||||
|
Slice key() const override { return iter_->key(); }
|
||||||
|
Slice value() const override { return iter_->value(); }
|
||||||
|
Status status() const override { return iter_->status(); }
|
||||||
|
Status GetProperty(std::string /*prop_name*/,
|
||||||
|
std::string* /*prop*/) override {
|
||||||
|
assert(false);
|
||||||
|
return Status::NotSupported("TableIterator does not support GetProperty.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
InternalIterator* iter_;
|
||||||
|
};
|
||||||
|
} // namespace ROCKSDB_NAMESPACE
|
|
@ -0,0 +1 @@
|
||||||
|
*Adds a `SstFileReader::NewTableIterator` API to support programmatically read a SST file as a raw table file.
|
|
@ -271,6 +271,11 @@ class ComparatorWithU64TsImpl : public Comparator {
|
||||||
return -CompareTimestamp(ExtractTimestampFromUserKey(a, ts_sz),
|
return -CompareTimestamp(ExtractTimestampFromUserKey(a, ts_sz),
|
||||||
ExtractTimestampFromUserKey(b, ts_sz));
|
ExtractTimestampFromUserKey(b, ts_sz));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Slice GetMaxTimestamp() const override { return MaxU64Ts(); }
|
||||||
|
|
||||||
|
Slice GetMinTimestamp() const override { return MinU64Ts(); }
|
||||||
|
|
||||||
using Comparator::CompareWithoutTimestamp;
|
using Comparator::CompareWithoutTimestamp;
|
||||||
int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
|
int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
|
||||||
bool b_has_ts) const override {
|
bool b_has_ts) const override {
|
||||||
|
|
|
@ -0,0 +1,88 @@
|
||||||
|
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||||
|
//
|
||||||
|
// This source code is licensed under both the GPLv2 (found in the
|
||||||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||||||
|
// (found in the LICENSE.Apache file in the root directory).
|
||||||
|
|
||||||
|
#include "rocksdb/utilities/types_util.h"
|
||||||
|
|
||||||
|
#include "db/dbformat.h"
|
||||||
|
|
||||||
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
|
||||||
|
Status GetInternalKeyForSeek(const Slice& user_key,
|
||||||
|
const Comparator* comparator, std::string* buf) {
|
||||||
|
if (!comparator) {
|
||||||
|
return Status::InvalidArgument(
|
||||||
|
"Constructing an internal key requires user key comparator.");
|
||||||
|
}
|
||||||
|
size_t ts_sz = comparator->timestamp_size();
|
||||||
|
Slice max_ts = comparator->GetMaxTimestamp();
|
||||||
|
if (ts_sz != max_ts.size()) {
|
||||||
|
return Status::InvalidArgument(
|
||||||
|
"The maximum timestamp returned by Comparator::GetMaxTimestamp is "
|
||||||
|
"invalid.");
|
||||||
|
}
|
||||||
|
buf->reserve(user_key.size() + ts_sz + kNumInternalBytes);
|
||||||
|
buf->assign(user_key.data(), user_key.size());
|
||||||
|
if (ts_sz) {
|
||||||
|
buf->append(max_ts.data(), max_ts.size());
|
||||||
|
}
|
||||||
|
PutFixed64(buf, PackSequenceAndType(kMaxSequenceNumber, kValueTypeForSeek));
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
Status GetInternalKeyForSeekForPrev(const Slice& user_key,
|
||||||
|
const Comparator* comparator,
|
||||||
|
std::string* buf) {
|
||||||
|
if (!comparator) {
|
||||||
|
return Status::InvalidArgument(
|
||||||
|
"Constructing an internal key requires user key comparator.");
|
||||||
|
}
|
||||||
|
size_t ts_sz = comparator->timestamp_size();
|
||||||
|
Slice min_ts = comparator->GetMinTimestamp();
|
||||||
|
if (ts_sz != min_ts.size()) {
|
||||||
|
return Status::InvalidArgument(
|
||||||
|
"The minimum timestamp returned by Comparator::GetMinTimestamp is "
|
||||||
|
"invalid.");
|
||||||
|
}
|
||||||
|
buf->reserve(user_key.size() + ts_sz + kNumInternalBytes);
|
||||||
|
buf->assign(user_key.data(), user_key.size());
|
||||||
|
if (ts_sz) {
|
||||||
|
buf->append(min_ts.data(), min_ts.size());
|
||||||
|
}
|
||||||
|
PutFixed64(buf, PackSequenceAndType(0, kValueTypeForSeekForPrev));
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
Status ParseEntry(const Slice& internal_key, const Comparator* comparator,
|
||||||
|
ParsedEntryInfo* parsed_entry) {
|
||||||
|
if (internal_key.size() < kNumInternalBytes) {
|
||||||
|
return Status::InvalidArgument("Internal key size invalid.");
|
||||||
|
}
|
||||||
|
if (!comparator) {
|
||||||
|
return Status::InvalidArgument(
|
||||||
|
"Parsing an internal key requires user key comparator.");
|
||||||
|
}
|
||||||
|
ParsedInternalKey pikey;
|
||||||
|
Status status = ParseInternalKey(internal_key, &pikey, /*log_err_key=*/false);
|
||||||
|
if (!status.ok()) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t ts_sz = comparator->timestamp_size();
|
||||||
|
if (pikey.user_key.size() < ts_sz) {
|
||||||
|
return Status::InvalidArgument("User key(with timestamp) size invalid.");
|
||||||
|
}
|
||||||
|
if (ts_sz == 0) {
|
||||||
|
parsed_entry->user_key = pikey.user_key;
|
||||||
|
} else {
|
||||||
|
parsed_entry->user_key = StripTimestampFromUserKey(pikey.user_key, ts_sz);
|
||||||
|
parsed_entry->timestamp =
|
||||||
|
ExtractTimestampFromUserKey(pikey.user_key, ts_sz);
|
||||||
|
}
|
||||||
|
parsed_entry->sequence = pikey.sequence;
|
||||||
|
parsed_entry->type = ROCKSDB_NAMESPACE::GetEntryType(pikey.type);
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
} // namespace ROCKSDB_NAMESPACE
|
|
@ -0,0 +1,98 @@
|
||||||
|
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||||
|
//
|
||||||
|
// This source code is licensed under both the GPLv2 (found in the
|
||||||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||||||
|
// (found in the LICENSE.Apache file in the root directory).
|
||||||
|
|
||||||
|
#include "rocksdb/utilities/types_util.h"
|
||||||
|
|
||||||
|
#include "db/dbformat.h"
|
||||||
|
#include "port/stack_trace.h"
|
||||||
|
#include "rocksdb/types.h"
|
||||||
|
#include "test_util/testharness.h"
|
||||||
|
|
||||||
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
namespace {
|
||||||
|
std::string EncodeAsUint64(uint64_t v) {
|
||||||
|
std::string dst;
|
||||||
|
PutFixed64(&dst, v);
|
||||||
|
return dst;
|
||||||
|
}
|
||||||
|
std::string IKey(const std::string& user_key, uint64_t seq, ValueType vt,
|
||||||
|
std::optional<uint64_t> timestamp) {
|
||||||
|
std::string encoded;
|
||||||
|
encoded.assign(user_key.data(), user_key.size());
|
||||||
|
if (timestamp.has_value()) {
|
||||||
|
PutFixed64(&encoded, timestamp.value());
|
||||||
|
}
|
||||||
|
PutFixed64(&encoded, PackSequenceAndType(seq, vt));
|
||||||
|
return encoded;
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
TEST(ParseEntryTest, InvalidInternalKey) {
|
||||||
|
const Comparator* ucmp = BytewiseComparator();
|
||||||
|
std::string invalid_ikey = "foo";
|
||||||
|
Slice ikey_slice = invalid_ikey;
|
||||||
|
ParsedEntryInfo parsed_entry;
|
||||||
|
ASSERT_TRUE(ParseEntry(ikey_slice, ucmp, &parsed_entry).IsInvalidArgument());
|
||||||
|
|
||||||
|
std::string ikey =
|
||||||
|
IKey("foo", 3, ValueType::kTypeValue, /*timestamp=*/std::nullopt);
|
||||||
|
ikey_slice = ikey;
|
||||||
|
ASSERT_TRUE(
|
||||||
|
ParseEntry(ikey_slice, nullptr, &parsed_entry).IsInvalidArgument());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ParseEntryTest, Basic) {
|
||||||
|
const Comparator* ucmp = BytewiseComparator();
|
||||||
|
std::string ikey =
|
||||||
|
IKey("foo", 3, ValueType::kTypeValue, /*timestamp=*/std::nullopt);
|
||||||
|
Slice ikey_slice = ikey;
|
||||||
|
|
||||||
|
ParsedEntryInfo parsed_entry;
|
||||||
|
ASSERT_OK(ParseEntry(ikey_slice, ucmp, &parsed_entry));
|
||||||
|
ASSERT_EQ(parsed_entry.user_key, "foo");
|
||||||
|
ASSERT_EQ(parsed_entry.timestamp, "");
|
||||||
|
ASSERT_EQ(parsed_entry.sequence, 3);
|
||||||
|
ASSERT_EQ(parsed_entry.type, EntryType::kEntryPut);
|
||||||
|
|
||||||
|
ikey = IKey("bar", 5, ValueType::kTypeDeletion, /*timestamp=*/std::nullopt);
|
||||||
|
ikey_slice = ikey;
|
||||||
|
|
||||||
|
ASSERT_OK(ParseEntry(ikey_slice, ucmp, &parsed_entry));
|
||||||
|
ASSERT_EQ(parsed_entry.user_key, "bar");
|
||||||
|
ASSERT_EQ(parsed_entry.timestamp, "");
|
||||||
|
ASSERT_EQ(parsed_entry.sequence, 5);
|
||||||
|
ASSERT_EQ(parsed_entry.type, EntryType::kEntryDelete);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ParseEntryTest, UserKeyIncludesTimestamp) {
|
||||||
|
const Comparator* ucmp = BytewiseComparatorWithU64Ts();
|
||||||
|
std::string ikey = IKey("foo", 3, ValueType::kTypeValue, 50);
|
||||||
|
Slice ikey_slice = ikey;
|
||||||
|
|
||||||
|
ParsedEntryInfo parsed_entry;
|
||||||
|
ASSERT_OK(ParseEntry(ikey_slice, ucmp, &parsed_entry));
|
||||||
|
ASSERT_EQ(parsed_entry.user_key, "foo");
|
||||||
|
ASSERT_EQ(parsed_entry.timestamp, EncodeAsUint64(50));
|
||||||
|
ASSERT_EQ(parsed_entry.sequence, 3);
|
||||||
|
ASSERT_EQ(parsed_entry.type, EntryType::kEntryPut);
|
||||||
|
|
||||||
|
ikey = IKey("bar", 5, ValueType::kTypeDeletion, 30);
|
||||||
|
ikey_slice = ikey;
|
||||||
|
|
||||||
|
ASSERT_OK(ParseEntry(ikey_slice, ucmp, &parsed_entry));
|
||||||
|
ASSERT_EQ(parsed_entry.user_key, "bar");
|
||||||
|
ASSERT_EQ(parsed_entry.timestamp, EncodeAsUint64(30));
|
||||||
|
ASSERT_EQ(parsed_entry.sequence, 5);
|
||||||
|
ASSERT_EQ(parsed_entry.type, EntryType::kEntryDelete);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
||||||
|
::testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
Loading…
Reference in New Issue