rocksdb/file/writable_file_writer.cc
Hui Xiao d7b938882e Sync WAL during db Close() (#12556)
Summary:
**Context/Summary:**
The crash test below revealed that we do not sync the WAL on DB close, which can lead to loss of unsynced data. This PR syncs the WAL during close.
```
./db_stress --threads=1 --disable_auto_compactions=1 --WAL_size_limit_MB=0 --WAL_ttl_seconds=0 --acquire_snapshot_one_in=0 --adaptive_readahead=0 --adm_policy=1 --advise_random_on_open=1 --allow_concurrent_memtable_write=1 --allow_data_in_errors=True --allow_fallocate=0 --async_io=0 --auto_readahead_size=0 --avoid_flush_during_recovery=1 --avoid_flush_during_shutdown=0 --avoid_unnecessary_blocking_io=1 --backup_max_size=104857600 --backup_one_in=0 --batch_protection_bytes_per_key=0 --bgerror_resume_retry_interval=1000000 --block_align=0 --block_protection_bytes_per_key=2 --block_size=16384 --bloom_before_level=1 --bloom_bits=29.895303579352174 --bottommost_compression_type=disable --bottommost_file_compaction_delay=0 --bytes_per_sync=0 --cache_index_and_filter_blocks=0 --cache_index_and_filter_blocks_with_high_priority=1 --cache_size=33554432 --cache_type=lru_cache --charge_compression_dictionary_building_buffer=1 --charge_file_metadata=0 --charge_filter_construction=1 --charge_table_reader=1 --checkpoint_one_in=0 --checksum_type=kxxHash64 --clear_column_family_one_in=0 --column_families=1 --compact_files_one_in=0 --compact_range_one_in=0 --compaction_pri=0 --compaction_readahead_size=0 --compaction_style=0 --compaction_ttl=0 --compress_format_version=2 --compressed_secondary_cache_ratio=0 --compressed_secondary_cache_size=0 --compression_checksum=1 --compression_max_dict_buffer_bytes=0 --compression_max_dict_bytes=0 --compression_parallel_threads=4 --compression_type=zstd --compression_use_zstd_dict_trainer=1 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --data_block_index_type=0 --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --db_write_buffer_size=0 --default_temperature=kUnknown --default_write_temperature=kUnknown --delete_obsolete_files_period_micros=0 --delpercent=0 --delrangepercent=0 --destroy_db_initially=1 --detect_filter_construct_corruption=1 --disable_wal=0 --dump_malloc_stats=0 --enable_checksum_handoff=0 
--enable_compaction_filter=0 --enable_custom_split_merge=0 --enable_do_not_compress_roles=1 --enable_index_compression=1 --enable_memtable_insert_with_hint_prefix_extractor=0 --enable_pipelined_write=0 --enable_sst_partitioner_factory=0 --enable_thread_tracking=1 --enable_write_thread_adaptive_yield=0 --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --fail_if_options_file_error=0 --fifo_allow_compaction=1 --file_checksum_impl=none --fill_cache=0 --flush_one_in=1000 --format_version=5 --get_current_wal_file_one_in=0 --get_live_files_one_in=0 --get_property_one_in=0 --get_sorted_wal_files_one_in=0 --hard_pending_compaction_bytes_limit=274877906944 --high_pri_pool_ratio=0 --index_block_restart_interval=6 --index_shortening=0 --index_type=0 --ingest_external_file_one_in=0 --initial_auto_readahead_size=16384 --iterpercent=0 --key_len_percent_dist=1,30,69 --last_level_temperature=kUnknown --level_compaction_dynamic_level_bytes=1 --lock_wal_one_in=0 --log2_keys_per_lock=10 --log_file_time_to_roll=0 --log_readahead_size=16777216 --long_running_snapshots=0 --low_pri_pool_ratio=0 --lowest_used_cache_tier=0 --manifest_preallocation_size=5120 --manual_wal_flush_one_in=0 --mark_for_compaction_one_file_in=0 --max_auto_readahead_size=0 --max_background_compactions=1 --max_bytes_for_level_base=67108864 --max_key=2500000 --max_key_len=3 --max_log_file_size=0 --max_manifest_file_size=1073741824 --max_sequential_skip_in_iterations=8 --max_total_wal_size=0 --max_write_batch_group_size_bytes=64 --max_write_buffer_number=10 --max_write_buffer_size_to_maintain=0 --memtable_insert_hint_per_batch=0 --memtable_max_range_deletions=0 --memtable_prefix_bloom_size_ratio=0.5 --memtable_protection_bytes_per_key=1 --memtable_whole_key_filtering=1 --memtablerep=skip_list --metadata_charge_policy=0 --min_write_buffer_number_to_merge=1 --mmap_read=0 --mock_direct_io=True --nooverwritepercent=1 --num_file_reads_for_auto_readahead=0 --num_levels=1 --open_files=-1 
--open_metadata_write_fault_one_in=0 --open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=3 --optimize_filters_for_hits=1 --optimize_filters_for_memory=1 --optimize_multiget_for_io=0 --paranoid_file_checks=0 --partition_filters=0 --partition_pinning=1 --pause_background_one_in=0 --periodic_compaction_seconds=0 --prefix_size=1 --prefixpercent=0 --prepopulate_block_cache=0 --preserve_internal_time_seconds=3600 --progress_reports=0 --read_amp_bytes_per_bit=0 --read_fault_one_in=0 --readahead_size=16384 --readpercent=0 --recycle_log_file_num=0 --reopen=2 --report_bg_io_stats=1 --sample_for_compression=5 --secondary_cache_fault_one_in=0 --secondary_cache_uri= --skip_stats_update_on_db_open=1 --snapshot_hold_ops=0 --soft_pending_compaction_bytes_limit=68719476736 --sst_file_manager_bytes_per_sec=0 --sst_file_manager_bytes_per_truncate=0 --stats_dump_period_sec=10 --stats_history_buffer_size=1048576 --strict_bytes_per_sync=0 --subcompactions=3 --sync=0 --sync_fault_injection=1 --table_cache_numshardbits=6 --target_file_size_base=16777216 --target_file_size_multiplier=1 --test_batches_snapshots=0 --top_level_index_pinning=0 --unpartitioned_pinning=3 --use_adaptive_mutex=1 --use_adaptive_mutex_lru=0 --use_delta_encoding=1 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_get_entity=0 --use_merge=0 --use_multi_get_entity=0 --use_multiget=1 --use_put_entity_one_in=0 --use_write_buffer_manager=0 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=1 --verify_checksum_one_in=1000 --verify_compression=0 --verify_db_one_in=100000 --verify_file_checksums_one_in=0 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=zstd --write_buffer_size=33554432 --write_dbid_to_manifest=0 --write_fault_one_in=0 --writepercent=100

 Verification failed for column family 0 key 000000000000B9D1000000000000012B000000000000017D (4756691): value_from_db: , value_from_expected: 010000000504070609080B0A0D0C0F0E111013121514171619181B1A1D1C1F1E212023222524272629282B2A2D2C2F2E313033323534373639383B3A3D3C3F3E, msg: Iterator verification: Value not found: NotFound:
Verification failed :(
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12556

Test Plan:
- New UT
- The same stress test command failed before this fix but passes after it
- CI

Reviewed By: ajkr

Differential Revision: D56267964

Pulled By: hx235

fbshipit-source-id: af1b7e8769c129f64ba1c7f1ff17102f1239b929
2024-05-20 17:33:43 -07:00

1008 lines
35 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "file/writable_file_writer.h"
#include <algorithm>
#include <mutex>
#include "db/version_edit.h"
#include "file/file_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "rocksdb/io_status.h"
#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "util/crc32c.h"
#include "util/random.h"
#include "util/rate_limiter_impl.h"
namespace ROCKSDB_NAMESPACE {
// Picks the activity-specific file-write histogram for a write.
//
// Only writers whose own histogram is SST_WRITE_MICROS or
// BLOB_DB_BLOB_FILE_WRITE_MICROS are mapped; for those, flush, compaction and
// DB-open activity each select their dedicated histogram. Every other
// combination yields Histograms::HISTOGRAM_ENUM_MAX.
inline Histograms GetFileWriteHistograms(Histograms file_writer_hist,
                                         Env::IOActivity io_activity) {
  const bool is_sst_or_blob_writer =
      file_writer_hist == Histograms::SST_WRITE_MICROS ||
      file_writer_hist == Histograms::BLOB_DB_BLOB_FILE_WRITE_MICROS;
  if (!is_sst_or_blob_writer) {
    return Histograms::HISTOGRAM_ENUM_MAX;
  }
  if (io_activity == Env::IOActivity::kFlush) {
    return Histograms::FILE_WRITE_FLUSH_MICROS;
  }
  if (io_activity == Env::IOActivity::kCompaction) {
    return Histograms::FILE_WRITE_COMPACTION_MICROS;
  }
  if (io_activity == Env::IOActivity::kDBOpen) {
    return Histograms::FILE_WRITE_DB_OPEN_MICROS;
  }
  return Histograms::HISTOGRAM_ENUM_MAX;
}
// Opens `fname` for writing through `fs` and wraps it in a
// WritableFileWriter.
//
// Returns InvalidArgument (without touching the file system) when direct
// writes are requested with a zero writable_file_max_buffer_size. Otherwise
// returns the status of FileSystem::NewWritableFile(); `*writer` is only
// assigned when that status is OK.
IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
                                    const std::string& fname,
                                    const FileOptions& file_opts,
                                    std::unique_ptr<WritableFileWriter>* writer,
                                    IODebugContext* dbg) {
  const bool direct_write_without_buffer =
      file_opts.use_direct_writes &&
      file_opts.writable_file_max_buffer_size == 0;
  if (direct_write_without_buffer) {
    return IOStatus::InvalidArgument(
        "Direct write requires writable_file_max_buffer_size > 0");
  }

  std::unique_ptr<FSWritableFile> opened_file;
  IOStatus open_status =
      fs->NewWritableFile(fname, file_opts, &opened_file, dbg);
  if (!open_status.ok()) {
    return open_status;
  }
  writer->reset(
      new WritableFileWriter(std::move(opened_file), fname, file_opts));
  return open_status;
}
// Appends `data` to the file, going through the in-memory buffer `buf_`
// whenever possible.
//
// opts: caller-supplied IO options; they are passed through
//   FinalizeIOOptions() before being used for any flush or write issued here.
// data: the bytes to append.
// crc32c_checksum: crc32c of `data` supplied by the caller. A value of 0 is
//   treated as "no checksum provided".
//
// Returns the previous-error status if the writer has already seen an error.
// Otherwise returns the status of the buffering / flushing / writing work;
// any non-OK result marks the writer as having seen an error. On success
// filesize_ grows by data.size().
IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
uint32_t crc32c_checksum) {
if (seen_error()) {
return GetWriterHasPreviousErrorStatus();
}
StopWatch sw(clock_, stats_, hist_type_,
GetFileWriteHistograms(hist_type_, opts.io_activity));
const IOOptions io_options = FinalizeIOOptions(opts);
const char* src = data.data();
size_t left = data.size();
IOStatus s;
// Set before any flush below: under direct I/O, Flush() only writes the
// buffer out while this flag is set. Sync() clears it again.
pending_sync_ = true;
TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Append:0", REDUCE_ODDS2);
// Calculate the checksum of appended data
UpdateFileChecksum(data);
{
IOSTATS_TIMER_GUARD(prepare_write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
io_options, nullptr);
}
// See whether we need to enlarge the buffer to avoid the flush
if (buf_.Capacity() - buf_.CurrentSize() < left) {
for (size_t cap = buf_.Capacity();
cap < max_buffer_size_; // There is still room to increase
cap *= 2) {
// See whether the next available size is large enough.
// Buffer will never be increased to more than max_buffer_size_.
size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
// Stop at the first doubling that makes the data fit. Under direct I/O
// also stop once the maximum size is reached, even if the data still
// does not fit.
if (desired_capacity - buf_.CurrentSize() >= left ||
(use_direct_io() && desired_capacity == max_buffer_size_)) {
buf_.AllocateNewBuffer(desired_capacity, true);
break;
}
}
}
// Flush only when buffered I/O
if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
if (buf_.CurrentSize() > 0) {
s = Flush(io_options);
if (!s.ok()) {
set_seen_error();
return s;
}
}
assert(buf_.CurrentSize() == 0);
}
if (perform_data_verification_ && buffered_data_with_checksum_ &&
crc32c_checksum != 0) {
// Since we want to use the checksum of the input data, we cannot break it
// into several pieces. We will only write them in the buffer when buffer
// size is enough. Otherwise, we will directly write it down.
if (use_direct_io() || (buf_.Capacity() - buf_.CurrentSize()) >= left) {
if ((buf_.Capacity() - buf_.CurrentSize()) >= left) {
// The whole input fits: copy it in one piece and fold the caller's
// checksum into the running checksum of the buffered bytes.
size_t appended = buf_.Append(src, left);
if (appended != left) {
s = IOStatus::Corruption("Write buffer append failure");
}
buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine(
buffered_data_crc32c_checksum_, crc32c_checksum, appended);
} else {
// Only reachable under direct I/O with too little room: copy the input
// piece by piece, flushing between pieces, and extend the running
// checksum over each piece actually buffered.
while (left > 0) {
size_t appended = buf_.Append(src, left);
buffered_data_crc32c_checksum_ =
crc32c::Extend(buffered_data_crc32c_checksum_, src, appended);
left -= appended;
src += appended;
if (left > 0) {
s = Flush(io_options);
if (!s.ok()) {
break;
}
}
}
}
} else {
// Buffered I/O and the input does not fit even in the emptied buffer:
// hand the input to the file directly, together with the caller's
// checksum.
assert(buf_.CurrentSize() == 0);
buffered_data_crc32c_checksum_ = crc32c_checksum;
s = WriteBufferedWithChecksum(io_options, src, left);
}
} else {
// In this case, either we do not need to do the data verification or
// caller does not provide the checksum of the data (crc32c_checksum = 0).
//
// We never write directly to disk with direct I/O on.
// or we simply use it for its original purpose to accumulate many small
// chunks
if (use_direct_io() || (buf_.Capacity() >= left)) {
while (left > 0) {
size_t appended = buf_.Append(src, left);
if (perform_data_verification_ && buffered_data_with_checksum_) {
buffered_data_crc32c_checksum_ =
crc32c::Extend(buffered_data_crc32c_checksum_, src, appended);
}
left -= appended;
src += appended;
if (left > 0) {
s = Flush(io_options);
if (!s.ok()) {
break;
}
}
}
} else {
// Writing directly to file bypassing the buffer
assert(buf_.CurrentSize() == 0);
if (perform_data_verification_ && buffered_data_with_checksum_) {
// No caller checksum is available on this path, so compute one over
// the whole input before handing it off.
buffered_data_crc32c_checksum_ = crc32c::Value(src, left);
s = WriteBufferedWithChecksum(io_options, src, left);
} else {
s = WriteBuffered(io_options, src, left);
}
}
}
TEST_KILL_RANDOM("WritableFileWriter::Append:1");
if (s.ok()) {
// Account for the full input size; `left` may have been consumed above.
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
filesize_.store(cur_size + data.size(), std::memory_order_release);
} else {
set_seen_error();
}
return s;
}
// Appends `pad_bytes` zero bytes to the file through the internal buffer.
//
// opts: caller-supplied IO options; passed through FinalizeIOOptions() before
//   being used for any flush issued here.
// pad_bytes: number of zero bytes to add; asserted to be smaller than
//   kDefaultPageSize.
//
// Returns the previous-error status if the writer has already seen an error,
// the failing status of an intermediate Flush() (which also marks the writer
// as having seen an error), or OK. On success filesize_ grows by pad_bytes.
IOStatus WritableFileWriter::Pad(const IOOptions& opts,
                                 const size_t pad_bytes) {
  if (seen_error()) {
    return GetWriterHasPreviousErrorStatus();
  }
  const IOOptions io_options = FinalizeIOOptions(opts);
  assert(pad_bytes < kDefaultPageSize);
  // Mark the writer as holding un-synced data before any flushing happens,
  // the same way Append() does. Under direct I/O, Flush() only writes the
  // buffer out while pending_sync_ is set; if the flag were still clear when
  // the buffer fills up, the Flush() below would drain nothing and this loop
  // would never make progress.
  pending_sync_ = true;
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_ capacity. So we always
  // use buf_ rather than write directly to file in certain cases like
  // Append() does.
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;

    // The zeros just written are the last `append_bytes` bytes of the buffer.
    const char* pad_start =
        buf_.BufferStart() + buf_.CurrentSize() - append_bytes;
    Slice data(pad_start, append_bytes);
    UpdateFileChecksum(data);
    if (perform_data_verification_) {
      buffered_data_crc32c_checksum_ = crc32c::Extend(
          buffered_data_crc32c_checksum_, pad_start, append_bytes);
    }

    if (left > 0) {
      // The buffer is full; make room for the remaining padding.
      IOStatus s = Flush(io_options);
      if (!s.ok()) {
        set_seen_error();
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  uint64_t cur_size = filesize_.load(std::memory_order_acquire);
  filesize_.store(cur_size + pad_bytes, std::memory_order_release);
  return IOStatus::OK();
}
// Flushes buffered data and closes the underlying file.
//
// opts: caller-supplied IO options; passed through FinalizeIOOptions() before
//   being used for every operation issued here.
//
// If the writer has already seen an error, the file (if still open) is closed
// without flushing and a non-OK status is returned: the Close() failure if
// there was one, otherwise an IOError saying the data was not flushed.
// If the file is already closed, returns OK.
// Otherwise the file is always closed, even when an earlier step fails, and
// the first failure among Flush / Truncate / Fsync / Close is returned. A
// non-OK result marks the writer as having seen an error; an OK result
// finalizes the file checksum generator if there is one.
//
// Listeners are notified of each Truncate / Fsync / Close with the status of
// that operation itself.
IOStatus WritableFileWriter::Close(const IOOptions& opts) {
  IOOptions io_options = FinalizeIOOptions(opts);
  if (seen_error()) {
    IOStatus interim;
    if (writable_file_.get() != nullptr) {
      interim = writable_file_->Close(io_options, nullptr);
      writable_file_.reset();
    }
    if (interim.ok()) {
      return IOStatus::IOError(
          "File is closed but data not flushed as writer has previous error.");
    } else {
      return interim;
    }
  }

  // Do not quit immediately on failure the file MUST be closed

  // Possible to close it twice now as we MUST close
  // in __dtor, simply flushing is not enough
  // Windows when pre-allocating does not fill with zeros
  // also with unbuffered access we also set the end of data.
  if (writable_file_.get() == nullptr) {
    return IOStatus::OK();
  }

  // `s` accumulates the first failure; `interim` holds the status of the
  // operation currently being performed.
  IOStatus s;
  s = Flush(io_options);  // flush cache to OS

  IOStatus interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
  if (use_direct_io()) {
    {
      FileOperationInfo::StartTimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = FileOperationInfo::StartNow();
      }
      uint64_t filesz = filesize_.load(std::memory_order_acquire);
      interim = writable_file_->Truncate(filesz, io_options, nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        // Report the truncate's own status, not the earlier flush status.
        NotifyOnFileTruncateFinish(start_ts, finish_ts, interim);
        if (!interim.ok()) {
          NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(),
                          filesz);
        }
      }
    }
    if (interim.ok()) {
      {
        FileOperationInfo::StartTimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
        }
        interim = writable_file_->Fsync(io_options, nullptr);
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          // Report the fsync's own status, not the earlier flush status.
          NotifyOnFileSyncFinish(start_ts, finish_ts, interim,
                                 FileOperationType::kFsync);
          if (!interim.ok()) {
            NotifyOnIOError(interim, FileOperationType::kFsync, file_name());
          }
        }
      }
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0");
  {
    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }
    interim = writable_file_->Close(io_options, nullptr);
    if (ShouldNotifyListeners()) {
      auto finish_ts = FileOperationInfo::FinishNow();
      // Report the close's own status, not the accumulated status.
      NotifyOnFileCloseFinish(start_ts, finish_ts, interim);
      if (!interim.ok()) {
        NotifyOnIOError(interim, FileOperationType::kClose, file_name());
      }
    }
  }
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1");

  if (s.ok()) {
    if (checksum_generator_ != nullptr && !checksum_finalized_) {
      checksum_generator_->Finalize();
      checksum_finalized_ = true;
    }
  } else {
    set_seen_error();
  }

  return s;
}
// Writes the buffered data out to the OS cache, or to storage if direct I/O
// is enabled, then flushes the underlying file.
//
// opts: caller-supplied IO options; passed through FinalizeIOOptions() before
//   being used for every operation issued here.
//
// Returns the previous-error status if the writer has already seen an error.
// Otherwise returns the first failing status among the buffer write, the
// file's Flush() and the optional range sync; any failure marks the writer as
// having seen an error.
IOStatus WritableFileWriter::Flush(const IOOptions& opts) {
if (seen_error()) {
return GetWriterHasPreviousErrorStatus();
}
const IOOptions io_options = FinalizeIOOptions(opts);
IOStatus s;
TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2);
if (buf_.CurrentSize() > 0) {
if (use_direct_io()) {
// Under direct I/O the buffer is only written while pending_sync_ is set;
// otherwise its contents are left in place.
if (pending_sync_) {
if (perform_data_verification_ && buffered_data_with_checksum_) {
s = WriteDirectWithChecksum(io_options);
} else {
s = WriteDirect(io_options);
}
}
} else {
if (perform_data_verification_ && buffered_data_with_checksum_) {
s = WriteBufferedWithChecksum(io_options, buf_.BufferStart(),
buf_.CurrentSize());
} else {
s = WriteBuffered(io_options, buf_.BufferStart(), buf_.CurrentSize());
}
}
if (!s.ok()) {
set_seen_error();
return s;
}
}
{
// Flush the underlying file and report the outcome to listeners, if any.
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
}
s = writable_file_->Flush(io_options, nullptr);
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileFlushFinish(start_ts, finish_ts, s);
if (!s.ok()) {
NotifyOnIOError(s, FileOperationType::kFlush, file_name());
}
}
}
if (!s.ok()) {
set_seen_error();
return s;
}
// sync OS cache to disk for every bytes_per_sync_
// TODO: give log file and sst file different options (log
// files could be potentially cached in OS for their whole
// life time, thus we might not want to flush at all).
// We try to avoid sync to the last 1MB of data. For two reasons:
// (1) avoid rewrite the same page that is modified later.
// (2) for older version of OS, write can block while writing out
// the page.
// Xfs does neighbor page flushing outside of the specified ranges. We
// need to make sure sync range is far from the write offset.
if (!use_direct_io() && bytes_per_sync_) {
const uint64_t kBytesNotSyncRange =
1024 * 1024; // recent 1MB is not synced.
const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB.
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
if (cur_size > kBytesNotSyncRange) {
// Sync up to (file size - 1MB), rounded down to a 4KB boundary.
uint64_t offset_sync_to = cur_size - kBytesNotSyncRange;
offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
assert(offset_sync_to >= last_sync_size_);
// Only sync once at least bytes_per_sync_ new bytes have accumulated
// since the last synced offset.
if (offset_sync_to > 0 &&
offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
s = RangeSync(io_options, last_sync_size_,
offset_sync_to - last_sync_size_);
if (!s.ok()) {
set_seen_error();
}
// Note: last_sync_size_ is advanced even when RangeSync() failed.
last_sync_size_ = offset_sync_to;
}
}
}
return s;
}
// Returns the checksum produced by the file checksum generator, or
// kUnknownFileChecksum when no generator is configured. With a generator,
// the checksum is asserted to have been finalized already.
std::string WritableFileWriter::GetFileChecksum() {
  if (checksum_generator_ == nullptr) {
    return kUnknownFileChecksum;
  }
  assert(checksum_finalized_);
  return checksum_generator_->GetChecksum();
}
// Returns the name reported by the file checksum generator, or
// kUnknownFileChecksumFuncName when no generator is configured.
const char* WritableFileWriter::GetFileChecksumFuncName() const {
  if (checksum_generator_ == nullptr) {
    return kUnknownFileChecksumFuncName;
  }
  return checksum_generator_->Name();
}
// Forwards to PrepareIOFromWriteOptions(wo, opts) and returns its status.
IOStatus WritableFileWriter::PrepareIOOptions(const WriteOptions& wo,
                                              IOOptions& opts) {
  IOStatus prepare_status = PrepareIOFromWriteOptions(wo, opts);
  return prepare_status;
}
// Flushes buffered data and then syncs the file.
//
// opts: caller-supplied IO options; passed through FinalizeIOOptions().
// use_fsync: forwarded to SyncInternal() to choose between Fsync() and
//   Sync() on the underlying file.
//
// The sync step is skipped under direct I/O and when nothing is pending.
// Returns the previous-error status if the writer has already seen an error,
// the failing status of the flush or sync (which also marks the writer as
// having seen an error), or OK. On success pending_sync_ is cleared.
IOStatus WritableFileWriter::Sync(const IOOptions& opts, bool use_fsync) {
  if (seen_error()) {
    return GetWriterHasPreviousErrorStatus();
  }

  IOOptions io_options = FinalizeIOOptions(opts);
  IOStatus flush_status = Flush(io_options);
  if (!flush_status.ok()) {
    set_seen_error();
    return flush_status;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0");

  const bool needs_file_sync = !use_direct_io() && pending_sync_;
  if (needs_file_sync) {
    IOStatus sync_status = SyncInternal(io_options, use_fsync);
    if (!sync_status.ok()) {
      set_seen_error();
      return sync_status;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1");

  pending_sync_ = false;
  return IOStatus::OK();
}
// Syncs the underlying file without first flushing the writer's buffer.
//
// opts: caller-supplied IO options; passed through FinalizeIOOptions().
// use_fsync: forwarded to SyncInternal().
//
// Returns the previous-error status if the writer has already seen an error,
// NotSupported if the underlying file reports IsSyncThreadSafe() == false,
// and otherwise the status of SyncInternal(); a failed sync marks the writer
// as having seen an error.
IOStatus WritableFileWriter::SyncWithoutFlush(const IOOptions& opts,
                                              bool use_fsync) {
  if (seen_error()) {
    return GetWriterHasPreviousErrorStatus();
  }

  IOOptions io_options = FinalizeIOOptions(opts);
  if (writable_file_->IsSyncThreadSafe()) {
    TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
    IOStatus sync_status = SyncInternal(io_options, use_fsync);
    TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
    if (!sync_status.ok()) {
      set_seen_error();
    }
    return sync_status;
  }

  return IOStatus::NotSupported(
      "Can't WritableFileWriter::SyncWithoutFlush() because "
      "WritableFile::IsSyncThreadSafe() is false");
}
// Issues Fsync() (when `use_fsync` is true) or Sync() on the underlying file
// and reports the outcome to listeners, if any.
//
// This function neither checks nor sets the writer's error state: the caller
// is supposed to check seen_error_ beforehand and to call set_seen_error()
// if the returned status is not OK.
IOStatus WritableFileWriter::SyncInternal(const IOOptions& opts,
                                          bool use_fsync) {
  IOStatus sync_status;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  // The perf level is captured here and restored once the sync is done.
  auto saved_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);

  FileOperationInfo::StartTimePoint start_ts;
  if (ShouldNotifyListeners()) {
    start_ts = FileOperationInfo::StartNow();
  }

  if (!use_fsync) {
    sync_status = writable_file_->Sync(opts, nullptr);
  } else {
    sync_status = writable_file_->Fsync(opts, nullptr);
  }

  if (ShouldNotifyListeners()) {
    auto finish_ts = std::chrono::steady_clock::now();
    const FileOperationType sync_kind =
        use_fsync ? FileOperationType::kFsync : FileOperationType::kSync;
    NotifyOnFileSyncFinish(start_ts, finish_ts, sync_status, sync_kind);
    if (!sync_status.ok()) {
      NotifyOnIOError(sync_status, sync_kind, file_name());
    }
  }
  SetPerfLevel(saved_perf_level);

  return sync_status;
}
// Asks the underlying file to sync `nbytes` bytes starting at `offset` and
// reports the outcome to listeners, if any.
//
// Returns the previous-error status if the writer has already seen an error;
// otherwise the status of the file's RangeSync(). A failure marks the writer
// as having seen an error.
IOStatus WritableFileWriter::RangeSync(const IOOptions& opts, uint64_t offset,
                                       uint64_t nbytes) {
  if (seen_error()) {
    return GetWriterHasPreviousErrorStatus();
  }

  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");

  FileOperationInfo::StartTimePoint start_ts;
  if (ShouldNotifyListeners()) {
    start_ts = FileOperationInfo::StartNow();
  }

  IOStatus range_status =
      writable_file_->RangeSync(offset, nbytes, opts, nullptr);
  const bool failed = !range_status.ok();
  if (failed) {
    set_seen_error();
  }

  if (ShouldNotifyListeners()) {
    auto finish_ts = std::chrono::steady_clock::now();
    NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts,
                                range_status);
    if (failed) {
      NotifyOnIOError(range_status, FileOperationType::kRangeSync, file_name(),
                      nbytes, offset);
    }
  }
  return range_status;
}
// This method writes to disk the specified data and makes use of the rate
// limiter if available
//
// opts: IO options used as-is for every call on the underlying file; its
//   rate_limiter_priority selects the rate limiter priority.
// data, size: the bytes to write. They are appended to the file in chunks
//   whose size is decided by the rate limiter (or in one chunk when no rate
//   limiting applies).
//
// Only valid for buffered (non-direct) I/O. Returns the previous-error status
// if the writer has already seen an error. On the first failed append the
// buffer and its running checksum are cleared, the writer is marked as having
// seen an error, and the failing status is returned. On success the buffer
// and its running checksum are reset and flushed_size_ has grown by `size`.
IOStatus WritableFileWriter::WriteBuffered(const IOOptions& opts,
const char* data, size_t size) {
if (seen_error()) {
return GetWriterHasPreviousErrorStatus();
}
IOStatus s;
assert(!use_direct_io());
const char* src = data;
size_t left = size;
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];
Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
while (left > 0) {
// Ask the rate limiter how much may be written now; without rate limiting
// everything that is left goes out in one chunk.
size_t allowed = left;
if (rate_limiter_ != nullptr &&
rate_limiter_priority_used != Env::IO_TOTAL) {
allowed = rate_limiter_->RequestToken(left, 0 /* alignment */,
rate_limiter_priority_used, stats_,
RateLimiter::OpType::kWrite);
}
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
FileOperationInfo::StartTimePoint start_ts;
// old_size is only consumed by the listener notifications below, and in
// that case it has been overwritten with next_write_offset_.
uint64_t old_size = writable_file_->GetFileSize(opts, nullptr);
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
old_size = next_write_offset_;
}
{
auto prev_perf_level = GetPerfLevel();
IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
if (perform_data_verification_) {
// Hand a crc32c of this chunk to the file together with the data.
Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->Append(Slice(src, allowed), opts, v_info,
nullptr);
} else {
s = writable_file_->Append(Slice(src, allowed), opts, nullptr);
}
if (!s.ok()) {
// If writable_file_->Append() failed, then the data may or may not
// exist in the underlying memory buffer, OS page cache, remote file
// system's buffer, etc. If WritableFileWriter keeps the data in
// buf_, then a future Close() or write retry may send the data to
// the underlying file again. If the data does exist in the
// underlying buffer and gets written to the file eventually despite
// returning error, the file may end up with two duplicate pieces of
// data. Therefore, clear the buf_ at the WritableFileWriter layer
// and let caller determine error handling.
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
}
SetPerfLevel(prev_perf_level);
}
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
if (!s.ok()) {
NotifyOnIOError(s, FileOperationType::kAppend, file_name(), allowed,
old_size);
}
}
if (!s.ok()) {
set_seen_error();
return s;
}
}
IOSTATS_ADD(bytes_written, allowed);
TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
// Advance past the chunk just written.
left -= allowed;
src += allowed;
uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
flushed_size_.store(cur_size + allowed, std::memory_order_release);
}
// Everything was written: empty the buffer and reset its running checksum.
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
// Every failure inside the loop returns early, so s is OK when the loop
// finishes; this check never fires.
if (!s.ok()) {
set_seen_error();
}
return s;
}
// Writes `size` bytes at `data` to the file in a single append, handing off
// buffered_data_crc32c_checksum_ as the checksum of those bytes.
//
// opts: IO options used as-is for every call on the underlying file; its
//   rate_limiter_priority selects the rate limiter priority.
// data, size: the bytes to write; they are never split, so that the single
//   checksum covers exactly what is appended.
//
// Only valid for buffered (non-direct) I/O with data verification and
// buffered-data checksumming enabled. Returns the previous-error status if
// the writer has already seen an error. On a failed append the buffer and its
// running checksum are cleared, the writer is marked as having seen an error,
// and the failing status is returned. On success the buffer and its running
// checksum are reset and flushed_size_ has grown by `size`.
IOStatus WritableFileWriter::WriteBufferedWithChecksum(const IOOptions& opts,
const char* data,
size_t size) {
if (seen_error()) {
return GetWriterHasPreviousErrorStatus();
}
IOStatus s;
assert(!use_direct_io());
assert(perform_data_verification_ && buffered_data_with_checksum_);
const char* src = data;
size_t left = size;
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];
Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
// Check how much is allowed. Here, we loop until the rate limiter allows to
// write the entire buffer.
// TODO: need to be improved since it sort of defeats the purpose of the rate
// limiter
size_t data_size = left;
if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
while (data_size > 0) {
size_t tmp_size;
tmp_size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
rate_limiter_priority_used, stats_,
RateLimiter::OpType::kWrite);
data_size -= tmp_size;
}
}
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
FileOperationInfo::StartTimePoint start_ts;
// old_size is only consumed by the listener notifications below, and in
// that case it has been overwritten with next_write_offset_.
uint64_t old_size = writable_file_->GetFileSize(opts, nullptr);
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
old_size = next_write_offset_;
}
{
auto prev_perf_level = GetPerfLevel();
IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
// Encode the running checksum and hand it off with the whole payload.
EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->Append(Slice(src, left), opts, v_info, nullptr);
SetPerfLevel(prev_perf_level);
}
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileWriteFinish(old_size, left, start_ts, finish_ts, s);
if (!s.ok()) {
NotifyOnIOError(s, FileOperationType::kAppend, file_name(), left,
old_size);
}
}
if (!s.ok()) {
// If writable_file_->Append() failed, then the data may or may not
// exist in the underlying memory buffer, OS page cache, remote file
// system's buffer, etc. If WritableFileWriter keeps the data in
// buf_, then a future Close() or write retry may send the data to
// the underlying file again. If the data does exist in the
// underlying buffer and gets written to the file eventually despite
// returning error, the file may end up with two duplicate pieces of
// data. Therefore, clear the buf_ at the WritableFileWriter layer
// and let caller determine error handling.
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
set_seen_error();
return s;
}
}
IOSTATS_ADD(bytes_written, left);
TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
// Buffer write is successful, reset the buffer current size to 0 and reset
// the corresponding checksum value
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
flushed_size_.store(cur_size + left, std::memory_order_release);
// The failure path above returns early, so s is OK here; this check never
// fires.
if (!s.ok()) {
set_seen_error();
}
return s;
}
// Feeds `data` into the file checksum generator; does nothing when no
// generator is configured.
void WritableFileWriter::UpdateFileChecksum(const Slice& data) {
  if (checksum_generator_ == nullptr) {
    return;
  }
  checksum_generator_->Update(data.data(), data.size());
}
// Computes the crc32c of the `size` bytes at `data` and stores it into `buf`
// using EncodeFixed32(), ready to be handed off with the write.
//
// Currently the checksum is always computed from the raw bytes. In the
// future it might instead be derived from the crc32c checksums that already
// exist in WAL and Manifest records, or even SST file blocks.
// TODO: effectively use the existing checksum of the data being written to
// generate the crc32c checksum instead of a raw calculation.
void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
                                                          size_t size,
                                                          char* buf) {
  EncodeFixed32(buf, crc32c::Extend(0, data, size));
}
// This flushes the accumulated data in the buffer. We pad data with zeros if
// necessary to the whole page.
// However, during automatic flushes padding would not be necessary.
// We always use RateLimiter if available. We move (Refit) any buffer bytes
// that are left over the whole number of pages to be written again on the
// next flush because we can only write on aligned offsets.
//
// opts: IO options used as-is for every call on the underlying file; its
//   rate_limiter_priority selects the rate limiter priority.
//
// Only valid for direct I/O. Returns an IOError if the writer has already
// seen an error (this is also asserted against). On a failed positioned
// append the buffer is restored to its pre-padding size, the writer is marked
// as having seen an error, and the failing status is returned. On success
// next_write_offset_ advances by the number of whole pages written.
IOStatus WritableFileWriter::WriteDirect(const IOOptions& opts) {
if (seen_error()) {
assert(false);
return IOStatus::IOError("Writer has previous error.");
}
assert(use_direct_io());
IOStatus s;
const size_t alignment = buf_.Alignment();
assert((next_write_offset_ % alignment) == 0);
// Calculate whole page final file advance if all writes succeed
const size_t file_advance =
TruncateToPageBoundary(alignment, buf_.CurrentSize());
// Calculate the leftover tail, we write it here padded with zeros BUT we
// will write it again in the future either on Close() OR when the current
// whole page fills out.
const size_t leftover_tail = buf_.CurrentSize() - file_advance;
// Round up and pad
buf_.PadToAlignmentWith(0);
const char* src = buf_.BufferStart();
uint64_t write_offset = next_write_offset_;
size_t left = buf_.CurrentSize();
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];
Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
while (left > 0) {
// Check how much is allowed
size_t size = left;
if (rate_limiter_ != nullptr &&
rate_limiter_priority_used != Env::IO_TOTAL) {
size = rate_limiter_->RequestToken(left, buf_.Alignment(),
rate_limiter_priority_used, stats_,
RateLimiter::OpType::kWrite);
}
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
}
// direct writes must be positional
if (perform_data_verification_) {
// Hand a crc32c of this chunk to the file together with the data.
Crc32cHandoffChecksumCalculation(src, size, checksum_buf);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
opts, v_info, nullptr);
} else {
s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
opts, nullptr);
}
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
if (!s.ok()) {
NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(),
size, write_offset);
}
}
if (!s.ok()) {
// Drop the padding again: file_advance + leftover_tail is the buffer
// size from before PadToAlignmentWith().
buf_.Size(file_advance + leftover_tail);
set_seen_error();
return s;
}
}
IOSTATS_ADD(bytes_written, size);
// Advance past the chunk just written.
left -= size;
src += size;
write_offset += size;
uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
flushed_size_.store(cur_size + size, std::memory_order_release);
assert((next_write_offset_ % alignment) == 0);
}
if (s.ok()) {
// Move the tail to the beginning of the buffer
// This never happens during normal Append but rather during
// explicit call to Flush()/Sync() or Close()
buf_.RefitTail(file_advance, leftover_tail);
// This is where we start writing next time which may or not be
// the actual file size on disk. They match if the buffer size
// is a multiple of whole pages otherwise filesize_ is leftover_tail
// behind
next_write_offset_ += file_advance;
} else {
set_seen_error();
}
return s;
}
// Direct-I/O flush of the whole internal buffer in a single PositionedAppend,
// handing the buffer's running crc32c checksum to the file system through
// DataVerificationInfo.
//
// The buffer is zero-padded up to the alignment boundary before the write and
// the checksum is extended to cover that padding. On success the unaligned
// tail is moved to the front of the buffer (it is written again later, once
// its page fills up or on Close()) and next_write_offset_ advances by the
// whole pages written. On failure the padding is dropped again, the checksum
// is recomputed for the restored buffer, and the writer is marked as having
// seen an error.
//
// Returns the status of the PositionedAppend, or the "previous error" status
// if the writer had already seen an error on entry.
IOStatus WritableFileWriter::WriteDirectWithChecksum(const IOOptions& opts) {
  if (seen_error()) {
    return GetWriterHasPreviousErrorStatus();
  }

  assert(use_direct_io());
  assert(perform_data_verification_ && buffered_data_with_checksum_);
  IOStatus s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate whole page final file advance if all writes succeed
  const size_t file_advance =
      TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail, we write it here padded with zeros BUT we
  // will write it again in the future either on Close() OR when the current
  // whole page fills out.
  const size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up, pad, and fold the checksum of the padding bytes into the
  // running checksum so that it covers exactly the bytes handed to the file.
  const size_t last_cur_size = buf_.CurrentSize();
  buf_.PadToAlignmentWith(0);
  const size_t padded_size = buf_.CurrentSize() - last_cur_size;
  const char* padded_start = buf_.BufferStart() + last_cur_size;
  const uint32_t padded_checksum = crc32c::Value(padded_start, padded_size);
  buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine(
      buffered_data_crc32c_checksum_, padded_checksum, padded_size);

  const char* src = buf_.BufferStart();
  const uint64_t write_offset = next_write_offset_;
  const size_t left = buf_.CurrentSize();
  DataVerificationInfo v_info;
  char checksum_buf[sizeof(uint32_t)];
  const Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;

  // Check how much is allowed. Here, we loop until the rate limiter allows to
  // write the entire buffer, because the checksum covers the buffer as a whole
  // and the data is therefore written with one call.
  // TODO: need to be improved since it sort of defeats the purpose of the rate
  // limiter
  if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
    size_t data_size = left;
    while (data_size > 0) {
      const size_t granted = rate_limiter_->RequestToken(
          data_size, buf_.Alignment(), rate_limiter_priority_used, stats_,
          RateLimiter::OpType::kWrite);
      data_size -= granted;
    }
  }

  {
    // Scope of the write_nanos timer: the append plus listener notification.
    IOSTATS_TIMER_GUARD(write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }
    // direct writes must be positional
    EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
    v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
    s = writable_file_->PositionedAppend(Slice(src, left), write_offset, opts,
                                         v_info, nullptr);

    if (ShouldNotifyListeners()) {
      auto finish_ts = std::chrono::steady_clock::now();
      NotifyOnFileWriteFinish(write_offset, left, start_ts, finish_ts, s);
      if (!s.ok()) {
        NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(),
                        left, write_offset);
      }
    }
    if (!s.ok()) {
      // Reset the buffer size to its pre-padding value
      // (file_advance + leftover_tail) and recompute the checksum over the
      // buffer so that it again matches the buffered data without padding.
      buf_.Size(file_advance + leftover_tail);
      buffered_data_crc32c_checksum_ =
          crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
      set_seen_error();
      return s;
    }
  }

  // Every failure returned above, so the write succeeded from here on.
  IOSTATS_ADD(bytes_written, left);
  assert((next_write_offset_ % alignment) == 0);
  const uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
  flushed_size_.store(cur_size + left, std::memory_order_release);

  // Move the tail to the beginning of the buffer
  // This never happens during normal Append but rather during
  // explicit call to Flush()/Sync() or Close(). The buffer checksum is
  // recalculated accordingly.
  buf_.RefitTail(file_advance, leftover_tail);
  // Adjust the checksum value to align with the data in the buffer
  buffered_data_crc32c_checksum_ =
      crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
  // This is where we start writing next time which may or may not be
  // the actual file size on disk. They match if the buffer size
  // is a multiple of whole pages otherwise filesize_ is leftover_tail
  // behind
  next_write_offset_ += file_advance;
  return s;
}
// Chooses between the file-level I/O priority and the per-operation rate
// limiter priority.
//
// The per-operation priority is returned whenever it is not Env::IO_TOTAL.
// Otherwise the file-level priority is returned, which is itself
// Env::IO_TOTAL when neither input carries any other value.
Env::IOPriority WritableFileWriter::DecideRateLimiterPriority(
    Env::IOPriority writable_file_io_priority,
    Env::IOPriority op_rate_limiter_priority) {
  const bool op_priority_is_set = op_rate_limiter_priority != Env::IO_TOTAL;
  return op_priority_is_set ? op_rate_limiter_priority
                            : writable_file_io_priority;
}
// Returns a copy of `opts` whose rate_limiter_priority has been resolved
// against the underlying file's I/O priority via DecideRateLimiterPriority().
// When there is no underlying file the copy is returned unmodified.
IOOptions WritableFileWriter::FinalizeIOOptions(const IOOptions& opts) const {
  IOOptions finalized(opts);
  if (writable_file_.get() == nullptr) {
    // No file to consult; keep the caller's priority as-is.
    return finalized;
  }
  finalized.rate_limiter_priority =
      WritableFileWriter::DecideRateLimiterPriority(
          writable_file_->GetIOPriority(), opts.rate_limiter_priority);
  return finalized;
}
} // namespace ROCKSDB_NAMESPACE