Add SstFileReader to read sst files (#4717)

Summary:
A user friendly sst file reader is useful when we want to access sst
files outside of RocksDB. For example, we can generate an sst file
with SstFileWriter and send it to other places, then use SstFileReader
to read the file and process the entries in other ways.

Also rename the original SstFileReader to SstFileDumper because of
name conflict, and seems SstFileDumper is more appropriate for tools.

TODO: there is only a very simple test now, because I want to get some feedback first.
If the changes look good, I will add more tests soon.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4717

Differential Revision: D13212686

Pulled By: ajkr

fbshipit-source-id: 737593383264c954b79e63edaf44aaae0d947e56
This commit is contained in:
Huachao Huang 2018-11-27 12:59:27 -08:00 committed by Facebook Github Bot
parent 3fa80f0e85
commit 5e72bc113a
11 changed files with 270 additions and 29 deletions

View File

@ -578,6 +578,7 @@ set(SOURCES
table/plain_table_index.cc
table/plain_table_key_coding.cc
table/plain_table_reader.cc
table/sst_file_reader.cc
table/sst_file_writer.cc
table/table_properties.cc
table/two_level_iterator.cc
@ -935,6 +936,7 @@ if(WITH_TESTS)
table/data_block_hash_index_test.cc
table/full_filter_block_test.cc
table/merger_test.cc
table/sst_file_reader_test.cc
table/table_test.cc
tools/ldb_cmd_test.cc
tools/reduce_levels_test.cc

View File

@ -555,6 +555,7 @@ TESTS = \
repeatable_thread_test \
range_tombstone_fragmenter_test \
range_del_aggregator_v2_test \
sst_file_reader_test \
PARALLEL_TEST = \
backupable_db_test \
@ -1590,6 +1591,9 @@ range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test
range_del_aggregator_v2_test: db/range_del_aggregator_v2_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
sst_file_reader_test: table/sst_file_reader_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
#-------------------------------------------------
# make install related stuff
INSTALL_PATH ?= /usr/local

View File

@ -202,6 +202,7 @@ cpp_library(
"table/plain_table_index.cc",
"table/plain_table_key_coding.cc",
"table/plain_table_reader.cc",
"table/sst_file_reader.cc",
"table/sst_file_writer.cc",
"table/table_properties.cc",
"table/two_level_iterator.cc",
@ -1098,6 +1099,11 @@ ROCKS_TESTS = [
"utilities/transactions/write_unprepared_transaction_test.cc",
"parallel",
],
[
"sst_file_reader_test",
"table/sst_file_reader_test.cc",
"serial",
],
]
# Generate a test rule for each entry in ROCKS_TESTS

View File

@ -0,0 +1,45 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include "rocksdb/iterator.h"
#include "rocksdb/table_properties.h"
namespace rocksdb {
// SstFileReader is used to read sst files that are generated by DB or
// SstFileWriter.
class SstFileReader {
public:
SstFileReader(const Options& options);
~SstFileReader();
// Prepares to read from the file located at "file_path".
Status Open(const std::string& file_path);
// Returns a new iterator over the table contents.
// Most read options provide the same control as we read from DB.
// If "snapshot" is nullptr, the iterator returns only the latest keys.
Iterator* NewIterator(const ReadOptions& options);
std::shared_ptr<const TableProperties> GetTableProperties() const;
// Verifies whether there is corruption in this table.
Status VerifyChecksum();
private:
struct Rep;
std::unique_ptr<Rep> rep_;
};
} // namespace rocksdb
#endif // !ROCKSDB_LITE

View File

@ -452,7 +452,7 @@ class TableFactory {
// NewTableReader() is called in three places:
// (1) TableCache::FindTable() calls the function when table cache miss
// and cache the table object returned.
// (2) SstFileReader (for SST Dump) opens the table and dump the table
// (2) SstFileDumper (for SST Dump) opens the table and dump the table
// contents using the iterator of the table.
// (3) DBImpl::IngestExternalFile() calls this function to read the contents of
// the sst file it's attempting to add

2
src.mk
View File

@ -122,6 +122,7 @@ LIB_SOURCES = \
table/plain_table_index.cc \
table/plain_table_key_coding.cc \
table/plain_table_reader.cc \
table/sst_file_reader.cc \
table/sst_file_writer.cc \
table/table_properties.cc \
table/two_level_iterator.cc \
@ -362,6 +363,7 @@ MAIN_SOURCES = \
table/data_block_hash_index_test.cc \
table/full_filter_block_test.cc \
table/merger_test.cc \
table/sst_file_reader_test.cc \
table/table_reader_bench.cc \
table/table_test.cc \
third-party/gtest-1.7.0/fused-src/gtest/gtest-all.cc \

84
table/sst_file_reader.cc Normal file
View File

@ -0,0 +1,84 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "rocksdb/sst_file_reader.h"
#include "db/db_iter.h"
#include "options/cf_options.h"
#include "table/get_context.h"
#include "table/table_reader.h"
#include "table/table_builder.h"
#include "util/file_reader_writer.h"
namespace rocksdb {
struct SstFileReader::Rep {
Options options;
EnvOptions soptions;
ImmutableCFOptions ioptions;
MutableCFOptions moptions;
std::unique_ptr<TableReader> table_reader;
Rep(const Options& opts)
: options(opts),
soptions(options),
ioptions(options),
moptions(ColumnFamilyOptions(options)) {}
};
SstFileReader::SstFileReader(const Options& options)
: rep_(new Rep(options)) {}
SstFileReader::~SstFileReader() {}
Status SstFileReader::Open(const std::string& file_path) {
auto r = rep_.get();
Status s;
uint64_t file_size = 0;
std::unique_ptr<RandomAccessFile> file;
std::unique_ptr<RandomAccessFileReader> file_reader;
s = r->options.env->GetFileSize(file_path, &file_size);
if (s.ok()) {
s = r->options.env->NewRandomAccessFile(file_path, &file, r->soptions);
}
if (s.ok()) {
file_reader.reset(new RandomAccessFileReader(std::move(file), file_path));
}
if (s.ok()) {
s = r->options.table_factory->NewTableReader(
TableReaderOptions(r->ioptions, r->moptions.prefix_extractor.get(),
r->soptions, r->ioptions.internal_comparator),
std::move(file_reader), file_size, &r->table_reader);
}
return s;
}
Iterator* SstFileReader::NewIterator(const ReadOptions& options) {
auto r = rep_.get();
auto sequence = options.snapshot != nullptr ?
options.snapshot->GetSequenceNumber() :
kMaxSequenceNumber;
auto internal_iter = r->table_reader->NewIterator(
options, r->moptions.prefix_extractor.get());
return NewDBIterator(r->options.env, options, r->ioptions, r->moptions,
r->ioptions.user_comparator, internal_iter, sequence,
r->moptions.max_sequential_skip_in_iterations,
nullptr /* read_callback */);
}
std::shared_ptr<const TableProperties> SstFileReader::GetTableProperties() const {
return rep_->table_reader->GetTableProperties();
}
Status SstFileReader::VerifyChecksum() {
return rep_->table_reader->VerifyChecksum();
}
} // namespace rocksdb
#endif // !ROCKSDB_LITE

View File

@ -0,0 +1,98 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include <inttypes.h>
#include "rocksdb/sst_file_reader.h"
#include "rocksdb/sst_file_writer.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"
namespace rocksdb {
std::string EncodeAsString(uint64_t v) {
char buf[16];
snprintf(buf, sizeof(buf), "%08" PRIu64, v);
return std::string(buf);
}
std::string EncodeAsUint64(uint64_t v) {
std::string dst;
PutFixed64(&dst, v);
return dst;
}
class SstFileReaderTest : public testing::Test {
public:
SstFileReaderTest() {
options_.merge_operator = MergeOperators::CreateUInt64AddOperator();
sst_name_ = test::PerThreadDBPath("sst_file");
}
void CreateFileAndCheck(const std::vector<std::string>& keys) {
SstFileWriter writer(soptions_, options_);
ASSERT_OK(writer.Open(sst_name_));
for (size_t i = 0; i + 2 < keys.size(); i += 3) {
ASSERT_OK(writer.Put(keys[i], keys[i]));
ASSERT_OK(writer.Merge(keys[i+1], EncodeAsUint64(i+1)));
ASSERT_OK(writer.Delete(keys[i+2]));
}
ASSERT_OK(writer.Finish());
ReadOptions ropts;
SstFileReader reader(options_);
ASSERT_OK(reader.Open(sst_name_));
ASSERT_OK(reader.VerifyChecksum());
std::unique_ptr<Iterator> iter(reader.NewIterator(ropts));
iter->SeekToFirst();
for (size_t i = 0; i + 2 < keys.size(); i += 3) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(keys[i]), 0);
ASSERT_EQ(iter->value().compare(keys[i]), 0);
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(keys[i+1]), 0);
ASSERT_EQ(iter->value().compare(EncodeAsUint64(i+1)), 0);
iter->Next();
}
ASSERT_FALSE(iter->Valid());
}
protected:
Options options_;
EnvOptions soptions_;
std::string sst_name_;
};
const uint64_t kNumKeys = 100;
TEST_F(SstFileReaderTest, Basic) {
std::vector<std::string> keys;
for (uint64_t i = 0; i < kNumKeys; i++) {
keys.emplace_back(EncodeAsString(i));
}
CreateFileAndCheck(keys);
}
TEST_F(SstFileReaderTest, Uint64Comparator) {
options_.comparator = test::Uint64Comparator();
std::vector<std::string> keys;
for (uint64_t i = 0; i < kNumKeys; i++) {
keys.emplace_back(EncodeAsUint64(i));
}
CreateFileAndCheck(keys);
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#endif // ROCKSDB_LITE

View File

@ -2845,8 +2845,8 @@ void DumpSstFile(std::string filename, bool output_hex, bool show_properties) {
return;
}
// no verification
rocksdb::SstFileReader reader(filename, false, output_hex);
Status st = reader.ReadSequential(true, std::numeric_limits<uint64_t>::max(), false, // has_from
rocksdb::SstFileDumper dumper(filename, false, output_hex);
Status st = dumper.ReadSequential(true, std::numeric_limits<uint64_t>::max(), false, // has_from
from_key, false, // has_to
to_key);
if (!st.ok()) {
@ -2860,11 +2860,11 @@ void DumpSstFile(std::string filename, bool output_hex, bool show_properties) {
std::shared_ptr<const rocksdb::TableProperties>
table_properties_from_reader;
st = reader.ReadTableProperties(&table_properties_from_reader);
st = dumper.ReadTableProperties(&table_properties_from_reader);
if (!st.ok()) {
std::cerr << filename << ": " << st.ToString()
<< ". Try to use initial table properties" << std::endl;
table_properties = reader.GetInitTableProperties();
table_properties = dumper.GetInitTableProperties();
} else {
table_properties = table_properties_from_reader.get();
}

View File

@ -43,7 +43,7 @@
namespace rocksdb {
SstFileReader::SstFileReader(const std::string& file_path, bool verify_checksum,
SstFileDumper::SstFileDumper(const std::string& file_path, bool verify_checksum,
bool output_hex)
: file_name_(file_path),
read_num_(0),
@ -74,7 +74,7 @@ static const std::vector<std::pair<CompressionType, const char*>>
{CompressionType::kXpressCompression, "kXpressCompression"},
{CompressionType::kZSTD, "kZSTD"}};
Status SstFileReader::GetTableReader(const std::string& file_path) {
Status SstFileDumper::GetTableReader(const std::string& file_path) {
// Warning about 'magic_number' being uninitialized shows up only in UBsan
// builds. Though access is guarded by 's.ok()' checks, fix the issue to
// avoid any warnings.
@ -123,7 +123,7 @@ Status SstFileReader::GetTableReader(const std::string& file_path) {
return s;
}
Status SstFileReader::NewTableReader(
Status SstFileDumper::NewTableReader(
const ImmutableCFOptions& /*ioptions*/, const EnvOptions& /*soptions*/,
const InternalKeyComparator& /*internal_comparator*/, uint64_t file_size,
std::unique_ptr<TableReader>* /*table_reader*/) {
@ -143,11 +143,11 @@ Status SstFileReader::NewTableReader(
std::move(file_), file_size, &table_reader_);
}
Status SstFileReader::VerifyChecksum() {
Status SstFileDumper::VerifyChecksum() {
return table_reader_->VerifyChecksum();
}
Status SstFileReader::DumpTable(const std::string& out_filename) {
Status SstFileDumper::DumpTable(const std::string& out_filename) {
std::unique_ptr<WritableFile> out_file;
Env* env = Env::Default();
env->NewWritableFile(out_filename, &out_file, soptions_);
@ -157,7 +157,7 @@ Status SstFileReader::DumpTable(const std::string& out_filename) {
return s;
}
uint64_t SstFileReader::CalculateCompressedTableSize(
uint64_t SstFileDumper::CalculateCompressedTableSize(
const TableBuilderOptions& tb_options, size_t block_size) {
std::unique_ptr<WritableFile> out_file;
std::unique_ptr<Env> env(NewMemEnv(Env::Default()));
@ -192,7 +192,7 @@ uint64_t SstFileReader::CalculateCompressedTableSize(
return size;
}
int SstFileReader::ShowAllCompressionSizes(
int SstFileDumper::ShowAllCompressionSizes(
size_t block_size,
const std::vector<std::pair<CompressionType, const char*>>&
compression_types) {
@ -226,7 +226,7 @@ int SstFileReader::ShowAllCompressionSizes(
return 0;
}
Status SstFileReader::ReadTableProperties(uint64_t table_magic_number,
Status SstFileDumper::ReadTableProperties(uint64_t table_magic_number,
RandomAccessFileReader* file,
uint64_t file_size) {
TableProperties* table_properties = nullptr;
@ -240,7 +240,7 @@ Status SstFileReader::ReadTableProperties(uint64_t table_magic_number,
return s;
}
Status SstFileReader::SetTableOptionsByMagicNumber(
Status SstFileDumper::SetTableOptionsByMagicNumber(
uint64_t table_magic_number) {
assert(table_properties_);
if (table_magic_number == kBlockBasedTableMagicNumber ||
@ -283,7 +283,7 @@ Status SstFileReader::SetTableOptionsByMagicNumber(
return Status::OK();
}
Status SstFileReader::SetOldTableOptions() {
Status SstFileDumper::SetOldTableOptions() {
assert(table_properties_ == nullptr);
options_.table_factory = std::make_shared<BlockBasedTableFactory>();
fprintf(stdout, "Sst file format: block-based(old version)\n");
@ -291,7 +291,7 @@ Status SstFileReader::SetOldTableOptions() {
return Status::OK();
}
Status SstFileReader::ReadSequential(bool print_kv, uint64_t read_num,
Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num,
bool has_from, const std::string& from_key,
bool has_to, const std::string& to_key,
bool use_from_as_prefix) {
@ -348,7 +348,7 @@ Status SstFileReader::ReadSequential(bool print_kv, uint64_t read_num,
return ret;
}
Status SstFileReader::ReadTableProperties(
Status SstFileDumper::ReadTableProperties(
std::shared_ptr<const TableProperties>* table_properties) {
if (!table_reader_) {
return init_result_;
@ -570,16 +570,16 @@ int SSTDumpTool::Run(int argc, char** argv) {
filename = std::string(dir_or_file) + "/" + filename;
}
rocksdb::SstFileReader reader(filename, verify_checksum,
rocksdb::SstFileDumper dumper(filename, verify_checksum,
output_hex);
if (!reader.getStatus().ok()) {
if (!dumper.getStatus().ok()) {
fprintf(stderr, "%s: %s\n", filename.c_str(),
reader.getStatus().ToString().c_str());
dumper.getStatus().ToString().c_str());
continue;
}
if (command == "recompress") {
reader.ShowAllCompressionSizes(
dumper.ShowAllCompressionSizes(
set_block_size ? block_size : 16384,
compression_types.empty() ? kCompressions : compression_types);
return 0;
@ -589,7 +589,7 @@ int SSTDumpTool::Run(int argc, char** argv) {
std::string out_filename = filename.substr(0, filename.length() - 4);
out_filename.append("_dump.txt");
st = reader.DumpTable(out_filename);
st = dumper.DumpTable(out_filename);
if (!st.ok()) {
fprintf(stderr, "%s: %s\n", filename.c_str(), st.ToString().c_str());
exit(1);
@ -601,7 +601,7 @@ int SSTDumpTool::Run(int argc, char** argv) {
// scan all files in give file path.
if (command == "" || command == "scan" || command == "check") {
st = reader.ReadSequential(
st = dumper.ReadSequential(
command == "scan", read_num > 0 ? (read_num - total_read) : read_num,
has_from || use_from_as_prefix, from_key, has_to, to_key,
use_from_as_prefix);
@ -609,14 +609,14 @@ int SSTDumpTool::Run(int argc, char** argv) {
fprintf(stderr, "%s: %s\n", filename.c_str(),
st.ToString().c_str());
}
total_read += reader.GetReadNumber();
total_read += dumper.GetReadNumber();
if (read_num > 0 && total_read > read_num) {
break;
}
}
if (command == "verify") {
st = reader.VerifyChecksum();
st = dumper.VerifyChecksum();
if (!st.ok()) {
fprintf(stderr, "%s is corrupted: %s\n", filename.c_str(),
st.ToString().c_str());
@ -631,11 +631,11 @@ int SSTDumpTool::Run(int argc, char** argv) {
std::shared_ptr<const rocksdb::TableProperties>
table_properties_from_reader;
st = reader.ReadTableProperties(&table_properties_from_reader);
st = dumper.ReadTableProperties(&table_properties_from_reader);
if (!st.ok()) {
fprintf(stderr, "%s: %s\n", filename.c_str(), st.ToString().c_str());
fprintf(stderr, "Try to use initial table properties\n");
table_properties = reader.GetInitTableProperties();
table_properties = dumper.GetInitTableProperties();
} else {
table_properties = table_properties_from_reader.get();
}

View File

@ -15,9 +15,9 @@
namespace rocksdb {
class SstFileReader {
class SstFileDumper {
public:
explicit SstFileReader(const std::string& file_name, bool verify_checksum,
explicit SstFileDumper(const std::string& file_name, bool verify_checksum,
bool output_hex);
Status ReadSequential(bool print_kv, uint64_t read_num, bool has_from,