mirror of https://github.com/facebook/rocksdb.git
Fix a couple of missing cases of retry on corruption (#13007)
Summary: For SST checksum mismatch corruptions in the read path, RocksDB retries the read if the underlying file system supports verification and reconstruction of data (`FSSupportedOps::kVerifyAndReconstructRead`). There were a couple of places where the retry was missing - reading the SST footer and the properties block. This PR fixes the retry in those cases. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13007 Test Plan: Add new unit tests Reviewed By: jaykorean Differential Revision: D62519186 Pulled By: anand1976 fbshipit-source-id: 50aa38f18f2a53531a9fc8d4ccdf34fbf034ed59
This commit is contained in:
parent
e490f2b051
commit
cabd2d8718
|
@ -895,6 +895,81 @@ TEST_P(DBIOCorruptionTest, ManifestCorruptionRetry) {
|
|||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_P(DBIOCorruptionTest, FooterReadCorruptionRetry) {
|
||||
Random rnd(300);
|
||||
bool retry = false;
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"ReadFooterFromFileInternal:0", [&](void* arg) {
|
||||
Slice* data = static_cast<Slice*>(arg);
|
||||
if (!retry) {
|
||||
std::memcpy(const_cast<char*>(data->data()),
|
||||
rnd.RandomString(static_cast<int>(data->size())).c_str(),
|
||||
data->size());
|
||||
retry = true;
|
||||
}
|
||||
});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(Put("key1", "val1"));
|
||||
Status s = Flush();
|
||||
if (std::get<2>(GetParam())) {
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
|
||||
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
|
||||
1);
|
||||
|
||||
std::string val;
|
||||
ReadOptions ro;
|
||||
ro.async_io = std::get<1>(GetParam());
|
||||
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
|
||||
ASSERT_EQ(val, "val1");
|
||||
} else {
|
||||
ASSERT_NOK(s);
|
||||
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
|
||||
ASSERT_GT(stats()->getTickerCount(SST_FOOTER_CORRUPTION_COUNT), 0);
|
||||
}
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
TEST_P(DBIOCorruptionTest, TablePropertiesCorruptionRetry) {
|
||||
Random rnd(300);
|
||||
bool retry = false;
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"ReadTablePropertiesHelper:0", [&](void* arg) {
|
||||
Slice* data = static_cast<Slice*>(arg);
|
||||
if (!retry) {
|
||||
std::memcpy(const_cast<char*>(data->data()),
|
||||
rnd.RandomString(static_cast<int>(data->size())).c_str(),
|
||||
data->size());
|
||||
retry = true;
|
||||
}
|
||||
});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(Put("key1", "val1"));
|
||||
Status s = Flush();
|
||||
if (std::get<2>(GetParam())) {
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
|
||||
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
|
||||
1);
|
||||
|
||||
std::string val;
|
||||
ReadOptions ro;
|
||||
ro.async_io = std::get<1>(GetParam());
|
||||
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
|
||||
ASSERT_EQ(val, "val1");
|
||||
} else {
|
||||
ASSERT_NOK(s);
|
||||
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
|
||||
}
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
// The parameters are - 1. Use FS provided buffer, 2. Use async IO ReadOption,
|
||||
// 3. Retry with verify_and_reconstruct_read IOOption
|
||||
INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
|
||||
|
|
|
@ -680,26 +680,12 @@ Status BlockBasedTable::Open(
|
|||
if (s.ok()) {
|
||||
s = ReadFooterFromFile(opts, file.get(), *ioptions.fs,
|
||||
prefetch_buffer.get(), file_size, &footer,
|
||||
kBlockBasedTableMagicNumber);
|
||||
}
|
||||
// If the footer is corrupted and the FS supports checksum verification and
|
||||
// correction, try reading the footer again
|
||||
if (s.IsCorruption()) {
|
||||
RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
|
||||
if (CheckFSFeatureSupport(ioptions.fs.get(),
|
||||
FSSupportedOps::kVerifyAndReconstructRead)) {
|
||||
IOOptions retry_opts = opts;
|
||||
retry_opts.verify_and_reconstruct_read = true;
|
||||
s = ReadFooterFromFile(retry_opts, file.get(), *ioptions.fs,
|
||||
prefetch_buffer.get(), file_size, &footer,
|
||||
kBlockBasedTableMagicNumber);
|
||||
RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_COUNT);
|
||||
if (s.ok()) {
|
||||
RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
|
||||
}
|
||||
}
|
||||
kBlockBasedTableMagicNumber, ioptions.stats);
|
||||
}
|
||||
if (!s.ok()) {
|
||||
if (s.IsCorruption()) {
|
||||
RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
if (!IsSupportedFormatVersion(footer.format_version())) {
|
||||
|
|
|
@ -475,10 +475,12 @@ std::string Footer::ToString() const {
|
|||
return result;
|
||||
}
|
||||
|
||||
Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
|
||||
FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
|
||||
uint64_t file_size, Footer* footer,
|
||||
uint64_t enforce_table_magic_number) {
|
||||
static Status ReadFooterFromFileInternal(const IOOptions& opts,
|
||||
RandomAccessFileReader* file,
|
||||
FileSystem& fs,
|
||||
FilePrefetchBuffer* prefetch_buffer,
|
||||
uint64_t file_size, Footer* footer,
|
||||
uint64_t enforce_table_magic_number) {
|
||||
if (file_size < Footer::kMinEncodedLength) {
|
||||
return Status::Corruption("file is too short (" +
|
||||
std::to_string(file_size) +
|
||||
|
@ -516,6 +518,8 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
|
|||
}
|
||||
}
|
||||
|
||||
TEST_SYNC_POINT_CALLBACK("ReadFooterFromFileInternal:0", &footer_input);
|
||||
|
||||
// Check that we actually read the whole footer from the file. It may be
|
||||
// that size isn't correct.
|
||||
if (footer_input.size() < Footer::kMinEncodedLength) {
|
||||
|
@ -543,6 +547,30 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
|
||||
FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
|
||||
uint64_t file_size, Footer* footer,
|
||||
uint64_t enforce_table_magic_number,
|
||||
Statistics* stats) {
|
||||
Status s =
|
||||
ReadFooterFromFileInternal(opts, file, fs, prefetch_buffer, file_size,
|
||||
footer, enforce_table_magic_number);
|
||||
if (s.IsCorruption() &&
|
||||
CheckFSFeatureSupport(&fs, FSSupportedOps::kVerifyAndReconstructRead)) {
|
||||
IOOptions new_opts = opts;
|
||||
new_opts.verify_and_reconstruct_read = true;
|
||||
footer->Reset();
|
||||
s = ReadFooterFromFileInternal(new_opts, file, fs, prefetch_buffer,
|
||||
file_size, footer,
|
||||
enforce_table_magic_number);
|
||||
RecordTick(stats, FILE_READ_CORRUPTION_RETRY_COUNT);
|
||||
if (s.ok()) {
|
||||
RecordTick(stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
namespace {
|
||||
// Custom handling for the last byte of a block, to avoid invoking streaming
|
||||
// API to get an effective block checksum. This function is its own inverse
|
||||
|
|
|
@ -186,6 +186,16 @@ class Footer {
|
|||
// Create empty. Populate using DecodeFrom.
|
||||
Footer() {}
|
||||
|
||||
void Reset() {
|
||||
table_magic_number_ = kNullTableMagicNumber;
|
||||
format_version_ = kInvalidFormatVersion;
|
||||
base_context_checksum_ = 0;
|
||||
metaindex_handle_ = BlockHandle::NullBlockHandle();
|
||||
index_handle_ = BlockHandle::NullBlockHandle();
|
||||
checksum_type_ = kInvalidChecksumType;
|
||||
block_trailer_size_ = 0;
|
||||
}
|
||||
|
||||
// Deserialize a footer (populate fields) from `input` and check for various
|
||||
// corruptions. `input_offset` is the offset within the target file of
|
||||
// `input` buffer, which is needed for verifying format_version >= 6 footer.
|
||||
|
@ -304,7 +314,8 @@ class FooterBuilder {
|
|||
Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
|
||||
FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
|
||||
uint64_t file_size, Footer* footer,
|
||||
uint64_t enforce_table_magic_number = 0);
|
||||
uint64_t enforce_table_magic_number = 0,
|
||||
Statistics* stats = nullptr);
|
||||
|
||||
// Computes a checksum using the given ChecksumType. Sometimes we need to
|
||||
// include one more input byte logically at the end but not part of the main
|
||||
|
|
|
@ -262,184 +262,232 @@ Status ReadTablePropertiesHelper(
|
|||
MemoryAllocator* memory_allocator) {
|
||||
assert(table_properties);
|
||||
|
||||
// If this is an external SST file ingested with write_global_seqno set to
|
||||
// true, then we expect the checksum mismatch because checksum was written
|
||||
// by SstFileWriter, but its global seqno in the properties block may have
|
||||
// been changed during ingestion. For this reason, we initially read
|
||||
// and process without checksum verification, then later try checksum
|
||||
// verification so that if it fails, we can copy to a temporary buffer with
|
||||
// global seqno set to its original value, i.e. 0, and attempt checksum
|
||||
// verification again.
|
||||
ReadOptions modified_ro = ro;
|
||||
modified_ro.verify_checksums = false;
|
||||
BlockContents block_contents;
|
||||
BlockFetcher block_fetcher(file, prefetch_buffer, footer, modified_ro, handle,
|
||||
&block_contents, ioptions, false /* decompress */,
|
||||
false /*maybe_compressed*/, BlockType::kProperties,
|
||||
UncompressionDict::GetEmptyDict(),
|
||||
PersistentCacheOptions::kEmpty, memory_allocator);
|
||||
Status s = block_fetcher.ReadBlockContents();
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
// Unfortunately, Block::size() might not equal block_contents.data.size(),
|
||||
// and Block hides block_contents
|
||||
uint64_t block_size = block_contents.data.size();
|
||||
Block properties_block(std::move(block_contents));
|
||||
std::unique_ptr<MetaBlockIter> iter(properties_block.NewMetaIterator());
|
||||
|
||||
std::unique_ptr<TableProperties> new_table_properties{new TableProperties};
|
||||
// All pre-defined properties of type uint64_t
|
||||
std::unordered_map<std::string, uint64_t*> predefined_uint64_properties = {
|
||||
{TablePropertiesNames::kOriginalFileNumber,
|
||||
&new_table_properties->orig_file_number},
|
||||
{TablePropertiesNames::kDataSize, &new_table_properties->data_size},
|
||||
{TablePropertiesNames::kIndexSize, &new_table_properties->index_size},
|
||||
{TablePropertiesNames::kIndexPartitions,
|
||||
&new_table_properties->index_partitions},
|
||||
{TablePropertiesNames::kTopLevelIndexSize,
|
||||
&new_table_properties->top_level_index_size},
|
||||
{TablePropertiesNames::kIndexKeyIsUserKey,
|
||||
&new_table_properties->index_key_is_user_key},
|
||||
{TablePropertiesNames::kIndexValueIsDeltaEncoded,
|
||||
&new_table_properties->index_value_is_delta_encoded},
|
||||
{TablePropertiesNames::kFilterSize, &new_table_properties->filter_size},
|
||||
{TablePropertiesNames::kRawKeySize, &new_table_properties->raw_key_size},
|
||||
{TablePropertiesNames::kRawValueSize,
|
||||
&new_table_properties->raw_value_size},
|
||||
{TablePropertiesNames::kNumDataBlocks,
|
||||
&new_table_properties->num_data_blocks},
|
||||
{TablePropertiesNames::kNumEntries, &new_table_properties->num_entries},
|
||||
{TablePropertiesNames::kNumFilterEntries,
|
||||
&new_table_properties->num_filter_entries},
|
||||
{TablePropertiesNames::kDeletedKeys,
|
||||
&new_table_properties->num_deletions},
|
||||
{TablePropertiesNames::kMergeOperands,
|
||||
&new_table_properties->num_merge_operands},
|
||||
{TablePropertiesNames::kNumRangeDeletions,
|
||||
&new_table_properties->num_range_deletions},
|
||||
{TablePropertiesNames::kFormatVersion,
|
||||
&new_table_properties->format_version},
|
||||
{TablePropertiesNames::kFixedKeyLen,
|
||||
&new_table_properties->fixed_key_len},
|
||||
{TablePropertiesNames::kColumnFamilyId,
|
||||
&new_table_properties->column_family_id},
|
||||
{TablePropertiesNames::kCreationTime,
|
||||
&new_table_properties->creation_time},
|
||||
{TablePropertiesNames::kOldestKeyTime,
|
||||
&new_table_properties->oldest_key_time},
|
||||
{TablePropertiesNames::kFileCreationTime,
|
||||
&new_table_properties->file_creation_time},
|
||||
{TablePropertiesNames::kSlowCompressionEstimatedDataSize,
|
||||
&new_table_properties->slow_compression_estimated_data_size},
|
||||
{TablePropertiesNames::kFastCompressionEstimatedDataSize,
|
||||
&new_table_properties->fast_compression_estimated_data_size},
|
||||
{TablePropertiesNames::kTailStartOffset,
|
||||
&new_table_properties->tail_start_offset},
|
||||
{TablePropertiesNames::kUserDefinedTimestampsPersisted,
|
||||
&new_table_properties->user_defined_timestamps_persisted},
|
||||
{TablePropertiesNames::kKeyLargestSeqno,
|
||||
&new_table_properties->key_largest_seqno},
|
||||
};
|
||||
|
||||
std::string last_key;
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
s = iter->status();
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
Status s;
|
||||
bool retry = false;
|
||||
while (true) {
|
||||
BlockContents block_contents;
|
||||
size_t len = handle.size() + footer.GetBlockTrailerSize();
|
||||
// If this is an external SST file ingested with write_global_seqno set to
|
||||
// true, then we expect the checksum mismatch because checksum was written
|
||||
// by SstFileWriter, but its global seqno in the properties block may have
|
||||
// been changed during ingestion. For this reason, we initially read
|
||||
// and process without checksum verification, then later try checksum
|
||||
// verification so that if it fails, we can copy to a temporary buffer with
|
||||
// global seqno set to its original value, i.e. 0, and attempt checksum
|
||||
// verification again.
|
||||
if (!retry) {
|
||||
ReadOptions modified_ro = ro;
|
||||
modified_ro.verify_checksums = false;
|
||||
BlockFetcher block_fetcher(
|
||||
file, prefetch_buffer, footer, modified_ro, handle, &block_contents,
|
||||
ioptions, false /* decompress */, false /*maybe_compressed*/,
|
||||
BlockType::kProperties, UncompressionDict::GetEmptyDict(),
|
||||
PersistentCacheOptions::kEmpty, memory_allocator);
|
||||
s = block_fetcher.ReadBlockContents();
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
assert(block_fetcher.GetBlockSizeWithTrailer() == len);
|
||||
TEST_SYNC_POINT_CALLBACK("ReadTablePropertiesHelper:0",
|
||||
&block_contents.data);
|
||||
} else {
|
||||
assert(s.IsCorruption());
|
||||
// If retrying, use a stronger file system read to check and correct
|
||||
// data corruption
|
||||
IOOptions opts;
|
||||
if (PrepareIOFromReadOptions(ro, ioptions.clock, opts) !=
|
||||
IOStatus::OK()) {
|
||||
return s;
|
||||
}
|
||||
opts.verify_and_reconstruct_read = true;
|
||||
std::unique_ptr<char[]> data(new char[len]);
|
||||
Slice result;
|
||||
IOStatus io_s =
|
||||
file->Read(opts, handle.offset(), len, &result, data.get(), nullptr);
|
||||
RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_COUNT);
|
||||
if (!io_s.ok()) {
|
||||
ROCKS_LOG_INFO(ioptions.info_log,
|
||||
"Reading properties block failed - %s",
|
||||
io_s.ToString().c_str());
|
||||
// Return the original corruption error as that's more serious
|
||||
return s;
|
||||
}
|
||||
if (result.size() < len) {
|
||||
return Status::Corruption("Reading properties block failed - " +
|
||||
std::to_string(result.size()) +
|
||||
" bytes read");
|
||||
}
|
||||
RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
|
||||
block_contents = BlockContents(std::move(data), handle.size());
|
||||
}
|
||||
|
||||
auto key = iter->key().ToString();
|
||||
// properties block should be strictly sorted with no duplicate key.
|
||||
if (!last_key.empty() &&
|
||||
BytewiseComparator()->Compare(key, last_key) <= 0) {
|
||||
s = Status::Corruption("properties unsorted");
|
||||
break;
|
||||
}
|
||||
last_key = key;
|
||||
uint64_t block_size = block_contents.data.size();
|
||||
Block properties_block(std::move(block_contents));
|
||||
// Unfortunately, Block::size() might not equal block_contents.data.size(),
|
||||
// and Block hides block_contents
|
||||
std::unique_ptr<MetaBlockIter> iter(properties_block.NewMetaIterator());
|
||||
|
||||
auto raw_val = iter->value();
|
||||
auto pos = predefined_uint64_properties.find(key);
|
||||
std::unique_ptr<TableProperties> new_table_properties{new TableProperties};
|
||||
// All pre-defined properties of type uint64_t
|
||||
std::unordered_map<std::string, uint64_t*> predefined_uint64_properties = {
|
||||
{TablePropertiesNames::kOriginalFileNumber,
|
||||
&new_table_properties->orig_file_number},
|
||||
{TablePropertiesNames::kDataSize, &new_table_properties->data_size},
|
||||
{TablePropertiesNames::kIndexSize, &new_table_properties->index_size},
|
||||
{TablePropertiesNames::kIndexPartitions,
|
||||
&new_table_properties->index_partitions},
|
||||
{TablePropertiesNames::kTopLevelIndexSize,
|
||||
&new_table_properties->top_level_index_size},
|
||||
{TablePropertiesNames::kIndexKeyIsUserKey,
|
||||
&new_table_properties->index_key_is_user_key},
|
||||
{TablePropertiesNames::kIndexValueIsDeltaEncoded,
|
||||
&new_table_properties->index_value_is_delta_encoded},
|
||||
{TablePropertiesNames::kFilterSize, &new_table_properties->filter_size},
|
||||
{TablePropertiesNames::kRawKeySize,
|
||||
&new_table_properties->raw_key_size},
|
||||
{TablePropertiesNames::kRawValueSize,
|
||||
&new_table_properties->raw_value_size},
|
||||
{TablePropertiesNames::kNumDataBlocks,
|
||||
&new_table_properties->num_data_blocks},
|
||||
{TablePropertiesNames::kNumEntries, &new_table_properties->num_entries},
|
||||
{TablePropertiesNames::kNumFilterEntries,
|
||||
&new_table_properties->num_filter_entries},
|
||||
{TablePropertiesNames::kDeletedKeys,
|
||||
&new_table_properties->num_deletions},
|
||||
{TablePropertiesNames::kMergeOperands,
|
||||
&new_table_properties->num_merge_operands},
|
||||
{TablePropertiesNames::kNumRangeDeletions,
|
||||
&new_table_properties->num_range_deletions},
|
||||
{TablePropertiesNames::kFormatVersion,
|
||||
&new_table_properties->format_version},
|
||||
{TablePropertiesNames::kFixedKeyLen,
|
||||
&new_table_properties->fixed_key_len},
|
||||
{TablePropertiesNames::kColumnFamilyId,
|
||||
&new_table_properties->column_family_id},
|
||||
{TablePropertiesNames::kCreationTime,
|
||||
&new_table_properties->creation_time},
|
||||
{TablePropertiesNames::kOldestKeyTime,
|
||||
&new_table_properties->oldest_key_time},
|
||||
{TablePropertiesNames::kFileCreationTime,
|
||||
&new_table_properties->file_creation_time},
|
||||
{TablePropertiesNames::kSlowCompressionEstimatedDataSize,
|
||||
&new_table_properties->slow_compression_estimated_data_size},
|
||||
{TablePropertiesNames::kFastCompressionEstimatedDataSize,
|
||||
&new_table_properties->fast_compression_estimated_data_size},
|
||||
{TablePropertiesNames::kTailStartOffset,
|
||||
&new_table_properties->tail_start_offset},
|
||||
{TablePropertiesNames::kUserDefinedTimestampsPersisted,
|
||||
&new_table_properties->user_defined_timestamps_persisted},
|
||||
{TablePropertiesNames::kKeyLargestSeqno,
|
||||
&new_table_properties->key_largest_seqno},
|
||||
};
|
||||
|
||||
if (key == ExternalSstFilePropertyNames::kGlobalSeqno) {
|
||||
new_table_properties->external_sst_file_global_seqno_offset =
|
||||
handle.offset() + iter->ValueOffset();
|
||||
}
|
||||
std::string last_key;
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
s = iter->status();
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pos != predefined_uint64_properties.end()) {
|
||||
if (key == TablePropertiesNames::kDeletedKeys ||
|
||||
key == TablePropertiesNames::kMergeOperands) {
|
||||
// Insert in user-collected properties for API backwards compatibility
|
||||
auto key = iter->key().ToString();
|
||||
// properties block should be strictly sorted with no duplicate key.
|
||||
if (!last_key.empty() &&
|
||||
BytewiseComparator()->Compare(key, last_key) <= 0) {
|
||||
s = Status::Corruption("properties unsorted");
|
||||
break;
|
||||
}
|
||||
last_key = key;
|
||||
|
||||
auto raw_val = iter->value();
|
||||
auto pos = predefined_uint64_properties.find(key);
|
||||
|
||||
if (key == ExternalSstFilePropertyNames::kGlobalSeqno) {
|
||||
new_table_properties->external_sst_file_global_seqno_offset =
|
||||
handle.offset() + iter->ValueOffset();
|
||||
}
|
||||
|
||||
if (pos != predefined_uint64_properties.end()) {
|
||||
if (key == TablePropertiesNames::kDeletedKeys ||
|
||||
key == TablePropertiesNames::kMergeOperands) {
|
||||
// Insert in user-collected properties for API backwards compatibility
|
||||
new_table_properties->user_collected_properties.insert(
|
||||
{key, raw_val.ToString()});
|
||||
}
|
||||
// handle predefined rocksdb properties
|
||||
uint64_t val;
|
||||
if (!GetVarint64(&raw_val, &val)) {
|
||||
// skip malformed value
|
||||
auto error_msg =
|
||||
"Detect malformed value in properties meta-block:"
|
||||
"\tkey: " +
|
||||
key + "\tval: " + raw_val.ToString();
|
||||
ROCKS_LOG_ERROR(ioptions.logger, "%s", error_msg.c_str());
|
||||
continue;
|
||||
}
|
||||
*(pos->second) = val;
|
||||
} else if (key == TablePropertiesNames::kDbId) {
|
||||
new_table_properties->db_id = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kDbSessionId) {
|
||||
new_table_properties->db_session_id = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kDbHostId) {
|
||||
new_table_properties->db_host_id = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kFilterPolicy) {
|
||||
new_table_properties->filter_policy_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kColumnFamilyName) {
|
||||
new_table_properties->column_family_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kComparator) {
|
||||
new_table_properties->comparator_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kMergeOperator) {
|
||||
new_table_properties->merge_operator_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kPrefixExtractorName) {
|
||||
new_table_properties->prefix_extractor_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kPropertyCollectors) {
|
||||
new_table_properties->property_collectors_names = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kCompression) {
|
||||
new_table_properties->compression_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kCompressionOptions) {
|
||||
new_table_properties->compression_options = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kSequenceNumberTimeMapping) {
|
||||
new_table_properties->seqno_to_time_mapping = raw_val.ToString();
|
||||
} else {
|
||||
// handle user-collected properties
|
||||
new_table_properties->user_collected_properties.insert(
|
||||
{key, raw_val.ToString()});
|
||||
}
|
||||
// handle predefined rocksdb properties
|
||||
uint64_t val;
|
||||
if (!GetVarint64(&raw_val, &val)) {
|
||||
// skip malformed value
|
||||
auto error_msg =
|
||||
"Detect malformed value in properties meta-block:"
|
||||
"\tkey: " +
|
||||
key + "\tval: " + raw_val.ToString();
|
||||
ROCKS_LOG_ERROR(ioptions.logger, "%s", error_msg.c_str());
|
||||
continue;
|
||||
}
|
||||
|
||||
// Modified version of BlockFetcher checksum verification
|
||||
// (See write_global_seqno comment above)
|
||||
if (s.ok() && footer.GetBlockTrailerSize() > 0) {
|
||||
s = VerifyBlockChecksum(footer, properties_block.data(), block_size,
|
||||
file->file_name(), handle.offset());
|
||||
if (s.IsCorruption()) {
|
||||
if (new_table_properties->external_sst_file_global_seqno_offset != 0) {
|
||||
std::string tmp_buf(properties_block.data(), len);
|
||||
uint64_t global_seqno_offset =
|
||||
new_table_properties->external_sst_file_global_seqno_offset -
|
||||
handle.offset();
|
||||
EncodeFixed64(&tmp_buf[static_cast<size_t>(global_seqno_offset)], 0);
|
||||
s = VerifyBlockChecksum(footer, tmp_buf.data(), block_size,
|
||||
file->file_name(), handle.offset());
|
||||
}
|
||||
}
|
||||
*(pos->second) = val;
|
||||
} else if (key == TablePropertiesNames::kDbId) {
|
||||
new_table_properties->db_id = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kDbSessionId) {
|
||||
new_table_properties->db_session_id = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kDbHostId) {
|
||||
new_table_properties->db_host_id = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kFilterPolicy) {
|
||||
new_table_properties->filter_policy_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kColumnFamilyName) {
|
||||
new_table_properties->column_family_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kComparator) {
|
||||
new_table_properties->comparator_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kMergeOperator) {
|
||||
new_table_properties->merge_operator_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kPrefixExtractorName) {
|
||||
new_table_properties->prefix_extractor_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kPropertyCollectors) {
|
||||
new_table_properties->property_collectors_names = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kCompression) {
|
||||
new_table_properties->compression_name = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kCompressionOptions) {
|
||||
new_table_properties->compression_options = raw_val.ToString();
|
||||
} else if (key == TablePropertiesNames::kSequenceNumberTimeMapping) {
|
||||
new_table_properties->seqno_to_time_mapping = raw_val.ToString();
|
||||
}
|
||||
|
||||
// If we detected a corruption and the file system supports verification
|
||||
// and reconstruction, retry the read
|
||||
if (s.IsCorruption() && !retry &&
|
||||
CheckFSFeatureSupport(ioptions.fs.get(),
|
||||
FSSupportedOps::kVerifyAndReconstructRead)) {
|
||||
retry = true;
|
||||
} else {
|
||||
// handle user-collected properties
|
||||
new_table_properties->user_collected_properties.insert(
|
||||
{key, raw_val.ToString()});
|
||||
}
|
||||
}
|
||||
|
||||
// Modified version of BlockFetcher checksum verification
|
||||
// (See write_global_seqno comment above)
|
||||
if (s.ok() && footer.GetBlockTrailerSize() > 0) {
|
||||
s = VerifyBlockChecksum(footer, properties_block.data(), block_size,
|
||||
file->file_name(), handle.offset());
|
||||
if (s.IsCorruption()) {
|
||||
if (new_table_properties->external_sst_file_global_seqno_offset != 0) {
|
||||
std::string tmp_buf(properties_block.data(),
|
||||
block_fetcher.GetBlockSizeWithTrailer());
|
||||
uint64_t global_seqno_offset =
|
||||
new_table_properties->external_sst_file_global_seqno_offset -
|
||||
handle.offset();
|
||||
EncodeFixed64(&tmp_buf[static_cast<size_t>(global_seqno_offset)], 0);
|
||||
s = VerifyBlockChecksum(footer, tmp_buf.data(), block_size,
|
||||
file->file_name(), handle.offset());
|
||||
if (s.ok()) {
|
||||
*table_properties = std::move(new_table_properties);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
*table_properties = std::move(new_table_properties);
|
||||
}
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue