mirror of https://github.com/facebook/rocksdb.git
Fix block checksum for >=4GB, refactor (#6978)
Summary: Although RocksDB falls over in various other ways with KVs around 4GB or more, this change fixes how XXH32 and XXH64 were being called by the block checksum code to support >= 4GB in case that should ever happen, or the code copied for other uses. This change is not a schema compatibility issue because the checksum verification code would checksum the first (block_size + 1) mod 2^32 bytes while the checksum construction code would checksum the first block_size mod 2^32 plus the compression type byte, meaning the XXH32/64 checksums for >=4GB block would not match about 255/256 times. While touching this code, I refactored to consolidate redundant implementations, improving diagnostics and performance tracking in some cases. Also used less confusing language in those diagnostics. Makes https://github.com/facebook/rocksdb/issues/6875 obsolete. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6978 Test Plan: I was able to write a test for this using an SST file writer and VerifyChecksum in a reader. The test fails before the fix, though I'm leaving the test disabled because I don't think it's worth the expense of running regularly. Reviewed By: gg814 Differential Revision: D22143260 Pulled By: pdillinger fbshipit-source-id: 982993d16134e8c50bea2269047f901c1783726e
This commit is contained in:
parent
d76eed4839
commit
25a0d0ca30
|
@ -12,6 +12,7 @@
|
|||
#include "file/filename.h"
|
||||
#include "port/port.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "rocksdb/sst_file_reader.h"
|
||||
#include "rocksdb/sst_file_writer.h"
|
||||
#include "test_util/fault_injection_test_env.h"
|
||||
#include "test_util/testutil.h"
|
||||
|
@ -2393,6 +2394,41 @@ TEST_F(ExternalSSTFileTest, IngestFileWrittenWithCompressionDictionary) {
|
|||
ASSERT_EQ(1, num_compression_dicts);
|
||||
}
|
||||
|
||||
// Very slow, not worth the cost to run regularly
|
||||
TEST_F(ExternalSSTFileTest, DISABLED_HugeBlockChecksum) {
|
||||
int max_checksum = static_cast<int>(kxxHash64);
|
||||
for (int i = 0; i <= max_checksum; ++i) {
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.checksum = static_cast<ChecksumType>(i);
|
||||
Options options = CurrentOptions();
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
|
||||
SstFileWriter sst_file_writer(EnvOptions(), options);
|
||||
|
||||
// 2^32 - 1, will lead to data block with more than 2^32 bytes
|
||||
size_t huge_size = port::kMaxUint32;
|
||||
|
||||
std::string f = sst_files_dir_ + "f.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(f));
|
||||
{
|
||||
Random64 r(123);
|
||||
std::string huge(huge_size, 0);
|
||||
for (size_t j = 0; j + 7 < huge_size; j += 8) {
|
||||
EncodeFixed64(&huge[j], r.Next());
|
||||
}
|
||||
ASSERT_OK(sst_file_writer.Put("Huge", huge));
|
||||
}
|
||||
|
||||
ExternalSstFileInfo f_info;
|
||||
ASSERT_OK(sst_file_writer.Finish(&f_info));
|
||||
ASSERT_GT(f_info.file_size, uint64_t{huge_size} + 10);
|
||||
|
||||
SstFileReader sst_file_reader(options);
|
||||
ASSERT_OK(sst_file_reader.Open(f));
|
||||
ASSERT_OK(sst_file_reader.VerifyChecksum());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
|
||||
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
|
||||
new FaultInjectionTestEnv(env_));
|
||||
|
|
|
@ -132,7 +132,7 @@ class RandomAccessFileReader {
|
|||
|
||||
FSRandomAccessFile* file() { return file_.get(); }
|
||||
|
||||
std::string file_name() const { return file_name_; }
|
||||
const std::string& file_name() const { return file_name_; }
|
||||
|
||||
bool use_direct_io() const { return file_->use_direct_io(); }
|
||||
|
||||
|
|
|
@ -1093,42 +1093,43 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
|
|||
if (io_s.ok()) {
|
||||
char trailer[kBlockTrailerSize];
|
||||
trailer[0] = type;
|
||||
char* trailer_without_type = trailer + 1;
|
||||
uint32_t checksum = 0;
|
||||
switch (r->table_options.checksum) {
|
||||
case kNoChecksum:
|
||||
EncodeFixed32(trailer_without_type, 0);
|
||||
break;
|
||||
case kCRC32c: {
|
||||
auto crc = crc32c::Value(block_contents.data(), block_contents.size());
|
||||
crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type
|
||||
EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
|
||||
uint32_t crc =
|
||||
crc32c::Value(block_contents.data(), block_contents.size());
|
||||
// Extend to cover compression type
|
||||
crc = crc32c::Extend(crc, trailer, 1);
|
||||
checksum = crc32c::Mask(crc);
|
||||
break;
|
||||
}
|
||||
case kxxHash: {
|
||||
XXH32_state_t* const state = XXH32_createState();
|
||||
XXH32_reset(state, 0);
|
||||
XXH32_update(state, block_contents.data(),
|
||||
static_cast<uint32_t>(block_contents.size()));
|
||||
XXH32_update(state, trailer, 1); // Extend to cover block type
|
||||
EncodeFixed32(trailer_without_type, XXH32_digest(state));
|
||||
XXH32_update(state, block_contents.data(), block_contents.size());
|
||||
// Extend to cover compression type
|
||||
XXH32_update(state, trailer, 1);
|
||||
checksum = XXH32_digest(state);
|
||||
XXH32_freeState(state);
|
||||
break;
|
||||
}
|
||||
case kxxHash64: {
|
||||
XXH64_state_t* const state = XXH64_createState();
|
||||
XXH64_reset(state, 0);
|
||||
XXH64_update(state, block_contents.data(),
|
||||
static_cast<uint32_t>(block_contents.size()));
|
||||
XXH64_update(state, trailer, 1); // Extend to cover block type
|
||||
EncodeFixed32(
|
||||
trailer_without_type,
|
||||
static_cast<uint32_t>(XXH64_digest(state) & // lower 32 bits
|
||||
uint64_t{0xffffffff}));
|
||||
XXH64_update(state, block_contents.data(), block_contents.size());
|
||||
// Extend to cover compression type
|
||||
XXH64_update(state, trailer, 1);
|
||||
checksum = Lower32of64(XXH64_digest(state));
|
||||
XXH64_freeState(state);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
assert(false);
|
||||
break;
|
||||
}
|
||||
|
||||
EncodeFixed32(trailer + 1, checksum);
|
||||
assert(io_s.ok());
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
|
||||
|
|
|
@ -776,9 +776,9 @@ Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno(
|
|||
EncodeFixed64(
|
||||
tmp_buf.get() + global_seqno_offset - props_block_handle.offset(), 0);
|
||||
}
|
||||
uint32_t value = DecodeFixed32(tmp_buf.get() + block_size + 1);
|
||||
s = ROCKSDB_NAMESPACE::VerifyChecksum(rep_->footer.checksum(),
|
||||
tmp_buf.get(), block_size + 1, value);
|
||||
s = ROCKSDB_NAMESPACE::VerifyBlockChecksum(
|
||||
rep_->footer.checksum(), tmp_buf.get(), block_size,
|
||||
rep_->file->file_name(), props_block_handle.offset());
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -1728,15 +1728,14 @@ void BlockBasedTable::RetrieveMultipleBlocks(
|
|||
if (options.verify_checksums) {
|
||||
PERF_TIMER_GUARD(block_checksum_time);
|
||||
const char* data = req.result.data();
|
||||
uint32_t expected =
|
||||
DecodeFixed32(data + req_offset + handle.size() + 1);
|
||||
// Since the scratch might be shared. the offset of the data block in
|
||||
// Since the scratch might be shared, the offset of the data block in
|
||||
// the buffer might not be 0. req.result.data() only point to the
|
||||
// begin address of each read request, we need to add the offset
|
||||
// in each read request. Checksum is stored in the block trailer,
|
||||
// which is handle.size() + 1.
|
||||
s = ROCKSDB_NAMESPACE::VerifyChecksum(
|
||||
footer.checksum(), data + req_offset, handle.size() + 1, expected);
|
||||
// beyond the payload size.
|
||||
s = ROCKSDB_NAMESPACE::VerifyBlockChecksum(
|
||||
footer.checksum(), data + req_offset, handle.size(),
|
||||
rep_->file->file_name(), handle.offset());
|
||||
TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
|
||||
}
|
||||
} else if (!use_shared_buffer) {
|
||||
|
|
|
@ -8,7 +8,11 @@
|
|||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
#include "table/block_based/reader_common.h"
|
||||
|
||||
#include "monitoring/perf_context_imp.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/crc32c.h"
|
||||
#include "util/hash.h"
|
||||
#include "util/string_util.h"
|
||||
#include "util/xxhash.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
@ -18,29 +22,42 @@ void ForceReleaseCachedEntry(void* arg, void* h) {
|
|||
cache->Release(handle, true /* force_erase */);
|
||||
}
|
||||
|
||||
Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len,
|
||||
uint32_t expected) {
|
||||
Status VerifyBlockChecksum(ChecksumType type, const char* data,
|
||||
size_t block_size, const std::string& file_name,
|
||||
uint64_t offset) {
|
||||
PERF_TIMER_GUARD(block_checksum_time);
|
||||
// After block_size bytes is compression type (1 byte), which is part of
|
||||
// the checksummed section.
|
||||
size_t len = block_size + 1;
|
||||
// And then the stored checksum value (4 bytes).
|
||||
uint32_t stored = DecodeFixed32(data + len);
|
||||
|
||||
Status s;
|
||||
uint32_t actual = 0;
|
||||
uint32_t computed = 0;
|
||||
switch (type) {
|
||||
case kNoChecksum:
|
||||
break;
|
||||
case kCRC32c:
|
||||
expected = crc32c::Unmask(expected);
|
||||
actual = crc32c::Value(buf, len);
|
||||
stored = crc32c::Unmask(stored);
|
||||
computed = crc32c::Value(data, len);
|
||||
break;
|
||||
case kxxHash:
|
||||
actual = XXH32(buf, static_cast<int>(len), 0);
|
||||
computed = XXH32(data, len, 0);
|
||||
break;
|
||||
case kxxHash64:
|
||||
actual = static_cast<uint32_t>(XXH64(buf, static_cast<int>(len), 0) &
|
||||
uint64_t{0xffffffff});
|
||||
computed = Lower32of64(XXH64(data, len, 0));
|
||||
break;
|
||||
default:
|
||||
s = Status::Corruption("unknown checksum type");
|
||||
s = Status::Corruption(
|
||||
"unknown checksum type " + ToString(type) + " from footer of " +
|
||||
file_name + ", while checking block at offset " + ToString(offset) +
|
||||
" size " + ToString(block_size));
|
||||
}
|
||||
if (s.ok() && actual != expected) {
|
||||
s = Status::Corruption("properties block checksum mismatched");
|
||||
if (s.ok() && stored != computed) {
|
||||
s = Status::Corruption(
|
||||
"block checksum mismatch: stored = " + ToString(stored) +
|
||||
", computed = " + ToString(computed) + " in " + file_name +
|
||||
" offset " + ToString(offset) + " size " + ToString(block_size));
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,10 @@ inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock(
|
|||
: nullptr;
|
||||
}
|
||||
|
||||
extern Status VerifyChecksum(const ChecksumType type, const char* buf,
|
||||
size_t len, uint32_t expected);
|
||||
// Assumes block has a trailer as in format.h. file_name and offset provided
|
||||
// for generating a diagnostic message in returned status.
|
||||
extern Status VerifyBlockChecksum(ChecksumType type, const char* data,
|
||||
size_t block_size,
|
||||
const std::string& file_name,
|
||||
uint64_t offset);
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
|
|
@ -19,51 +19,20 @@
|
|||
#include "rocksdb/env.h"
|
||||
#include "table/block_based/block.h"
|
||||
#include "table/block_based/block_based_table_reader.h"
|
||||
#include "table/block_based/reader_common.h"
|
||||
#include "table/format.h"
|
||||
#include "table/persistent_cache_helper.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/compression.h"
|
||||
#include "util/crc32c.h"
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/string_util.h"
|
||||
#include "util/xxhash.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
inline void BlockFetcher::CheckBlockChecksum() {
|
||||
// Check the crc of the type and the block contents
|
||||
if (read_options_.verify_checksums) {
|
||||
const char* data = slice_.data(); // Pointer to where Read put the data
|
||||
PERF_TIMER_GUARD(block_checksum_time);
|
||||
uint32_t value = DecodeFixed32(data + block_size_ + 1);
|
||||
uint32_t actual = 0;
|
||||
switch (footer_.checksum()) {
|
||||
case kNoChecksum:
|
||||
break;
|
||||
case kCRC32c:
|
||||
value = crc32c::Unmask(value);
|
||||
actual = crc32c::Value(data, block_size_ + 1);
|
||||
break;
|
||||
case kxxHash:
|
||||
actual = XXH32(data, static_cast<int>(block_size_) + 1, 0);
|
||||
break;
|
||||
case kxxHash64:
|
||||
actual = static_cast<uint32_t>(
|
||||
XXH64(data, static_cast<int>(block_size_) + 1, 0) &
|
||||
uint64_t{0xffffffff});
|
||||
break;
|
||||
default:
|
||||
status_ = Status::Corruption(
|
||||
"unknown checksum type " + ToString(footer_.checksum()) + " in " +
|
||||
file_->file_name() + " offset " + ToString(handle_.offset()) +
|
||||
" size " + ToString(block_size_));
|
||||
}
|
||||
if (status_.ok() && actual != value) {
|
||||
status_ = Status::Corruption(
|
||||
"block checksum mismatch: expected " + ToString(actual) + ", got " +
|
||||
ToString(value) + " in " + file_->file_name() + " offset " +
|
||||
ToString(handle_.offset()) + " size " + ToString(block_size_));
|
||||
}
|
||||
status_ = ROCKSDB_NAMESPACE::VerifyBlockChecksum(
|
||||
footer_.checksum(), slice_.data(), block_size_, file_->file_name(),
|
||||
handle_.offset());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -220,7 +220,7 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
|
|||
uint64_t file_size, Footer* footer,
|
||||
uint64_t enforce_table_magic_number = 0);
|
||||
|
||||
// 1-byte type + 32-bit crc
|
||||
// 1-byte compression type + 32-bit checksum
|
||||
static const size_t kBlockTrailerSize = 5;
|
||||
|
||||
// Make block size calculation for IO less error prone
|
||||
|
|
Loading…
Reference in New Issue