// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include #include "db/db_impl/db_impl.h" #include "db/error_handler.h" #include "db/event_helpers.h" #include "logging/logging.h" #include "monitoring/perf_context_imp.h" #include "options/options_helper.h" #include "test_util/sync_point.h" #include "util/cast_util.h" namespace ROCKSDB_NAMESPACE { // Convenience methods Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { const Status s = FailIfCfHasTs(column_family); if (!s.ok()) { return s; } return DB::Put(o, column_family, key, val); } Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts, const Slice& val) { const Status s = FailIfTsMismatchCf(column_family, ts); if (!s.ok()) { return s; } return DB::Put(o, column_family, key, ts, val); } Status DBImpl::PutEntity(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const WideColumns& columns) { const Status s = FailIfCfHasTs(column_family); if (!s.ok()) { return s; } return DB::PutEntity(options, column_family, key, columns); } Status DBImpl::PutEntity(const WriteOptions& options, const Slice& key, const AttributeGroups& attribute_groups) { for (const AttributeGroup& ag : attribute_groups) { const Status s = FailIfCfHasTs(ag.column_family()); if (!s.ok()) { return s; } } return DB::PutEntity(options, key, attribute_groups); } Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { const Status s = FailIfCfHasTs(column_family); if (!s.ok()) { return s; } auto cfh = static_cast_with_check(column_family); if (!cfh->cfd()->ioptions()->merge_operator) { return Status::NotSupported("Provide a merge_operator when opening DB"); } else { return DB::Merge(o, column_family, key, val); } } Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts, const Slice& val) { const Status s = FailIfTsMismatchCf(column_family, ts); if (!s.ok()) { return s; } return DB::Merge(o, column_family, key, ts, val); } Status DBImpl::Delete(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& key) { const Status s = FailIfCfHasTs(column_family); if (!s.ok()) { return s; } return DB::Delete(write_options, column_family, key); } Status DBImpl::Delete(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts) { const Status s = FailIfTsMismatchCf(column_family, ts); if (!s.ok()) { return s; } return DB::Delete(write_options, column_family, key, ts); } Status DBImpl::SingleDelete(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& key) { const Status s = FailIfCfHasTs(column_family); if (!s.ok()) { return s; } return DB::SingleDelete(write_options, column_family, key); } Status DBImpl::SingleDelete(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts) { const Status s = FailIfTsMismatchCf(column_family, ts); if (!s.ok()) { return s; } return DB::SingleDelete(write_options, column_family, key, ts); } Status DBImpl::DeleteRange(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& begin_key, const Slice& end_key) { const Status s = FailIfCfHasTs(column_family); if (!s.ok()) { return s; } return DB::DeleteRange(write_options, column_family, begin_key, end_key); } Status DBImpl::DeleteRange(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& begin_key, const Slice& end_key, const Slice& ts) { const Status s = FailIfTsMismatchCf(column_family, ts); if (!s.ok()) { return s; } return DB::DeleteRange(write_options, column_family, begin_key, end_key, ts); } void DBImpl::SetRecoverableStatePreReleaseCallback( PreReleaseCallback* callback) { recoverable_state_pre_release_callback_.reset(callback); } Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { Status s; if (write_options.protection_bytes_per_key > 0) { s = WriteBatchInternal::UpdateProtectionInfo( my_batch, write_options.protection_bytes_per_key); } if (s.ok()) { s = WriteImpl(write_options, my_batch, /*callback=*/nullptr, /*user_write_cb=*/nullptr, /*log_used=*/nullptr); } return s; } Status DBImpl::WriteWithCallback(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, UserWriteCallback* user_write_cb) { Status s; if (write_options.protection_bytes_per_key > 0) { s = WriteBatchInternal::UpdateProtectionInfo( my_batch, write_options.protection_bytes_per_key); } if (s.ok()) { s = WriteImpl(write_options, my_batch, callback, user_write_cb); } return s; } Status DBImpl::WriteWithCallback(const WriteOptions& write_options, WriteBatch* my_batch, UserWriteCallback* user_write_cb) { Status s; if (write_options.protection_bytes_per_key > 0) { s = WriteBatchInternal::UpdateProtectionInfo( my_batch, write_options.protection_bytes_per_key); } if (s.ok()) { s = WriteImpl(write_options, my_batch, /*callback=*/nullptr, user_write_cb); } return s; } // The main write queue. This is the only write queue that updates LastSequence. // When using one write queue, the same sequence also indicates the last // published sequence. Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, UserWriteCallback* user_write_cb, uint64_t* log_used, uint64_t log_ref, bool disable_memtable, uint64_t* seq_used, size_t batch_cnt, PreReleaseCallback* pre_release_callback, PostMemTableCallback* post_memtable_callback) { assert(!seq_per_batch_ || batch_cnt != 0); assert(my_batch == nullptr || my_batch->Count() == 0 || write_options.protection_bytes_per_key == 0 || write_options.protection_bytes_per_key == my_batch->GetProtectionBytesPerKey()); if (my_batch == nullptr) { return Status::InvalidArgument("Batch is nullptr!"); } else if (!disable_memtable && WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) { // If writing to memtable, then we require the caller to set/update the // timestamps for the keys in the write batch. // Otherwise, it means we are just writing to the WAL, and we allow // timestamps unset for the keys in the write batch. This can happen if we // use TransactionDB with write-committed policy, and we currently do not // support user-defined timestamp with other policies. // In the prepare phase, a transaction can write the batch to the WAL // without inserting to memtable. The keys in the batch do not have to be // assigned timestamps because they will be used only during recovery if // there is a commit marker which includes their commit timestamp. return Status::InvalidArgument("write batch must have timestamp(s) set"); } else if (write_options.rate_limiter_priority != Env::IO_TOTAL && write_options.rate_limiter_priority != Env::IO_USER) { return Status::InvalidArgument( "WriteOptions::rate_limiter_priority only allows " "Env::IO_TOTAL and Env::IO_USER due to implementation constraints"); } else if (write_options.rate_limiter_priority != Env::IO_TOTAL && (write_options.disableWAL || manual_wal_flush_)) { return Status::InvalidArgument( "WriteOptions::rate_limiter_priority currently only supports " "rate-limiting automatic WAL flush, which requires " "`WriteOptions::disableWAL` and " "`DBOptions::manual_wal_flush` both set to false"); } else if (write_options.protection_bytes_per_key != 0 && write_options.protection_bytes_per_key != 8) { return Status::InvalidArgument( "`WriteOptions::protection_bytes_per_key` must be zero or eight"); } else if (write_options.disableWAL && immutable_db_options_.recycle_log_file_num > 0 && !(two_write_queues_ && disable_memtable)) { // Corruption detection in recycled WALs relies on sequential sequence // numbers, but WritePreparedTxnDB uses disableWAL internally for split // writes return Status::InvalidArgument( "WriteOptions::disableWAL option is not supported if " "DBOptions::recycle_log_file_num > 0"); } // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // grabs but does not seem thread-safe. if (tracer_) { InstrumentedMutexLock lock(&trace_mutex_); if (tracer_ && !tracer_->IsWriteOrderPreserved()) { // We don't have to preserve write order so can trace anywhere. It's more // efficient to trace here than to add latency to a phase of the log/apply // pipeline. // TODO: maybe handle the tracing status? tracer_->Write(my_batch).PermitUncheckedError(); } } if (write_options.sync && write_options.disableWAL) { return Status::InvalidArgument("Sync writes has to enable WAL."); } if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) { return Status::NotSupported( "pipelined_writes is not compatible with concurrent prepares"); } if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) { // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt return Status::NotSupported( "pipelined_writes is not compatible with seq_per_batch"); } if (immutable_db_options_.unordered_write && immutable_db_options_.enable_pipelined_write) { return Status::NotSupported( "pipelined_writes is not compatible with unordered_write"); } if (immutable_db_options_.enable_pipelined_write && post_memtable_callback != nullptr) { return Status::NotSupported( "pipelined write currently does not honor post_memtable_callback"); } if (seq_per_batch_ && post_memtable_callback != nullptr) { return Status::NotSupported( "seq_per_batch currently does not honor post_memtable_callback"); } if (my_batch->HasDeleteRange() && immutable_db_options_.row_cache) { return Status::NotSupported( "DeleteRange is not compatible with row cache."); } // Otherwise IsLatestPersistentState optimization does not make sense assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) || disable_memtable); if (write_options.low_pri) { Status s = ThrottleLowPriWritesIfNeeded(write_options, my_batch); if (!s.ok()) { return s; } } if (two_write_queues_ && disable_memtable) { AssignOrder assign_order = seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder; // Otherwise it is WAL-only Prepare batches in WriteCommitted policy and // they don't consume sequence. return WriteImplWALOnly( &nonmem_write_thread_, write_options, my_batch, callback, user_write_cb, log_used, log_ref, seq_used, batch_cnt, pre_release_callback, assign_order, kDontPublishLastSeq, disable_memtable); } if (immutable_db_options_.unordered_write) { const size_t sub_batch_cnt = batch_cnt != 0 ? batch_cnt // every key is a sub-batch consuming a seq : WriteBatchInternal::Count(my_batch); uint64_t seq = 0; // Use a write thread to i) optimize for WAL write, ii) publish last // sequence in in increasing order, iii) call pre_release_callback serially Status status = WriteImplWALOnly( &write_thread_, write_options, my_batch, callback, user_write_cb, log_used, log_ref, &seq, sub_batch_cnt, pre_release_callback, kDoAssignOrder, kDoPublishLastSeq, disable_memtable); TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL"); if (!status.ok()) { return status; } if (seq_used) { *seq_used = seq; } if (!disable_memtable) { TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"); status = UnorderedWriteMemtable(write_options, my_batch, callback, log_ref, seq, sub_batch_cnt); } return status; } if (immutable_db_options_.enable_pipelined_write) { return PipelinedWriteImpl(write_options, my_batch, callback, user_write_cb, log_used, log_ref, disable_memtable, seq_used); } PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(write_options, my_batch, callback, user_write_cb, log_ref, disable_memtable, batch_cnt, pre_release_callback, post_memtable_callback); StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); write_thread_.JoinBatchGroup(&w); if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_CALLER) { write_thread_.SetMemWritersEachStride(&w); } if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { // we are a non-leader in a parallel group if (w.ShouldWriteToMemtable()) { PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time); ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt, batch_per_txn_, write_options.memtable_insert_hint_per_batch); PERF_TIMER_START(write_pre_and_post_process_time); } if (write_thread_.CompleteParallelMemTableWriter(&w)) { // we're responsible for exit batch group // TODO(myabandeh): propagate status to write_group auto last_sequence = w.write_group->last_sequence; for (auto* tmp_w : *(w.write_group)) { assert(tmp_w); if (tmp_w->post_memtable_callback) { Status tmp_s = (*tmp_w->post_memtable_callback)(last_sequence, disable_memtable); // TODO: propagate the execution status of post_memtable_callback to // caller. assert(tmp_s.ok()); } } versions_->SetLastSequence(last_sequence); MemTableInsertStatusCheck(w.status); write_thread_.ExitAsBatchGroupFollower(&w); } assert(w.state == WriteThread::STATE_COMPLETED); // STATE_COMPLETED conditional below handles exit } if (w.state == WriteThread::STATE_COMPLETED) { if (log_used != nullptr) { *log_used = w.log_used; } if (seq_used != nullptr) { *seq_used = w.sequence; } // write is complete and leader has updated sequence return w.FinalStatus(); } // else we are the leader of the write batch group assert(w.state == WriteThread::STATE_GROUP_LEADER); Status status; // Once reaches this point, the current writer "w" will try to do its write // job. It may also pick up some of the remaining writers in the "writers_" // when it finds suitable, and finish them in the same write batch. // This is how a write job could be done by the other writer. WriteContext write_context; LogContext log_context(write_options.sync); WriteThread::WriteGroup write_group; bool in_parallel_group = false; uint64_t last_sequence = kMaxSequenceNumber; assert(!two_write_queues_ || !disable_memtable); { // With concurrent writes we do preprocess only in the write thread that // also does write to memtable to avoid sync issue on shared data structure // with the other thread // PreprocessWrite does its own perf timing. PERF_TIMER_STOP(write_pre_and_post_process_time); status = PreprocessWrite(write_options, &log_context, &write_context); if (!two_write_queues_) { // Assign it after ::PreprocessWrite since the sequence might advance // inside it by WriteRecoverableState last_sequence = versions_->LastSequence(); } PERF_TIMER_START(write_pre_and_post_process_time); } // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes // into memtables TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters"); last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(&w, &write_group); IOStatus io_s; Status pre_release_cb_status; if (status.ok()) { // Rules for when we can update the memtable concurrently // 1. supported by memtable // 2. Puts are not okay if inplace_update_support // 3. Merges are not okay // // Rules 1..2 are enforced by checking the options // during startup (CheckConcurrentWritesSupported), so if // options.allow_concurrent_memtable_write is true then they can be // assumed to be true. Rule 3 is checked for each batch. We could // relax rules 2 if we could prevent write batches from referring // more than once to a particular key. bool parallel = immutable_db_options_.allow_concurrent_memtable_write && write_group.size > 1; size_t total_count = 0; size_t valid_batches = 0; size_t total_byte_size = 0; size_t pre_release_callback_cnt = 0; for (auto* writer : write_group) { assert(writer); if (writer->CheckCallback(this)) { valid_batches += writer->batch_cnt; if (writer->ShouldWriteToMemtable()) { total_count += WriteBatchInternal::Count(writer->batch); total_byte_size = WriteBatchInternal::AppendedByteSize( total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); parallel = parallel && !writer->batch->HasMerge(); } if (writer->pre_release_callback) { pre_release_callback_cnt++; } } } // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // grabs but does not seem thread-safe. if (tracer_) { InstrumentedMutexLock lock(&trace_mutex_); if (tracer_ && tracer_->IsWriteOrderPreserved()) { for (auto* writer : write_group) { if (writer->CallbackFailed()) { continue; } // TODO: maybe handle the tracing status? tracer_->Write(writer->batch).PermitUncheckedError(); } } } // Note about seq_per_batch_: either disableWAL is set for the entire write // group or not. In either case we inc seq for each write batch with no // failed callback. This means that there could be a batch with // disalbe_memtable in between; although we do not write this batch to // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc // the seq per valid written key to mem. size_t seq_inc = seq_per_batch_ ? valid_batches : total_count; const bool concurrent_update = two_write_queues_; // Update stats while we are an exclusive group leader, so we know // that nobody else can be writing to these particular stats. // We're optimistic, updating the stats before we successfully // commit. That lets us release our leader status early. auto stats = default_cf_internal_stats_; stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count, concurrent_update); RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size, concurrent_update); RecordTick(stats_, BYTES_WRITTEN, total_byte_size); stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1, concurrent_update); RecordTick(stats_, WRITE_DONE_BY_SELF); auto write_done_by_other = write_group.size - 1; if (write_done_by_other > 0) { stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther, write_done_by_other, concurrent_update); RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); } RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); if (write_options.disableWAL) { has_unpersisted_data_.store(true, std::memory_order_relaxed); } PERF_TIMER_STOP(write_pre_and_post_process_time); if (!two_write_queues_) { if (status.ok() && !write_options.disableWAL) { assert(log_context.log_file_number_size); LogFileNumberSize& log_file_number_size = *(log_context.log_file_number_size); PERF_TIMER_GUARD(write_wal_time); io_s = WriteToWAL(write_group, log_context.writer, log_used, log_context.need_log_sync, log_context.need_log_dir_sync, last_sequence + 1, log_file_number_size); } } else { if (status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); // LastAllocatedSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc); } else { // Otherwise we inc seq number for memtable writes last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); } } status = io_s; assert(last_sequence != kMaxSequenceNumber); const SequenceNumber current_sequence = last_sequence + 1; last_sequence += seq_inc; if (log_context.need_log_sync) { VersionEdit synced_wals; log_write_mutex_.Lock(); if (status.ok()) { MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, &synced_wals); } else { MarkLogsNotSynced(logfile_number_); } log_write_mutex_.Unlock(); if (status.ok() && synced_wals.IsWalAddition()) { InstrumentedMutexLock l(&mutex_); // TODO: plumb Env::IOActivity, Env::IOPriority const ReadOptions read_options; status = ApplyWALToManifest(read_options, write_options, &synced_wals); } // Requesting sync with two_write_queues_ is expected to be very rare. We // hence provide a simple implementation that is not necessarily // efficient. if (status.ok() && two_write_queues_) { if (manual_wal_flush_) { status = FlushWAL(true); } else { status = SyncWAL(); } } } // PreReleaseCallback is called after WAL write and before memtable write if (status.ok()) { SequenceNumber next_sequence = current_sequence; size_t index = 0; // Note: the logic for advancing seq here must be consistent with the // logic in WriteBatchInternal::InsertInto(write_group...) as well as // with WriteBatchInternal::InsertInto(write_batch...) that is called on // the merged batch during recovery from the WAL. for (auto* writer : write_group) { if (writer->CallbackFailed()) { continue; } writer->sequence = next_sequence; if (writer->pre_release_callback) { Status ws = writer->pre_release_callback->Callback( writer->sequence, disable_memtable, writer->log_used, index++, pre_release_callback_cnt); if (!ws.ok()) { status = pre_release_cb_status = ws; break; } } if (seq_per_batch_) { assert(writer->batch_cnt); next_sequence += writer->batch_cnt; } else if (writer->ShouldWriteToMemtable()) { next_sequence += WriteBatchInternal::Count(writer->batch); } } } if (status.ok()) { PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time); if (!parallel) { // w.sequence will be set inside InsertInto w.status = WriteBatchInternal::InsertInto( write_group, current_sequence, column_family_memtables_.get(), &flush_scheduler_, &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*recovery_log_number*/, this, parallel, seq_per_batch_, batch_per_txn_); } else { write_group.last_sequence = last_sequence; write_thread_.LaunchParallelMemTableWriters(&write_group); in_parallel_group = true; // Each parallel follower is doing each own writes. The leader should // also do its own. if (w.ShouldWriteToMemtable()) { ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); assert(w.sequence == current_sequence); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt, batch_per_txn_, write_options.memtable_insert_hint_per_batch); } } if (seq_used != nullptr) { *seq_used = w.sequence; } } } PERF_TIMER_START(write_pre_and_post_process_time); if (!io_s.ok()) { // Check WriteToWAL status IOStatusCheck(io_s); } if (!w.CallbackFailed()) { if (!io_s.ok()) { assert(pre_release_cb_status.ok()); } else { WriteStatusCheck(pre_release_cb_status); } } else { assert(pre_release_cb_status.ok()); } bool should_exit_batch_group = true; if (in_parallel_group) { // CompleteParallelWorker returns true if this thread should // handle exit, false means somebody else did should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w); } if (should_exit_batch_group) { if (status.ok()) { for (auto* tmp_w : write_group) { assert(tmp_w); if (tmp_w->post_memtable_callback) { Status tmp_s = (*tmp_w->post_memtable_callback)(last_sequence, disable_memtable); // TODO: propagate the execution status of post_memtable_callback to // caller. assert(tmp_s.ok()); } } // Note: if we are to resume after non-OK statuses we need to revisit how // we reacts to non-OK statuses here. versions_->SetLastSequence(last_sequence); } MemTableInsertStatusCheck(w.status); write_thread_.ExitAsBatchGroupLeader(write_group, status); } if (status.ok()) { status = w.FinalStatus(); } return status; } Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, UserWriteCallback* user_write_cb, uint64_t* log_used, uint64_t log_ref, bool disable_memtable, uint64_t* seq_used) { PERF_TIMER_GUARD(write_pre_and_post_process_time); StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); WriteContext write_context; WriteThread::Writer w(write_options, my_batch, callback, user_write_cb, log_ref, disable_memtable, /*_batch_cnt=*/0, /*_pre_release_callback=*/nullptr); write_thread_.JoinBatchGroup(&w); TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"); if (w.state == WriteThread::STATE_GROUP_LEADER) { WriteThread::WriteGroup wal_write_group; if (w.callback && !w.callback->AllowWriteBatching()) { write_thread_.WaitForMemTableWriters(); } LogContext log_context(!write_options.disableWAL && write_options.sync); // PreprocessWrite does its own perf timing. PERF_TIMER_STOP(write_pre_and_post_process_time); w.status = PreprocessWrite(write_options, &log_context, &write_context); PERF_TIMER_START(write_pre_and_post_process_time); // This can set non-OK status if callback fail. last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group); const SequenceNumber current_sequence = write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1; size_t total_count = 0; size_t total_byte_size = 0; if (w.status.ok()) { // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // grabs but does not seem thread-safe. if (tracer_) { InstrumentedMutexLock lock(&trace_mutex_); if (tracer_ != nullptr && tracer_->IsWriteOrderPreserved()) { for (auto* writer : wal_write_group) { // TODO: maybe handle the tracing status? tracer_->Write(writer->batch).PermitUncheckedError(); } } } SequenceNumber next_sequence = current_sequence; for (auto* writer : wal_write_group) { assert(writer); if (writer->CheckCallback(this)) { if (writer->ShouldWriteToMemtable()) { writer->sequence = next_sequence; size_t count = WriteBatchInternal::Count(writer->batch); total_byte_size = WriteBatchInternal::AppendedByteSize( total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); next_sequence += count; total_count += count; } } } if (w.disable_wal) { has_unpersisted_data_.store(true, std::memory_order_relaxed); } write_thread_.UpdateLastSequence(current_sequence + total_count - 1); } auto stats = default_cf_internal_stats_; stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count); RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size); RecordTick(stats_, BYTES_WRITTEN, total_byte_size); RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); PERF_TIMER_STOP(write_pre_and_post_process_time); IOStatus io_s; io_s.PermitUncheckedError(); // Allow io_s to be uninitialized if (w.status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1); RecordTick(stats_, WRITE_DONE_BY_SELF, 1); if (wal_write_group.size > 1) { stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther, wal_write_group.size - 1); RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } assert(log_context.log_file_number_size); LogFileNumberSize& log_file_number_size = *(log_context.log_file_number_size); io_s = WriteToWAL(wal_write_group, log_context.writer, log_used, log_context.need_log_sync, log_context.need_log_dir_sync, current_sequence, log_file_number_size); w.status = io_s; } if (!io_s.ok()) { // Check WriteToWAL status IOStatusCheck(io_s); } else if (!w.CallbackFailed()) { WriteStatusCheck(w.status); } VersionEdit synced_wals; if (log_context.need_log_sync) { InstrumentedMutexLock l(&log_write_mutex_); if (w.status.ok()) { MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, &synced_wals); } else { MarkLogsNotSynced(logfile_number_); } } if (w.status.ok() && synced_wals.IsWalAddition()) { InstrumentedMutexLock l(&mutex_); // TODO: plumb Env::IOActivity, Env::IOPriority const ReadOptions read_options; w.status = ApplyWALToManifest(read_options, write_options, &synced_wals); } write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); } // NOTE: the memtable_write_group is declared before the following // `if` statement because its lifetime needs to be longer // that the inner context of the `if` as a reference to it // may be used further below within the outer _write_thread WriteThread::WriteGroup memtable_write_group; if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time); assert(w.ShouldWriteToMemtable()); write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group); if (memtable_write_group.size > 1 && immutable_db_options_.allow_concurrent_memtable_write) { write_thread_.LaunchParallelMemTableWriters(&memtable_write_group); } else { memtable_write_group.status = WriteBatchInternal::InsertInto( memtable_write_group, w.sequence, column_family_memtables_.get(), &flush_scheduler_, &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_); versions_->SetLastSequence(memtable_write_group.last_sequence); write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); } } else { // NOTE: the memtable_write_group is never really used, // so we need to set its status to pass ASSERT_STATUS_CHECKED memtable_write_group.status.PermitUncheckedError(); } if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_CALLER) { write_thread_.SetMemWritersEachStride(&w); } if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time); assert(w.ShouldWriteToMemtable()); ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/, write_options.memtable_insert_hint_per_batch); PERF_TIMER_STOP(write_memtable_time); PERF_TIMER_START(write_pre_and_post_process_time); if (write_thread_.CompleteParallelMemTableWriter(&w)) { MemTableInsertStatusCheck(w.status); versions_->SetLastSequence(w.write_group->last_sequence); write_thread_.ExitAsMemTableWriter(&w, *w.write_group); } } if (seq_used != nullptr) { *seq_used = w.sequence; } assert(w.state == WriteThread::STATE_COMPLETED); return w.FinalStatus(); } Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, uint64_t log_ref, SequenceNumber seq, const size_t sub_batch_cnt) { PERF_TIMER_GUARD(write_pre_and_post_process_time); StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); WriteThread::Writer w(write_options, my_batch, callback, /*user_write_cb=*/nullptr, log_ref, false /*disable_memtable*/); if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) { w.sequence = seq; size_t total_count = WriteBatchInternal::Count(my_batch); InternalStats* stats = default_cf_internal_stats_; stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count); RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time); ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/, write_options.memtable_insert_hint_per_batch); if (write_options.disableWAL) { has_unpersisted_data_.store(true, std::memory_order_relaxed); } PERF_TIMER_START(write_pre_and_post_process_time); } size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1; if (pending_cnt == 0) { // switch_cv_ waits until pending_memtable_writes_ = 0. Locking its mutex // before notify ensures that cv is in waiting state when it is notified // thus not missing the update to pending_memtable_writes_ even though it is // not modified under the mutex. std::lock_guard lck(switch_mutex_); switch_cv_.notify_all(); } WriteStatusCheck(w.status); if (!w.FinalStatus().ok()) { return w.FinalStatus(); } return Status::OK(); } // The 2nd write queue. If enabled it will be used only for WAL-only writes. // This is the only queue that updates LastPublishedSequence which is only // applicable in a two-queue setting. Status DBImpl::WriteImplWALOnly( WriteThread* write_thread, const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, UserWriteCallback* user_write_cb, uint64_t* log_used, const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt, PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, const PublishLastSeq publish_last_seq, const bool disable_memtable) { PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(write_options, my_batch, callback, user_write_cb, log_ref, disable_memtable, sub_batch_cnt, pre_release_callback); StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); write_thread->JoinBatchGroup(&w); assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER); if (w.state == WriteThread::STATE_COMPLETED) { if (log_used != nullptr) { *log_used = w.log_used; } if (seq_used != nullptr) { *seq_used = w.sequence; } return w.FinalStatus(); } // else we are the leader of the write batch group assert(w.state == WriteThread::STATE_GROUP_LEADER); if (publish_last_seq == kDoPublishLastSeq) { Status status; // Currently we only use kDoPublishLastSeq in unordered_write assert(immutable_db_options_.unordered_write); WriteContext write_context; if (error_handler_.IsDBStopped()) { status = error_handler_.GetBGError(); } // TODO(myabandeh): Make preliminary checks thread-safe so we could do them // without paying the cost of obtaining the mutex. if (status.ok()) { LogContext log_context; status = PreprocessWrite(write_options, &log_context, &write_context); WriteStatusCheckOnLocked(status); } if (!status.ok()) { WriteThread::WriteGroup write_group; write_thread->EnterAsBatchGroupLeader(&w, &write_group); write_thread->ExitAsBatchGroupLeader(write_group, status); return status; } } else { PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_FOR_WAIT_GUARD(write_delay_time); InstrumentedMutexLock lock(&mutex_); Status status = DelayWrite(/*num_bytes=*/0ull, *write_thread, write_options); PERF_TIMER_STOP(write_delay_time); PERF_TIMER_START(write_pre_and_post_process_time); if (!status.ok()) { WriteThread::WriteGroup write_group; write_thread->EnterAsBatchGroupLeader(&w, &write_group); write_thread->ExitAsBatchGroupLeader(write_group, status); return status; } } WriteThread::WriteGroup write_group; uint64_t last_sequence; write_thread->EnterAsBatchGroupLeader(&w, &write_group); // Note: no need to update last_batch_group_size_ here since the batch writes // to WAL only // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // grabs but does not seem thread-safe. if (tracer_) { InstrumentedMutexLock lock(&trace_mutex_); if (tracer_ != nullptr && tracer_->IsWriteOrderPreserved()) { for (auto* writer : write_group) { // TODO: maybe handle the tracing status? tracer_->Write(writer->batch).PermitUncheckedError(); } } } size_t pre_release_callback_cnt = 0; size_t total_byte_size = 0; for (auto* writer : write_group) { assert(writer); if (writer->CheckCallback(this)) { total_byte_size = WriteBatchInternal::AppendedByteSize( total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); if (writer->pre_release_callback) { pre_release_callback_cnt++; } } } const bool concurrent_update = true; // Update stats while we are an exclusive group leader, so we know // that nobody else can be writing to these particular stats. // We're optimistic, updating the stats before we successfully // commit. That lets us release our leader status early. auto stats = default_cf_internal_stats_; stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size, concurrent_update); RecordTick(stats_, BYTES_WRITTEN, total_byte_size); stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1, concurrent_update); RecordTick(stats_, WRITE_DONE_BY_SELF); auto write_done_by_other = write_group.size - 1; if (write_done_by_other > 0) { stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther, write_done_by_other, concurrent_update); RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); } RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_wal_time); // LastAllocatedSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL size_t seq_inc = 0 /* total_count */; if (assign_order == kDoAssignOrder) { size_t total_batch_cnt = 0; for (auto* writer : write_group) { assert(writer->batch_cnt || !seq_per_batch_); if (!writer->CallbackFailed()) { total_batch_cnt += writer->batch_cnt; } } seq_inc = total_batch_cnt; } Status status; if (!write_options.disableWAL) { IOStatus io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc); status = io_s; // last_sequence may not be set if there is an error // This error checking and return is moved up to avoid using uninitialized // last_sequence. if (!io_s.ok()) { IOStatusCheck(io_s); write_thread->ExitAsBatchGroupLeader(write_group, status); return status; } } else { // Otherwise we inc seq number to do solely the seq allocation last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); } size_t memtable_write_cnt = 0; auto curr_seq = last_sequence + 1; for (auto* writer : write_group) { if (writer->CallbackFailed()) { continue; } writer->sequence = curr_seq; if (assign_order == kDoAssignOrder) { assert(writer->batch_cnt || !seq_per_batch_); curr_seq += writer->batch_cnt; } if (!writer->disable_memtable) { memtable_write_cnt++; } // else seq advances only by memtable writes } if (status.ok() && write_options.sync) { assert(!write_options.disableWAL); // Requesting sync with two_write_queues_ is expected to be very rare. We // hance provide a simple implementation that is not necessarily efficient. if (manual_wal_flush_) { status = FlushWAL(true); } else { status = SyncWAL(); } } PERF_TIMER_START(write_pre_and_post_process_time); if (!w.CallbackFailed()) { WriteStatusCheck(status); } if (status.ok()) { size_t index = 0; for (auto* writer : write_group) { if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); Status ws = writer->pre_release_callback->Callback( writer->sequence, disable_memtable, writer->log_used, index++, pre_release_callback_cnt); if (!ws.ok()) { status = ws; break; } } } } if (publish_last_seq == kDoPublishLastSeq) { versions_->SetLastSequence(last_sequence + seq_inc); // Currently we only use kDoPublishLastSeq in unordered_write assert(immutable_db_options_.unordered_write); } if (immutable_db_options_.unordered_write && status.ok()) { pending_memtable_writes_ += memtable_write_cnt; } write_thread->ExitAsBatchGroupLeader(write_group, status); if (status.ok()) { status = w.FinalStatus(); } if (seq_used != nullptr) { *seq_used = w.sequence; } return status; } void DBImpl::WriteStatusCheckOnLocked(const Status& status) { // Is setting bg_error_ enough here? This will at least stop // compaction and fail any further writes. InstrumentedMutexLock l(&mutex_); assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok()); if (immutable_db_options_.paranoid_checks && !status.ok() && !status.IsBusy() && !status.IsIncomplete()) { // Maybe change the return status to void? error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback); } } void DBImpl::WriteStatusCheck(const Status& status) { // Is setting bg_error_ enough here? This will at least stop // compaction and fail any further writes. assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok()); if (immutable_db_options_.paranoid_checks && !status.ok() && !status.IsBusy() && !status.IsIncomplete()) { mutex_.Lock(); // Maybe change the return status to void? error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback); mutex_.Unlock(); } } void DBImpl::IOStatusCheck(const IOStatus& io_status) { // Is setting bg_error_ enough here? This will at least stop // compaction and fail any further writes. if ((immutable_db_options_.paranoid_checks && !io_status.ok() && !io_status.IsBusy() && !io_status.IsIncomplete()) || io_status.IsIOFenced()) { mutex_.Lock(); // Maybe change the return status to void? error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback); mutex_.Unlock(); } else { // Force writable file to be continue writable. logs_.back().writer->file()->reset_seen_error(); } } void DBImpl::MemTableInsertStatusCheck(const Status& status) { // A non-OK status here indicates that the state implied by the // WAL has diverged from the in-memory state. This could be // because of a corrupt write_batch (very bad), or because the // client specified an invalid column family and didn't specify // ignore_missing_column_families. if (!status.ok()) { mutex_.Lock(); assert(!error_handler_.IsBGWorkStopped()); // Maybe change the return status to void? error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable); mutex_.Unlock(); } } Status DBImpl::PreprocessWrite(const WriteOptions& write_options, LogContext* log_context, WriteContext* write_context) { assert(write_context != nullptr && log_context != nullptr); Status status; if (error_handler_.IsDBStopped()) { InstrumentedMutexLock l(&mutex_); status = error_handler_.GetBGError(); } PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time); if (UNLIKELY(status.ok() && total_log_size_ > GetMaxTotalWalSize())) { assert(versions_); InstrumentedMutexLock l(&mutex_); const ColumnFamilySet* const column_families = versions_->GetColumnFamilySet(); assert(column_families); size_t num_cfs = column_families->NumberOfColumnFamilies(); assert(num_cfs >= 1); if (num_cfs > 1) { WaitForPendingWrites(); status = SwitchWAL(write_context); } } if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) { // Before a new memtable is added in SwitchMemtable(), // write_buffer_manager_->ShouldFlush() will keep returning true. If another // thread is writing to another DB with the same write buffer, they may also // be flushed. We may end up with flushing much more DBs than needed. It's // suboptimal but still correct. InstrumentedMutexLock l(&mutex_); WaitForPendingWrites(); status = HandleWriteBufferManagerFlush(write_context); } if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) { InstrumentedMutexLock l(&mutex_); status = TrimMemtableHistory(write_context); } if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { InstrumentedMutexLock l(&mutex_); WaitForPendingWrites(); status = ScheduleFlushes(write_context); } PERF_TIMER_STOP(write_scheduling_flushes_compactions_time); PERF_TIMER_GUARD(write_pre_and_post_process_time); if (UNLIKELY(status.ok() && (write_controller_.IsStopped() || write_controller_.NeedsDelay()))) { PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_FOR_WAIT_GUARD(write_delay_time); // We don't know size of curent batch so that we always use the size // for previous one. It might create a fairness issue that expiration // might happen for smaller writes but larger writes can go through. // Can optimize it if it is an issue. InstrumentedMutexLock l(&mutex_); status = DelayWrite(last_batch_group_size_, write_thread_, write_options); PERF_TIMER_START(write_pre_and_post_process_time); } // If memory usage exceeded beyond a certain threshold, // write_buffer_manager_->ShouldStall() returns true to all threads writing to // all DBs and writers will be stalled. // It does soft checking because WriteBufferManager::buffer_limit_ has already // exceeded at this point so no new write (including current one) will go // through until memory usage is decreased. if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldStall())) { default_cf_internal_stats_->AddDBStats( InternalStats::kIntStatsWriteBufferManagerLimitStopsCounts, 1, true /* concurrent */); if (write_options.no_slowdown) { status = Status::Incomplete("Write stall"); } else { InstrumentedMutexLock l(&mutex_); WriteBufferManagerStallWrites(); } } InstrumentedMutexLock l(&log_write_mutex_); if (status.ok() && log_context->need_log_sync) { // Wait until the parallel syncs are finished. Any sync process has to sync // the front log too so it is enough to check the status of front() // We do a while loop since log_sync_cv_ is signalled when any sync is // finished // Note: there does not seem to be a reason to wait for parallel sync at // this early step but it is not important since parallel sync (SyncWAL) and // need_log_sync are usually not used together. while (logs_.front().IsSyncing()) { log_sync_cv_.Wait(); } for (auto& log : logs_) { // This is just to prevent the logs to be synced by a parallel SyncWAL // call. We will do the actual syncing later after we will write to the // WAL. // Note: there does not seem to be a reason to set this early before we // actually write to the WAL log.PrepareForSync(); } } else { log_context->need_log_sync = false; } log_context->writer = logs_.back().writer; log_context->need_log_dir_sync = log_context->need_log_dir_sync && !log_dir_synced_; log_context->log_file_number_size = std::addressof(alive_log_files_.back()); return status; } Status DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, WriteBatch* tmp_batch, WriteBatch** merged_batch, size_t* write_with_wal, WriteBatch** to_be_cached_state) { assert(write_with_wal != nullptr); assert(tmp_batch != nullptr); assert(*to_be_cached_state == nullptr); *write_with_wal = 0; auto* leader = write_group.leader; assert(!leader->disable_wal); // Same holds for all in the batch group if (write_group.size == 1 && !leader->CallbackFailed() && leader->batch->GetWalTerminationPoint().is_cleared()) { // we simply write the first WriteBatch to WAL if the group only // contains one batch, that batch should be written to the WAL, // and the batch is not wanting to be truncated *merged_batch = leader->batch; if (WriteBatchInternal::IsLatestPersistentState(*merged_batch)) { *to_be_cached_state = *merged_batch; } *write_with_wal = 1; } else { // WAL needs all of the batches flattened into a single batch. // We could avoid copying here with an iov-like AddRecord // interface *merged_batch = tmp_batch; for (auto writer : write_group) { if (!writer->CallbackFailed()) { Status s = WriteBatchInternal::Append(*merged_batch, writer->batch, /*WAL_only*/ true); if (!s.ok()) { tmp_batch->Clear(); return s; } if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) { // We only need to cache the last of such write batch *to_be_cached_state = writer->batch; } (*write_with_wal)++; } } } // return merged_batch; return Status::OK(); } // When two_write_queues_ is disabled, this function is called from the only // write thread. Otherwise this must be called holding log_write_mutex_. IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, const WriteOptions& write_options, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, LogFileNumberSize& log_file_number_size) { assert(log_size != nullptr); Slice log_entry = WriteBatchInternal::Contents(&merged_batch); TEST_SYNC_POINT_CALLBACK("DBImpl::WriteToWAL:log_entry", &log_entry); auto s = merged_batch.VerifyChecksum(); if (!s.ok()) { return status_to_io_status(std::move(s)); } *log_size = log_entry.size(); // When two_write_queues_ WriteToWAL has to be protected from concurretn calls // from the two queues anyway and log_write_mutex_ is already held. Otherwise // if manual_wal_flush_ is enabled we need to protect log_writer->AddRecord // from possible concurrent calls via the FlushWAL by the application. const bool needs_locking = manual_wal_flush_ && !two_write_queues_; // Due to performance cocerns of missed branch prediction penalize the new // manual_wal_flush_ feature (by UNLIKELY) instead of the more common case // when we do not need any locking. if (UNLIKELY(needs_locking)) { log_write_mutex_.Lock(); } IOStatus io_s = log_writer->MaybeAddUserDefinedTimestampSizeRecord( write_options, versions_->GetColumnFamiliesTimestampSizeForRecord()); if (!io_s.ok()) { return io_s; } io_s = log_writer->AddRecord(write_options, log_entry); if (UNLIKELY(needs_locking)) { log_write_mutex_.Unlock(); } if (log_used != nullptr) { *log_used = logfile_number_; } total_log_size_ += log_entry.size(); log_file_number_size.AddSize(*log_size); log_empty_ = false; return io_s; } IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, bool need_log_sync, bool need_log_dir_sync, SequenceNumber sequence, LogFileNumberSize& log_file_number_size) { IOStatus io_s; assert(!two_write_queues_); assert(!write_group.leader->disable_wal); // Same holds for all in the batch group size_t write_with_wal = 0; WriteBatch* to_be_cached_state = nullptr; WriteBatch* merged_batch; io_s = status_to_io_status(MergeBatch(write_group, &tmp_batch_, &merged_batch, &write_with_wal, &to_be_cached_state)); if (UNLIKELY(!io_s.ok())) { return io_s; } if (merged_batch == write_group.leader->batch) { write_group.leader->log_used = logfile_number_; } else if (write_with_wal > 1) { for (auto writer : write_group) { writer->log_used = logfile_number_; } } WriteBatchInternal::SetSequence(merged_batch, sequence); uint64_t log_size; // TODO: plumb Env::IOActivity, Env::IOPriority WriteOptions write_options; write_options.rate_limiter_priority = write_group.leader->rate_limiter_priority; io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used, &log_size, log_file_number_size); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; } if (io_s.ok() && need_log_sync) { StopWatch sw(immutable_db_options_.clock, stats_, WAL_FILE_SYNC_MICROS); // It's safe to access logs_ with unlocked mutex_ here because: // - we've set getting_synced=true for all logs, // so other threads won't pop from logs_ while we're here, // - only writer thread can push to logs_, and we're in // writer thread, so no one will push to logs_, // - as long as other threads don't modify it, it's safe to read // from std::deque from multiple threads concurrently. // // Sync operation should work with locked log_write_mutex_, because: // when DBOptions.manual_wal_flush_ is set, // FlushWAL function will be invoked by another thread. // if without locked log_write_mutex_, the log file may get data // corruption const bool needs_locking = manual_wal_flush_ && !two_write_queues_; if (UNLIKELY(needs_locking)) { log_write_mutex_.Lock(); } if (io_s.ok()) { for (auto& log : logs_) { IOOptions opts; io_s = WritableFileWriter::PrepareIOOptions(write_options, opts); if (!io_s.ok()) { break; } io_s = log.writer->file()->Sync(opts, immutable_db_options_.use_fsync); if (!io_s.ok()) { break; } } } if (UNLIKELY(needs_locking)) { log_write_mutex_.Unlock(); } if (io_s.ok() && need_log_dir_sync) { // We only sync WAL directory the first time WAL syncing is // requested, so that in case users never turn on WAL sync, // we can avoid the disk I/O in the write code path. io_s = directories_.GetWalDir()->FsyncWithDirOptions( IOOptions(), nullptr, DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); } } if (merged_batch == &tmp_batch_) { tmp_batch_.Clear(); } if (io_s.ok()) { auto stats = default_cf_internal_stats_; if (need_log_sync) { stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1); RecordTick(stats_, WAL_FILE_SYNCED); } stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size); RecordTick(stats_, WAL_FILE_BYTES, log_size); stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal); RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); for (auto* writer : write_group) { if (!writer->CallbackFailed()) { writer->CheckPostWalWriteCallback(); } } } return io_s; } IOStatus DBImpl::ConcurrentWriteToWAL( const WriteThread::WriteGroup& write_group, uint64_t* log_used, SequenceNumber* last_sequence, size_t seq_inc) { IOStatus io_s; assert(two_write_queues_ || immutable_db_options_.unordered_write); assert(!write_group.leader->disable_wal); // Same holds for all in the batch group WriteBatch tmp_batch; size_t write_with_wal = 0; WriteBatch* to_be_cached_state = nullptr; WriteBatch* merged_batch; io_s = status_to_io_status(MergeBatch(write_group, &tmp_batch, &merged_batch, &write_with_wal, &to_be_cached_state)); if (UNLIKELY(!io_s.ok())) { return io_s; } // We need to lock log_write_mutex_ since logs_ and alive_log_files might be // pushed back concurrently log_write_mutex_.Lock(); if (merged_batch == write_group.leader->batch) { write_group.leader->log_used = logfile_number_; } else if (write_with_wal > 1) { for (auto writer : write_group) { writer->log_used = logfile_number_; } } *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); auto sequence = *last_sequence + 1; WriteBatchInternal::SetSequence(merged_batch, sequence); log::Writer* log_writer = logs_.back().writer; LogFileNumberSize& log_file_number_size = alive_log_files_.back(); assert(log_writer->get_log_number() == log_file_number_size.number); uint64_t log_size; // TODO: plumb Env::IOActivity, Env::IOPriority WriteOptions write_options; write_options.rate_limiter_priority = write_group.leader->rate_limiter_priority; io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used, &log_size, log_file_number_size); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; } log_write_mutex_.Unlock(); if (io_s.ok()) { const bool concurrent = true; auto stats = default_cf_internal_stats_; stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size, concurrent); RecordTick(stats_, WAL_FILE_BYTES, log_size); stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal, concurrent); RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); for (auto* writer : write_group) { if (!writer->CallbackFailed()) { writer->CheckPostWalWriteCallback(); } } } return io_s; } Status DBImpl::WriteRecoverableState() { mutex_.AssertHeld(); if (!cached_recoverable_state_empty_) { bool dont_care_bool; SequenceNumber next_seq; if (two_write_queues_) { log_write_mutex_.Lock(); } SequenceNumber seq; if (two_write_queues_) { seq = versions_->FetchAddLastAllocatedSequence(0); } else { seq = versions_->LastSequence(); } WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1); auto status = WriteBatchInternal::InsertInto( &cached_recoverable_state_, column_family_memtables_.get(), &flush_scheduler_, &trim_history_scheduler_, true, 0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool, seq_per_batch_); auto last_seq = next_seq - 1; if (two_write_queues_) { versions_->FetchAddLastAllocatedSequence(last_seq - seq); versions_->SetLastPublishedSequence(last_seq); } versions_->SetLastSequence(last_seq); if (two_write_queues_) { log_write_mutex_.Unlock(); } if (status.ok() && recoverable_state_pre_release_callback_) { const bool DISABLE_MEMTABLE = true; for (uint64_t sub_batch_seq = seq + 1; sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) { uint64_t const no_log_num = 0; // Unlock it since the callback might end up locking mutex. e.g., // AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB mutex_.Unlock(); status = recoverable_state_pre_release_callback_->Callback( sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1); mutex_.Lock(); } } if (status.ok()) { cached_recoverable_state_.Clear(); cached_recoverable_state_empty_ = true; } else { // FIXME: !ok status is untested } return status; } return Status::OK(); } void DBImpl::SelectColumnFamiliesForAtomicFlush( autovector* selected_cfds, const autovector& provided_candidate_cfds, FlushReason flush_reason) { mutex_.AssertHeld(); assert(selected_cfds); autovector candidate_cfds; // Generate candidate cfds if not provided if (provided_candidate_cfds.empty()) { for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) { if (!cfd->IsDropped() && cfd->initialized()) { cfd->Ref(); candidate_cfds.push_back(cfd); } } } else { candidate_cfds = provided_candidate_cfds; } for (ColumnFamilyData* cfd : candidate_cfds) { if (cfd->IsDropped()) { continue; } if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load() || IsRecoveryFlush(flush_reason)) { selected_cfds->push_back(cfd); } } // Unref the newly generated candidate cfds (when not provided) in // `candidate_cfds` if (provided_candidate_cfds.empty()) { for (auto candidate_cfd : candidate_cfds) { candidate_cfd->UnrefAndTryDelete(); } } } // Assign sequence number for atomic flush. void DBImpl::AssignAtomicFlushSeq(const autovector& cfds) { assert(immutable_db_options_.atomic_flush); auto seq = versions_->LastSequence(); for (auto cfd : cfds) { cfd->imm()->AssignAtomicFlushSeq(seq); } } Status DBImpl::SwitchWAL(WriteContext* write_context) { mutex_.AssertHeld(); assert(write_context != nullptr); Status status; if (alive_log_files_.begin()->getting_flushed) { return status; } auto oldest_alive_log = alive_log_files_.begin()->number; bool flush_wont_release_oldest_log = false; if (allow_2pc()) { auto oldest_log_with_uncommitted_prep = logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep(); assert(oldest_log_with_uncommitted_prep == 0 || oldest_log_with_uncommitted_prep >= oldest_alive_log); if (oldest_log_with_uncommitted_prep > 0 && oldest_log_with_uncommitted_prep == oldest_alive_log) { if (unable_to_release_oldest_log_) { // we already attempted to flush all column families dependent on // the oldest alive log but the log still contained uncommitted // transactions so there is still nothing that we can do. return status; } else { ROCKS_LOG_WARN( immutable_db_options_.info_log, "Unable to release oldest log due to uncommitted transaction"); unable_to_release_oldest_log_ = true; flush_wont_release_oldest_log = true; } } } if (!flush_wont_release_oldest_log) { // we only mark this log as getting flushed if we have successfully // flushed all data in this log. If this log contains outstanding prepared // transactions then we cannot flush this log until those transactions are // commited. unable_to_release_oldest_log_ = false; alive_log_files_.begin()->getting_flushed = true; } ROCKS_LOG_INFO( immutable_db_options_.info_log, "Flushing all column families with data in WAL number %" PRIu64 ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64, oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread autovector cfds; if (immutable_db_options_.atomic_flush) { SelectColumnFamiliesForAtomicFlush(&cfds); } else { for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; } if (cfd->OldestLogToKeep() <= oldest_alive_log) { cfds.push_back(cfd); } } MaybeFlushStatsCF(&cfds); } WriteThread::Writer nonmem_w; if (two_write_queues_) { nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } for (const auto cfd : cfds) { cfd->Ref(); status = SwitchMemtable(cfd, write_context); cfd->UnrefAndTryDelete(); if (!status.ok()) { break; } } if (two_write_queues_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); } if (status.ok()) { if (immutable_db_options_.atomic_flush) { AssignAtomicFlushSeq(cfds); } for (auto cfd : cfds) { cfd->imm()->FlushRequested(); if (!immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req); SchedulePendingFlush(flush_req); } } if (immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req); SchedulePendingFlush(flush_req); } MaybeScheduleFlushOrCompaction(); } return status; } Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { mutex_.AssertHeld(); assert(write_context != nullptr); Status status; // Before a new memtable is added in SwitchMemtable(), // write_buffer_manager_->ShouldFlush() will keep returning true. If another // thread is writing to another DB with the same write buffer, they may also // be flushed. We may end up with flushing much more DBs than needed. It's // suboptimal but still correct. // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread autovector cfds; if (immutable_db_options_.atomic_flush) { SelectColumnFamiliesForAtomicFlush(&cfds); } else { ColumnFamilyData* cfd_picked = nullptr; SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber; for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; } if (!cfd->mem()->IsEmpty() && !cfd->imm()->IsFlushPendingOrRunning()) { // We only consider flush on CFs with bytes in the mutable memtable, // and no immutable memtables for which flush has yet to finish. If // we triggered flush on CFs already trying to flush, we would risk // creating too many immutable memtables leading to write stalls. uint64_t seq = cfd->mem()->GetCreationSeq(); if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) { cfd_picked = cfd; seq_num_for_cf_picked = seq; } } } if (cfd_picked != nullptr) { cfds.push_back(cfd_picked); } MaybeFlushStatsCF(&cfds); } if (!cfds.empty()) { ROCKS_LOG_INFO( immutable_db_options_.info_log, "Flushing triggered to alleviate write buffer memory usage. Write " "buffer is using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".", write_buffer_manager_->memory_usage(), write_buffer_manager_->buffer_size()); } WriteThread::Writer nonmem_w; if (two_write_queues_) { nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } for (const auto cfd : cfds) { if (cfd->mem()->IsEmpty()) { continue; } cfd->Ref(); status = SwitchMemtable(cfd, write_context); cfd->UnrefAndTryDelete(); if (!status.ok()) { break; } } if (two_write_queues_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); } if (status.ok()) { if (immutable_db_options_.atomic_flush) { AssignAtomicFlushSeq(cfds); } for (const auto cfd : cfds) { cfd->imm()->FlushRequested(); if (!immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager, &flush_req); SchedulePendingFlush(flush_req); } } if (immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req); SchedulePendingFlush(flush_req); } MaybeScheduleFlushOrCompaction(); } return status; } uint64_t DBImpl::GetMaxTotalWalSize() const { uint64_t max_total_wal_size = max_total_wal_size_.load(std::memory_order_acquire); if (max_total_wal_size > 0) { return max_total_wal_size; } return 4 * max_total_in_memory_state_.load(std::memory_order_acquire); } // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the leader for write_thread Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, const WriteOptions& write_options) { mutex_.AssertHeld(); uint64_t start_time = 0; bool delayed = false; { // To avoid parallel timed delays (bad throttling), only support them // on the primary write queue. uint64_t delay; if (&write_thread == &write_thread_) { delay = write_controller_.GetDelay(immutable_db_options_.clock, num_bytes); } else { assert(num_bytes == 0); delay = 0; } TEST_SYNC_POINT("DBImpl::DelayWrite:Start"); start_time = immutable_db_options_.clock->NowMicros(); if (delay > 0) { if (write_options.no_slowdown) { return Status::Incomplete("Write stall"); } TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep"); // Notify write_thread about the stall so it can setup a barrier and // fail any pending writers with no_slowdown write_thread.BeginWriteStall(); mutex_.Unlock(); TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone"); // We will delay the write until we have slept for `delay` microseconds // or we don't need a delay anymore. We check for cancellation every 1ms // (slightly longer because WriteController minimum delay is 1ms, in // case of sleep imprecision, rounding, etc.) const uint64_t kDelayInterval = 1001; uint64_t stall_end = start_time + delay; while (write_controller_.NeedsDelay()) { if (immutable_db_options_.clock->NowMicros() >= stall_end) { // We already delayed this write `delay` microseconds break; } delayed = true; // Sleep for 0.001 seconds immutable_db_options_.clock->SleepForMicroseconds(kDelayInterval); } mutex_.Lock(); write_thread.EndWriteStall(); } // Don't wait if there's a background error that is not pending recovery // since recovery might never be attempted. while ((error_handler_.GetBGError().ok() || error_handler_.IsRecoveryInProgress()) && write_controller_.IsStopped() && !shutting_down_.load(std::memory_order_relaxed)) { if (write_options.no_slowdown) { return Status::Incomplete("Write stall"); } delayed = true; // Notify write_thread about the stall so it can setup a barrier and // fail any pending writers with no_slowdown write_thread.BeginWriteStall(); if (&write_thread == &write_thread_) { TEST_SYNC_POINT("DBImpl::DelayWrite:Wait"); } else { TEST_SYNC_POINT("DBImpl::DelayWrite:NonmemWait"); } bg_cv_.Wait(); TEST_SYNC_POINT_CALLBACK("DBImpl::DelayWrite:AfterWait", &mutex_); write_thread.EndWriteStall(); } } assert(!delayed || !write_options.no_slowdown); if (delayed) { auto time_delayed = immutable_db_options_.clock->NowMicros() - start_time; default_cf_internal_stats_->AddDBStats( InternalStats::kIntStatsWriteStallMicros, time_delayed); RecordTick(stats_, STALL_MICROS, time_delayed); RecordInHistogram(stats_, WRITE_STALL, time_delayed); } // If DB is not in read-only mode and write_controller is not stopping // writes, we can ignore any background errors and allow the write to // proceed Status s; if (write_controller_.IsStopped()) { if (!shutting_down_.load(std::memory_order_relaxed)) { // If writes are still stopped and db not shutdown, it means we bailed // due to a background error s = Status::Incomplete(error_handler_.GetBGError().ToString()); } else { s = Status::ShutdownInProgress("stalled writes"); } } if (error_handler_.IsDBStopped()) { s = error_handler_.GetBGError(); } return s; } // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue void DBImpl::WriteBufferManagerStallWrites() { mutex_.AssertHeld(); // First block future writer threads who want to add themselves to the queue // of WriteThread. write_thread_.BeginWriteStall(); mutex_.Unlock(); // Change the state to State::Blocked. static_cast(wbm_stall_.get()) ->SetState(WBMStallInterface::State::BLOCKED); // Then WriteBufferManager will add DB instance to its queue // and block this thread by calling WBMStallInterface::Block(). write_buffer_manager_->BeginWriteStall(wbm_stall_.get()); wbm_stall_->Block(); mutex_.Lock(); // Stall has ended. Signal writer threads so that they can add // themselves to the WriteThread queue for writes. write_thread_.EndWriteStall(); } Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, WriteBatch* my_batch) { assert(write_options.low_pri); // This is called outside the DB mutex. Although it is safe to make the call, // the consistency condition is not guaranteed to hold. It's OK to live with // it in this case. // If we need to speed compaction, it means the compaction is left behind // and we start to limit low pri writes to a limit. if (write_controller_.NeedSpeedupCompaction()) { if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) { // For 2PC, we only rate limit prepare, not commit. return Status::OK(); } if (write_options.no_slowdown) { return Status::Incomplete("Low priority write stall"); } else { assert(my_batch != nullptr); // Rate limit those writes. The reason that we don't completely wait // is that in case the write is heavy, low pri writes may never have // a chance to run. Now we guarantee we are still slowly making // progress. PERF_TIMER_FOR_WAIT_GUARD(write_delay_time); auto data_size = my_batch->GetDataSize(); while (data_size > 0) { size_t allowed = write_controller_.low_pri_rate_limiter()->RequestToken( data_size, 0 /* alignment */, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kWrite); data_size -= allowed; } } } return Status::OK(); } void DBImpl::MaybeFlushStatsCF(autovector* cfds) { assert(cfds != nullptr); if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) { ColumnFamilyData* cfd_stats = versions_->GetColumnFamilySet()->GetColumnFamily( kPersistentStatsColumnFamilyName); if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) { for (ColumnFamilyData* cfd : *cfds) { if (cfd == cfd_stats) { // stats CF already included in cfds return; } } // force flush stats CF when its log number is less than all other CF's // log numbers bool force_flush_stats_cf = true; for (auto* loop_cfd : *versions_->GetColumnFamilySet()) { if (loop_cfd == cfd_stats) { continue; } if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) { force_flush_stats_cf = false; } } if (force_flush_stats_cf) { cfds->push_back(cfd_stats); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Force flushing stats CF with automated flush " "to avoid holding old logs"); } } } } Status DBImpl::TrimMemtableHistory(WriteContext* context) { autovector cfds; ColumnFamilyData* tmp_cfd; while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) != nullptr) { cfds.push_back(tmp_cfd); } for (auto& cfd : cfds) { autovector to_delete; bool trimmed = cfd->imm()->TrimHistory(&context->memtables_to_free_, cfd->mem()->MemoryAllocatedBytes()); if (trimmed) { context->superversion_context.NewSuperVersion(); assert(context->superversion_context.new_superversion.get() != nullptr); cfd->InstallSuperVersion(&context->superversion_context, &mutex_); } if (cfd->UnrefAndTryDelete()) { cfd = nullptr; } } return Status::OK(); } Status DBImpl::ScheduleFlushes(WriteContext* context) { autovector cfds; if (immutable_db_options_.atomic_flush) { SelectColumnFamiliesForAtomicFlush(&cfds); for (auto cfd : cfds) { cfd->Ref(); } flush_scheduler_.Clear(); } else { ColumnFamilyData* tmp_cfd; while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { cfds.push_back(tmp_cfd); } MaybeFlushStatsCF(&cfds); } Status status; WriteThread::Writer nonmem_w; if (two_write_queues_) { nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } TEST_SYNC_POINT_CALLBACK("DBImpl::ScheduleFlushes:PreSwitchMemtable", nullptr); for (auto& cfd : cfds) { if (status.ok() && !cfd->mem()->IsEmpty()) { status = SwitchMemtable(cfd, context); } if (cfd->UnrefAndTryDelete()) { cfd = nullptr; } } if (two_write_queues_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); } if (status.ok()) { if (immutable_db_options_.atomic_flush) { AssignAtomicFlushSeq(cfds); FlushRequest flush_req; GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req); SchedulePendingFlush(flush_req); } else { for (auto* cfd : cfds) { FlushRequest flush_req; GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req); SchedulePendingFlush(flush_req); } } MaybeScheduleFlushOrCompaction(); } return status; } void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/, const MemTableInfo& mem_table_info) { if (immutable_db_options_.listeners.size() == 0U) { return; } if (shutting_down_.load(std::memory_order_acquire)) { return; } mutex_.Unlock(); for (const auto& listener : immutable_db_options_.listeners) { listener->OnMemTableSealed(mem_table_info); } mutex_.Lock(); } // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue // REQUIRES: this thread is currently at the front of the 2nd writer queue if // two_write_queues_ is true (This is to simplify the reasoning.) Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { mutex_.AssertHeld(); assert(lock_wal_count_ == 0); // TODO: plumb Env::IOActivity, Env::IOPriority const ReadOptions read_options; const WriteOptions write_options; log::Writer* new_log = nullptr; MemTable* new_mem = nullptr; IOStatus io_s; // Recoverable state is persisted in WAL. After memtable switch, WAL might // be deleted, so we write the state to memtable to be persisted as well. Status s = WriteRecoverableState(); if (!s.ok()) { return s; } // Attempt to switch to a new memtable and trigger flush of old. // Do this without holding the dbmutex lock. assert(versions_->prev_log_number() == 0); if (two_write_queues_) { log_write_mutex_.Lock(); } bool creating_new_log = !log_empty_; if (two_write_queues_) { log_write_mutex_.Unlock(); } uint64_t recycle_log_number = 0; // If file deletion is disabled, don't recycle logs since it'll result in // the file getting renamed if (creating_new_log && immutable_db_options_.recycle_log_file_num && !log_recycle_files_.empty() && IsFileDeletionsEnabled()) { recycle_log_number = log_recycle_files_.front(); } uint64_t new_log_number = creating_new_log ? versions_->NewFileNumber() : logfile_number_; const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); // Set memtable_info for memtable sealed callback MemTableInfo memtable_info; memtable_info.cf_name = cfd->GetName(); memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber(); memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber(); memtable_info.num_entries = cfd->mem()->num_entries(); memtable_info.num_deletes = cfd->mem()->num_deletes(); if (!cfd->ioptions()->persist_user_defined_timestamps && cfd->user_comparator()->timestamp_size() > 0) { const Slice& newest_udt = cfd->mem()->GetNewestUDT(); memtable_info.newest_udt.assign(newest_udt.data(), newest_udt.size()); } // Log this later after lock release. It may be outdated, e.g., if background // flush happens before logging, but that should be ok. int num_imm_unflushed = cfd->imm()->NumNotFlushed(); const auto preallocate_block_size = GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); mutex_.Unlock(); if (creating_new_log) { // TODO: Write buffer size passed in should be max of all CF's instead // of mutable_cf_options.write_buffer_size. io_s = CreateWAL(write_options, new_log_number, recycle_log_number, preallocate_block_size, &new_log); if (s.ok()) { s = io_s; } } if (s.ok()) { SequenceNumber seq = versions_->LastSequence(); new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); context->superversion_context.NewSuperVersion(); ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] New memtable created with log file: #%" PRIu64 ". Immutable memtables: %d.\n", cfd->GetName().c_str(), new_log_number, num_imm_unflushed); // There should be no concurrent write as the thread is at the front of // writer queue cfd->mem()->ConstructFragmentedRangeTombstones(); } mutex_.Lock(); if (recycle_log_number != 0) { // Since renaming the file is done outside DB mutex, we need to ensure // concurrent full purges don't delete the file while we're recycling it. // To achieve that we hold the old log number in the recyclable list until // after it has been renamed. assert(log_recycle_files_.front() == recycle_log_number); log_recycle_files_.pop_front(); } if (s.ok() && creating_new_log) { InstrumentedMutexLock l(&log_write_mutex_); assert(new_log != nullptr); if (!logs_.empty()) { // Alway flush the buffer of the last log before switching to a new one log::Writer* cur_log_writer = logs_.back().writer; if (error_handler_.IsRecoveryInProgress()) { // In recovery path, we force another try of writing WAL buffer. cur_log_writer->file()->reset_seen_error(); } io_s = cur_log_writer->WriteBuffer(write_options); if (s.ok()) { s = io_s; } if (!s.ok()) { ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64 " WAL file\n", cfd->GetName().c_str(), cur_log_writer->get_log_number(), new_log_number); } } if (s.ok()) { logfile_number_ = new_log_number; log_empty_ = true; log_dir_synced_ = false; logs_.emplace_back(logfile_number_, new_log); alive_log_files_.emplace_back(logfile_number_); } } if (!s.ok()) { // how do we fail if we're not creating new log? assert(creating_new_log); delete new_mem; delete new_log; context->superversion_context.new_superversion.reset(); // We may have lost data from the WritableFileBuffer in-memory buffer for // the current log, so treat it as a fatal error and set bg_error if (!io_s.ok()) { error_handler_.SetBGError(io_s, BackgroundErrorReason::kMemTable); } else { error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable); } // Read back bg_error in order to get the right severity s = error_handler_.GetBGError(); return s; } bool empty_cf_updated = false; if (immutable_db_options_.track_and_verify_wals_in_manifest && !immutable_db_options_.allow_2pc && creating_new_log) { // In non-2pc mode, WALs become obsolete if they do not contain unflushed // data. Updating the empty CF's log number might cause some WALs to become // obsolete. So we should track the WAL obsoletion event before actually // updating the empty CF's log number. uint64_t min_wal_number_to_keep = versions_->PreComputeMinLogNumberWithUnflushedData(logfile_number_); if (min_wal_number_to_keep > versions_->GetWalSet().GetMinWalNumberToKeep()) { // Get a snapshot of the empty column families. // LogAndApply may release and reacquire db // mutex, during that period, column family may become empty (e.g. its // flush succeeds), then it affects the computed min_log_number_to_keep, // so we take a snapshot for consistency of column family data // status. If a column family becomes non-empty afterwards, its active log // should still be the created new log, so the min_log_number_to_keep is // not affected. autovector empty_cfs; for (auto cf : *versions_->GetColumnFamilySet()) { if (cf->IsEmpty()) { empty_cfs.push_back(cf); } } VersionEdit wal_deletion; wal_deletion.DeleteWalsBefore(min_wal_number_to_keep); s = versions_->LogAndApplyToDefaultColumnFamily( read_options, write_options, &wal_deletion, &mutex_, directories_.GetDbDir()); if (!s.ok() && versions_->io_status().IsIOError()) { error_handler_.SetBGError(versions_->io_status(), BackgroundErrorReason::kManifestWrite); } if (!s.ok()) { return s; } for (auto cf : empty_cfs) { if (cf->IsEmpty()) { cf->SetLogNumber(logfile_number_); // MEMPURGE: No need to change this, because new adds // should still receive new sequence numbers. cf->mem()->SetCreationSeq(versions_->LastSequence()); } // cf may become non-empty. } empty_cf_updated = true; } } if (!empty_cf_updated) { for (auto cf : *versions_->GetColumnFamilySet()) { // all this is just optimization to delete logs that // are no longer needed -- if CF is empty, that means it // doesn't need that particular log to stay alive, so we just // advance the log number. no need to persist this in the manifest if (cf->IsEmpty()) { if (creating_new_log) { cf->SetLogNumber(logfile_number_); } cf->mem()->SetCreationSeq(versions_->LastSequence()); } } } cfd->mem()->SetNextLogNumber(logfile_number_); assert(new_mem != nullptr); cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); new_mem->Ref(); cfd->SetMemtable(new_mem); InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, mutable_cf_options); // Notify client that memtable is sealed, now that we have successfully // installed a new memtable NotifyOnMemTableSealed(cfd, memtable_info); // It is possible that we got here without checking the value of i_os, but // that is okay. If we did, it most likely means that s was already an error. // In any case, ignore any unchecked error for i_os here. io_s.PermitUncheckedError(); return s; } size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const { mutex_.AssertHeld(); size_t bsize = static_cast(write_buffer_size / 10 + write_buffer_size); // Some users might set very high write_buffer_size and rely on // max_total_wal_size or other parameters to control the WAL size. if (mutable_db_options_.max_total_wal_size > 0) { bsize = std::min( bsize, static_cast(mutable_db_options_.max_total_wal_size)); } if (immutable_db_options_.db_write_buffer_size > 0) { bsize = std::min(bsize, immutable_db_options_.db_write_buffer_size); } if (immutable_db_options_.write_buffer_manager && immutable_db_options_.write_buffer_manager->enabled()) { bsize = std::min( bsize, immutable_db_options_.write_buffer_manager->buffer_size()); } return bsize; } // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { // Pre-allocate size of write batch conservatively. // 8 bytes are taken by header, 4 bytes for count, 1 byte for type, // and we allocate 11 extra bytes for key length, as well as value length. WriteBatch batch(key.size() + value.size() + 24, 0 /* max_bytes */, opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); Status s = batch.Put(column_family, key, value); if (!s.ok()) { return s; } return Write(opt, &batch); } Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts, const Slice& value) { ColumnFamilyHandle* default_cf = DefaultColumnFamily(); assert(default_cf); const Comparator* const default_cf_ucmp = default_cf->GetComparator(); assert(default_cf_ucmp); WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, opt.protection_bytes_per_key, default_cf_ucmp->timestamp_size()); Status s = batch.Put(column_family, key, ts, value); if (!s.ok()) { return s; } return Write(opt, &batch); } Status DB::PutEntity(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const WideColumns& columns) { const ColumnFamilyHandle* const default_cf = DefaultColumnFamily(); assert(default_cf); const Comparator* const default_cf_ucmp = default_cf->GetComparator(); assert(default_cf_ucmp); WriteBatch batch(/* reserved_bytes */ 0, /* max_bytes */ 0, options.protection_bytes_per_key, default_cf_ucmp->timestamp_size()); const Status s = batch.PutEntity(column_family, key, columns); if (!s.ok()) { return s; } return Write(options, &batch); } Status DB::PutEntity(const WriteOptions& options, const Slice& key, const AttributeGroups& attribute_groups) { ColumnFamilyHandle* default_cf = DefaultColumnFamily(); assert(default_cf); const Comparator* const default_cf_ucmp = default_cf->GetComparator(); assert(default_cf_ucmp); WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, options.protection_bytes_per_key, default_cf_ucmp->timestamp_size()); const Status s = batch.PutEntity(key, attribute_groups); if (!s.ok()) { return s; } return Write(options, &batch); } Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key) { WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); Status s = batch.Delete(column_family, key); if (!s.ok()) { return s; } return Write(opt, &batch); } Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts) { ColumnFamilyHandle* default_cf = DefaultColumnFamily(); assert(default_cf); const Comparator* const default_cf_ucmp = default_cf->GetComparator(); assert(default_cf_ucmp); WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, opt.protection_bytes_per_key, default_cf_ucmp->timestamp_size()); Status s = batch.Delete(column_family, key, ts); if (!s.ok()) { return s; } return Write(opt, &batch); } Status DB::SingleDelete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key) { WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); Status s = batch.SingleDelete(column_family, key); if (!s.ok()) { return s; } return Write(opt, &batch); } Status DB::SingleDelete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts) { ColumnFamilyHandle* default_cf = DefaultColumnFamily(); assert(default_cf); const Comparator* const default_cf_ucmp = default_cf->GetComparator(); assert(default_cf_ucmp); WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, opt.protection_bytes_per_key, default_cf_ucmp->timestamp_size()); Status s = batch.SingleDelete(column_family, key, ts); if (!s.ok()) { return s; } return Write(opt, &batch); } Status DB::DeleteRange(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& begin_key, const Slice& end_key) { WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); Status s = batch.DeleteRange(column_family, begin_key, end_key); if (!s.ok()) { return s; } return Write(opt, &batch); } Status DB::DeleteRange(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& begin_key, const Slice& end_key, const Slice& ts) { ColumnFamilyHandle* default_cf = DefaultColumnFamily(); assert(default_cf); const Comparator* const default_cf_ucmp = default_cf->GetComparator(); assert(default_cf_ucmp); WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, opt.protection_bytes_per_key, default_cf_ucmp->timestamp_size()); Status s = batch.DeleteRange(column_family, begin_key, end_key, ts); if (!s.ok()) { return s; } return Write(opt, &batch); } Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); Status s = batch.Merge(column_family, key, value); if (!s.ok()) { return s; } return Write(opt, &batch); } Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts, const Slice& value) { ColumnFamilyHandle* default_cf = DefaultColumnFamily(); assert(default_cf); const Comparator* const default_cf_ucmp = default_cf->GetComparator(); assert(default_cf_ucmp); WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, opt.protection_bytes_per_key, default_cf_ucmp->timestamp_size()); Status s = batch.Merge(column_family, key, ts, value); if (!s.ok()) { return s; } return Write(opt, &batch); } } // namespace ROCKSDB_NAMESPACE