mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 04:41:49 +00:00
2443ebf810
Summary:
**Context/Summary:**
A WAL write can continue onto a WAL file that has already encountered an error and thus crash at 3f5bd46a07/file/writable_file_writer.cc (L67)
in below scenario:
<img width="698" alt="Screenshot 2024-03-15 at 1 52 45 PM" src="https://github.com/facebook/rocksdb/assets/83968999/cd631ef2-c87c-4926-91ab-a0dc10f1eb14">
Note that GetLiveFilesStorageInfo() can happen concurrently with PUT() for the non-WAL-write part where db lock isn't held
This PR added an error check in LogWriter layer to prevent thread 2 from starting to write WAL after thread 1's write error.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12448
Test Plan:
Step 1 Apply the patch below to simulate frequent WAL write error for the purpose of repro
```
diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc
index b47fa89e6..31930e976 100644
--- a/db_stress_tool/db_stress_driver.cc
+++ b/db_stress_tool/db_stress_driver.cc
@@ -98,7 +98,7 @@ bool RunStressTestImpl(SharedState* shared) {
// MANIFEST, CURRENT, and WAL files.
fault_fs_guard->SetRandomWriteError(
shared->GetSeed(), FLAGS_write_fault_one_in, error_msg,
- /*inject_for_all_file_types=*/false, {FileType::kTableFile});
+ /*inject_for_all_file_types=*/false, {FileType::kWalFile});
fault_fs_guard->SetFilesystemDirectWritable(false);
fault_fs_guard->EnableWriteErrorInjection();
}
diff --git a/utilities/fault_injection_fs.cc b/utilities/fault_injection_fs.cc
index 0ffb43ea6..589912cf4 100644
--- a/utilities/fault_injection_fs.cc
+++ b/utilities/fault_injection_fs.cc
@@ -1042,7 +1042,7 @@ IOStatus FaultInjectionTestFS::InjectWriteError(const std::string& file_name) {
}
if (allowed_type) {
- if (write_error_rand_.OneIn(write_error_one_in_)) {
+ if (write_error_rand_.OneIn(1)) {
return GetError();
}
}
```
Step 2 Run below
```
./db_stress --WAL_size_limit_MB=1 --WAL_ttl_seconds=60 --acquire_snapshot_one_in=100 --adaptive_readahead=1 --advise_random_on_open=1 --allow_concurrent_memtable_write=1 --allow_data_in_errors=True --allow_fallocate=1 --async_io=1 --auto_readahead_size=0 --avoid_flush_during_recovery=0 --avoid_flush_during_shutdown=0 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=1000 --batch_protection_bytes_per_key=8 --bgerror_resume_retry_interval=1000000 --block_protection_bytes_per_key=8 --block_size=16384 --bloom_before_level=2147483646 --bloom_bits=41.19540459544058 --bottommost_compression_type=disable --bottommost_file_compaction_delay=3600 --bytes_per_sync=0 --cache_index_and_filter_blocks=1 --cache_index_and_filter_blocks_with_high_priority=1 --cache_size=33554432 --cache_type=fixed_hyper_clock_cache --charge_compression_dictionary_building_buffer=0 --charge_file_metadata=0 --charge_filter_construction=0 --charge_table_reader=1 --checkpoint_one_in=1000000 --checksum_type=kCRC32c --clear_column_family_one_in=0 --column_families=1 --compact_files_one_in=1000000 --compact_range_one_in=1000 --compaction_pri=0 --compaction_readahead_size=1048576 --compaction_ttl=0 --compress_format_version=1 --compressed_secondary_cache_size=8388608 --compression_checksum=1 --compression_max_dict_buffer_bytes=68719476735 --compression_max_dict_bytes=16384 --compression_parallel_threads=1 --compression_type=zlib --compression_use_zstd_dict_trainer=0 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --data_block_index_type=0 --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --db_write_buffer_size=1048576 --delete_obsolete_files_period_micros=30000000 --delpercent=4 --delrangepercent=1 --destroy_db_initially=0 --detect_filter_construct_corruption=1 --disable_wal=0 --dump_malloc_stats=0 --enable_checksum_handoff=1 --enable_compaction_filter=0 --enable_index_compression=1 --enable_pipelined_write=1 --enable_thread_tracking=1 
--enable_write_thread_adaptive_yield=0 --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --fail_if_options_file_error=0 --fifo_allow_compaction=1 --file_checksum_impl=big --fill_cache=1 --flush_one_in=1000 --format_version=6 --get_current_wal_file_one_in=0 --get_live_files_one_in=10000 --get_property_one_in=100000 --get_sorted_wal_files_one_in=0 --hard_pending_compaction_bytes_limit=274877906944 --high_pri_pool_ratio=0.5 --index_block_restart_interval=15 --index_shortening=2 --index_type=0 --ingest_external_file_one_in=0 --initial_auto_readahead_size=524288 --iterpercent=10 --key_len_percent_dist=1,30,69 --kill_random_test=888887 --level_compaction_dynamic_level_bytes=1 --lock_wal_one_in=10000 --log2_keys_per_lock=10 --log_file_time_to_roll=0 --log_readahead_size=16777216 --long_running_snapshots=0 --low_pri_pool_ratio=0.5 --manifest_preallocation_size=5120 --manual_wal_flush_one_in=1000 --mark_for_compaction_one_file_in=0 --max_auto_readahead_size=524288 --max_background_compactions=1 --max_bytes_for_level_base=67108864 --max_key=100000 --max_key_len=3 --max_log_file_size=1048576 --max_manifest_file_size=1073741824 --max_total_wal_size=0 --max_write_batch_group_size_bytes=64 --max_write_buffer_number=10 --max_write_buffer_size_to_maintain=1048576 --memtable_insert_hint_per_batch=1 --memtable_max_range_deletions=0 --memtable_prefix_bloom_size_ratio=0.5 --memtable_protection_bytes_per_key=8 --memtable_whole_key_filtering=0 --memtablerep=skip_list --metadata_charge_policy=0 --min_write_buffer_number_to_merge=2 --mmap_read=0 --mock_direct_io=True --nooverwritepercent=1 --num_file_reads_for_auto_readahead=2 --open_files=500000 --open_metadata_write_fault_one_in=8 --open_read_fault_one_in=32 --open_write_fault_one_in=0 --ops_per_thread=20000000 --optimize_filters_for_hits=1 --optimize_filters_for_memory=1 --optimize_multiget_for_io=0 --paranoid_file_checks=1 --partition_filters=0 --partition_pinning=3 --pause_background_one_in=10000 
--periodic_compaction_seconds=0 --prefix_size=5 --prefixpercent=5 --prepopulate_block_cache=0 --preserve_internal_time_seconds=3600 --progress_reports=0 --read_amp_bytes_per_bit=0 --read_fault_one_in=1000 --readahead_size=16384 --readpercent=45 --recycle_log_file_num=0 --reopen=20 --report_bg_io_stats=0 --sample_for_compression=5 --secondary_cache_fault_one_in=32 --secondary_cache_uri= --skip_stats_update_on_db_open=1 --snapshot_hold_ops=100000 --soft_pending_compaction_bytes_limit=68719476736 --sst_file_manager_bytes_per_sec=0 --sst_file_manager_bytes_per_truncate=0 --stats_dump_period_sec=600 --stats_history_buffer_size=0 --strict_bytes_per_sync=0 --subcompactions=4 --sync=0 --sync_fault_injection=1 --table_cache_numshardbits=-1 --target_file_size_base=16777216 --target_file_size_multiplier=1 --test_batches_snapshots=0 --top_level_index_pinning=1 --unpartitioned_pinning=1 --use_adaptive_mutex=1 --use_adaptive_mutex_lru=1 --use_delta_encoding=1 --use_direct_io_for_flush_and_compaction=1 --use_direct_reads=1 --use_full_merge_v1=0 --use_get_entity=0 --use_merge=1 --use_multi_get_entity=0 --use_multiget=0 --use_put_entity_one_in=1 --use_write_buffer_manager=1 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=1 --verify_checksum_one_in=1000000 --verify_compression=1 --verify_db_one_in=100000 --verify_file_checksums_one_in=1000000 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=none --write_buffer_size=33554432 --write_dbid_to_manifest=1 --write_fault_one_in=1000 --writepercent=35
```
Pre-PR:
```
db_stress: ./file/writable_file_writer.h:309: rocksdb::IOStatus rocksdb::WritableFileWriter::AssertFalseAndGetStatusForPrevError(): Assertion `sync_without_flush_called_' failed.
```
Post-PR
```
2024/03/15-13:44:08 Starting database operations
put or merge error: IO error: Retryable injected write error
```
Note: The patch is NOT included in the PR as we first need to figure out how to handle this type of failed write in stress test (planned for the near future). It's sufficient to show the stress test does not crash as pre-PR for the purpose of this PR.
Reviewed By: ajkr
Differential Revision: D54969287
Pulled By: hx235
fbshipit-source-id: 0ba4eabfec44ea7656d4d7117836f388897562f2
307 lines
10 KiB
C++
307 lines
10 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/log_writer.h"
|
|
|
|
#include <cstdint>
|
|
|
|
#include "file/writable_file_writer.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/io_status.h"
|
|
#include "util/coding.h"
|
|
#include "util/crc32c.h"
|
|
#include "util/udt_util.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE::log {
|
|
|
|
// Constructs a log writer that takes ownership of `dest` and remembers the
// log number plus the recycle / manual-flush / compression settings. The
// CRC of each record-type byte (0..kMaxRecordType) is computed once up front
// and cached in type_crc_, which EmitPhysicalRecord() uses as its CRC seed.
Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
               bool recycle_log_files, bool manual_flush,
               CompressionType compression_type)
    : dest_(std::move(dest)),
      block_offset_(0),
      log_number_(log_number),
      recycle_log_files_(recycle_log_files),
      manual_flush_(manual_flush),
      compression_type_(compression_type),
      compress_(nullptr) {
  int record_type = 0;
  while (record_type <= kMaxRecordType) {
    const char type_byte = static_cast<char>(record_type);
    type_crc_[record_type] = crc32c::Value(&type_byte, 1);
    ++record_type;
  }
}
|
|
|
|
// Flushes whatever is still buffered (best effort) and frees the compression
// stream. While the destructor runs, the calling thread's operation type is
// set to OP_UNKNOWN; the previous value is restored before returning.
Writer::~Writer() {
  const ThreadStatus::OperationType saved_op_type =
      ThreadStatusUtil::GetThreadOperation();
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_UNKNOWN);
  if (dest_ != nullptr) {
    // A destructor has no way to report failure, so the status is
    // explicitly marked as intentionally ignored.
    WriteBuffer(WriteOptions()).PermitUncheckedError();
  }
  // Deleting a null pointer is a no-op, so no guard is required here.
  delete compress_;
  ThreadStatusUtil::SetThreadOperation(saved_op_type);
}
|
|
|
|
// Flushes the destination writer's buffer.
//
// Returns an IOError without touching the file if the destination writer has
// already seen an error, the failure from preparing the IO options if that
// step fails, and otherwise the result of the flush.
IOStatus Writer::WriteBuffer(const WriteOptions& write_options) {
  if (dest_->seen_error()) {
    return IOStatus::IOError("Seen error. Skip writing buffer.");
  }
  IOOptions io_opts;
  IOStatus status =
      WritableFileWriter::PrepareIOOptions(write_options, io_opts);
  if (status.ok()) {
    status = dest_->Flush(io_opts);
  }
  return status;
}
|
|
|
|
// Closes the destination writer and releases it.
//
// If preparing the IO options fails, that status is returned and the
// destination is left untouched. If there is no destination (for example
// after an earlier Close()), the status of the option preparation is
// returned unchanged.
IOStatus Writer::Close(const WriteOptions& write_options) {
  IOOptions io_opts;
  IOStatus status =
      WritableFileWriter::PrepareIOOptions(write_options, io_opts);
  if (!status.ok() || !dest_) {
    return status;
  }
  status = dest_->Close(io_opts);
  dest_.reset();
  return status;
}
|
|
|
|
// Appends one logical record, splitting it into as many physical records as
// the remaining block space requires. When a compression stream is
// configured, the payload is compressed chunk by chunk and each compressed
// chunk is emitted as one or more physical records.
//
// Returns an IOError immediately if the destination writer has already seen
// an error. Unless manual flushing is enabled, the destination is flushed
// after a successful write.
IOStatus Writer::AddRecord(const WriteOptions& write_options,
                           const Slice& slice) {
  if (dest_->seen_error()) {
    return IOStatus::IOError("Seen error. Skip writing buffer.");
  }

  const char* cursor = slice.data();
  size_t remaining = slice.size();

  // Header size varies depending on whether we are recycling or not.
  const int header_size =
      recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize;

  bool is_first_fragment = true;
  int compress_remaining = 0;
  bool compress_start = false;
  if (compress_) {
    compress_->Reset();
    compress_start = true;
  }

  IOOptions opts;
  IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  if (!s.ok()) {
    return s;
  }

  // The loop body always runs at least once, so an empty slice still
  // produces a single zero-length record.
  while (true) {
    const int64_t block_space = kBlockSize - block_offset_;
    assert(block_space >= 0);
    if (block_space < header_size) {
      // Not enough room for a header: zero-fill the tail and start a new
      // block.
      if (block_space > 0) {
        // The literal below relies on kHeaderSize and kRecyclableHeaderSize
        // being <= 11.
        assert(header_size <= 11);
        s = dest_->Append(opts,
                          Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
                                static_cast<size_t>(block_space)),
                          0 /* crc32c_checksum */);
        if (!s.ok()) {
          break;
        }
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < header_size bytes in a block.
    assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size);

    const size_t avail = kBlockSize - block_offset_ - header_size;

    // Compress() runs once at the start of the record (compress_start) and
    // again each time the previously produced chunk has been written out in
    // full (remaining == 0).
    if (compress_ && (compress_start || remaining == 0)) {
      compress_remaining = compress_->Compress(
          slice.data(), slice.size(), compressed_buffer_.get(), &remaining);

      if (compress_remaining < 0) {
        s = IOStatus::IOError("Unexpected WAL compression error");
        s.SetDataLoss(true);
        break;
      }
      if (remaining == 0 && !compress_start) {
        // Nothing left to compress.
        break;
      }
      compress_start = false;
      cursor = compressed_buffer_.get();
    }

    const size_t fragment_length = (avail <= remaining) ? avail : remaining;
    const bool is_last_fragment =
        (remaining == fragment_length && compress_remaining == 0);

    RecordType type;
    if (is_first_fragment) {
      if (is_last_fragment) {
        type = recycle_log_files_ ? kRecyclableFullType : kFullType;
      } else {
        type = recycle_log_files_ ? kRecyclableFirstType : kFirstType;
      }
    } else {
      if (is_last_fragment) {
        type = recycle_log_files_ ? kRecyclableLastType : kLastType;
      } else {
        type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
      }
    }

    s = EmitPhysicalRecord(write_options, type, cursor, fragment_length);
    cursor += fragment_length;
    remaining -= fragment_length;
    is_first_fragment = false;

    const bool more_to_write = remaining > 0 || compress_remaining > 0;
    if (!s.ok() || !more_to_write) {
      break;
    }
  }

  if (s.ok() && !manual_flush_) {
    s = dest_->Flush(opts);
  }

  return s;
}
|
|
|
|
// Writes the record announcing the compression type used by this log and,
// once it has been emitted, sets up the streaming compressor and its output
// buffer. Must be called before any other record (block_offset_ == 0).
//
// Nothing is written when compression is disabled. If the destination writer
// has already seen an error, an IOError is returned without writing. If
// emitting the record fails, compression is turned off for this writer.
IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) {
  // Should be the first record
  assert(block_offset_ == 0);

  if (compression_type_ == kNoCompression) {
    // No need to add a record
    return IOStatus::OK();
  }

  if (dest_->seen_error()) {
    return IOStatus::IOError("Seen error. Skip writing buffer.");
  }

  CompressionTypeRecord record(compression_type_);
  std::string encoded_record;
  record.EncodeTo(&encoded_record);
  IOStatus s = EmitPhysicalRecord(write_options, kSetCompressionType,
                                  encoded_record.data(), encoded_record.size());
  if (!s.ok()) {
    // Disable compression if the record could not be added.
    compression_type_ = kNoCompression;
    return s;
  }

  if (!manual_flush_) {
    IOOptions io_opts;
    s = WritableFileWriter::PrepareIOOptions(write_options, io_opts);
    if (s.ok()) {
      s = dest_->Flush(io_opts);
    }
  }

  // Initialize fields required for compression. The output buffer is sized
  // to the largest payload that fits in one block after its header.
  const size_t max_output_buffer_len =
      kBlockSize - (recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize);
  CompressionOptions compression_opts;
  constexpr uint32_t compression_format_version = 2;
  compress_ = StreamingCompress::Create(compression_type_, compression_opts,
                                        compression_format_version,
                                        max_output_buffer_len);
  assert(compress_ != nullptr);
  compressed_buffer_.reset(new char[max_output_buffer_len]);
  assert(compressed_buffer_);
  return s;
}
|
|
|
|
// Emits a user-defined-timestamp-size record for every column family in
// `cf_to_ts_sz` that has a non-zero timestamp size and has not been recorded
// by this writer yet.
//
// Returns OK without writing when there is nothing new to record. Returns an
// IOError without writing when the destination writer has already seen an
// error, matching the guard used by AddRecord(), WriteBuffer() and
// AddCompressionTypeRecord(); otherwise the append would be attempted on a
// file that is already in a failed state. Otherwise returns the status of
// emitting the record.
IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord(
    const WriteOptions& write_options,
    const UnorderedMap<uint32_t, size_t>& cf_to_ts_sz) {
  std::vector<std::pair<uint32_t, size_t>> ts_sz_to_record;
  for (const auto& [cf_id, ts_sz] : cf_to_ts_sz) {
    if (recorded_cf_to_ts_sz_.count(cf_id) != 0) {
      // A column family's user-defined timestamp size should not be
      // updated while DB is running.
      assert(recorded_cf_to_ts_sz_[cf_id] == ts_sz);
    } else if (ts_sz != 0) {
      ts_sz_to_record.emplace_back(cf_id, ts_sz);
    }
  }
  if (ts_sz_to_record.empty()) {
    return IOStatus::OK();
  }

  // Do not start a write on a file that has already failed.
  if (dest_->seen_error()) {
    return IOStatus::IOError("Seen error. Skip writing buffer.");
  }

  // Only mark the sizes as recorded once a write is actually going to be
  // attempted, so the early return above leaves the bookkeeping untouched.
  recorded_cf_to_ts_sz_.insert(ts_sz_to_record.begin(), ts_sz_to_record.end());

  UserDefinedTimestampSizeRecord record(std::move(ts_sz_to_record));
  std::string encoded;
  record.EncodeTo(&encoded);
  RecordType type = recycle_log_files_ ? kRecyclableUserDefinedTimestampSizeType
                                       : kUserDefinedTimestampSizeType;
  return EmitPhysicalRecord(write_options, type, encoded.data(),
                            encoded.size());
}
|
|
|
|
// Forwards to the destination file writer's BufferIsEmpty().
bool Writer::BufferIsEmpty() {
  const bool is_empty = dest_->BufferIsEmpty();
  return is_empty;
}
|
|
|
|
// Writes a single physical record: a header followed by `n` payload bytes
// starting at `ptr`.
//
// Header layout as built below: bytes 0-3 hold the masked CRC, bytes 4-5 the
// payload length (low byte first), byte 6 the record type, and, for the
// recyclable format only, bytes 7-10 the low 32 bits of the log number.
//
// block_offset_ is advanced by the header and payload size regardless of
// whether the appends succeeded. Returns the status of the IO option
// preparation or of the first failing append.
IOStatus Writer::EmitPhysicalRecord(const WriteOptions& write_options,
                                    RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // Must fit in two bytes

  char header[kRecyclableHeaderSize];

  // Length and type bytes are common to both header formats.
  header[4] = static_cast<char>(n & 0xff);
  header[5] = static_cast<char>(n >> 8);
  header[6] = static_cast<char>(t);

  const bool uses_legacy_format = t < kRecyclableFullType ||
                                  t == kSetCompressionType ||
                                  t == kUserDefinedTimestampSizeType;

  // Start from the precomputed CRC of the record-type byte.
  uint32_t crc = type_crc_[t];
  size_t header_size;
  if (!uses_legacy_format) {
    // Recyclable record format
    assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize);
    header_size = kRecyclableHeaderSize;

    // Only encode low 32-bits of the 64-bit log number. This means
    // we will fail to detect an old record if we recycled a log from
    // ~4 billion logs ago, but that is effectively impossible, and
    // even if it were we'd be far more likely to see a false positive
    // on the 32-bit CRC.
    EncodeFixed32(header + 7, static_cast<uint32_t>(log_number_));
    crc = crc32c::Extend(crc, header + 7, 4);
  } else {
    // Legacy record format
    assert(block_offset_ + kHeaderSize + n <= kBlockSize);
    header_size = kHeaderSize;
  }

  // Fold the payload CRC into the CRC accumulated so far.
  const uint32_t payload_crc = crc32c::Value(ptr, n);
  crc = crc32c::Crc32cCombine(crc, payload_crc, n);
  crc = crc32c::Mask(crc);  // Adjust for storage
  TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
                           &crc);
  EncodeFixed32(header, crc);

  // Write the header, then the payload; stop at the first failure.
  IOOptions opts;
  IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  if (s.ok()) {
    s = dest_->Append(opts, Slice(header, header_size),
                      0 /* crc32c_checksum */);
    if (s.ok()) {
      s = dest_->Append(opts, Slice(ptr, n), payload_crc);
    }
  }
  block_offset_ += header_size + n;
  return s;
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE::log
|