Merge branch 'main' into JniReaderForTableIterator

This commit is contained in:
Swaminathan Balachandran 2024-09-17 12:05:54 -07:00 committed by GitHub
commit ccc6b591f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
55 changed files with 761 additions and 401 deletions

View file

@ -80,7 +80,7 @@ jobs:
steps:
- uses: actions/checkout@v4.1.0
- uses: "./.github/actions/pre-steps"
- run: PORTABLE=1 make V=1 -j32 valgrind_test
- run: make V=1 -j32 valgrind_test
- uses: "./.github/actions/post-steps"
build-windows-vs2022-avx2:
if: ${{ github.repository_owner == 'facebook' }}

View file

@ -630,6 +630,11 @@ VALGRIND_VER := $(join $(VALGRIND_VER),valgrind)
VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full
# Not yet supported: --show-leak-kinds=definite,possible,reachable --errors-for-leak-kinds=definite,possible,reachable
# Work around valgrind hanging on systems with limited internet access
ifneq ($(shell which git 2>/dev/null && git config --get https.proxy),)
export DEBUGINFOD_URLS=
endif
TEST_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(TEST_LIB_SOURCES) $(MOCK_LIB_SOURCES)) $(GTEST)
BENCH_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(BENCH_LIB_SOURCES))
CACHE_BENCH_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(CACHE_BENCH_LIB_SOURCES))
@ -1164,16 +1169,16 @@ ubsan_crash_test_with_best_efforts_recovery: clean
$(MAKE) clean
full_valgrind_test:
ROCKSDB_FULL_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 $(MAKE) valgrind_check
ROCKSDB_FULL_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 PORTABLE=1 $(MAKE) valgrind_check
full_valgrind_test_some:
ROCKSDB_FULL_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 $(MAKE) valgrind_check_some
ROCKSDB_FULL_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 PORTABLE=1 $(MAKE) valgrind_check_some
valgrind_test:
ROCKSDB_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 $(MAKE) valgrind_check
ROCKSDB_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 PORTABLE=1 $(MAKE) valgrind_check
valgrind_test_some:
ROCKSDB_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 $(MAKE) valgrind_check_some
ROCKSDB_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 PORTABLE=1 $(MAKE) valgrind_check_some
valgrind_check: $(TESTS)
$(MAKE) DRIVER="$(VALGRIND_VER) $(VALGRIND_OPTS)" gen_parallel_tests

View file

@ -417,6 +417,8 @@ cpp_binary_wrapper(name="ldb", srcs=["tools/ldb.cc"], deps=[":rocksdb_tools_lib"
cpp_binary_wrapper(name="db_stress", srcs=["db_stress_tool/db_stress.cc"], deps=[":rocksdb_stress_lib"], extra_preprocessor_flags=[], extra_bench_libs=False)
cpp_binary_wrapper(name="db_bench", srcs=["tools/db_bench.cc"], deps=[":rocksdb_tools_lib"], extra_preprocessor_flags=[], extra_bench_libs=False)
cpp_binary_wrapper(name="cache_bench", srcs=["cache/cache_bench.cc"], deps=[":rocksdb_cache_bench_tools_lib"], extra_preprocessor_flags=[], extra_bench_libs=False)
cpp_binary_wrapper(name="ribbon_bench", srcs=["microbench/ribbon_bench.cc"], deps=[], extra_preprocessor_flags=[], extra_bench_libs=True)

View file

@ -201,6 +201,10 @@ def generate_targets(repo_path, deps_map):
TARGETS.add_binary(
"db_stress", ["db_stress_tool/db_stress.cc"], [":rocksdb_stress_lib"]
)
# db_bench binary
TARGETS.add_binary(
"db_bench", ["tools/db_bench.cc"], [":rocksdb_tools_lib"]
)
# cache_bench binary
TARGETS.add_binary(
"cache_bench", ["cache/cache_bench.cc"], [":rocksdb_cache_bench_tools_lib"]

26
cache/cache_test.cc vendored
View file

@ -886,6 +886,32 @@ TEST_P(CacheTest, ApplyToAllEntriesDuringResize) {
ASSERT_EQ(special_count, kSpecialCount);
}
TEST_P(CacheTest, ApplyToHandleTest) {
std::string callback_state;
const auto callback = [&](const Slice& key, Cache::ObjectPtr value,
size_t charge,
const Cache::CacheItemHelper* helper) {
callback_state = std::to_string(DecodeKey(key)) + "," +
std::to_string(DecodeValue(value)) + "," +
std::to_string(charge);
assert(helper == &CacheTest::kHelper);
};
std::vector<std::string> inserted;
for (int i = 0; i < 10; ++i) {
Insert(i, i * 2, i + 1);
inserted.push_back(std::to_string(i) + "," + std::to_string(i * 2) + "," +
std::to_string(i + 1));
}
for (int i = 0; i < 10; ++i) {
Cache::Handle* handle = cache_->Lookup(EncodeKey(i));
cache_->ApplyToHandle(cache_.get(), handle, callback);
EXPECT_EQ(inserted[i], callback_state);
cache_->Release(handle);
}
}
TEST_P(CacheTest, DefaultShardBits) {
// Prevent excessive allocation (to save time & space)
estimated_value_size_ = 100000;

16
cache/clock_cache.cc vendored
View file

@ -1444,6 +1444,22 @@ const Cache::CacheItemHelper* BaseHyperClockCache<Table>::GetCacheItemHelper(
return h->helper;
}
template <class Table>
void BaseHyperClockCache<Table>::ApplyToHandle(
Cache* cache, Handle* handle,
const std::function<void(const Slice& key, Cache::ObjectPtr value,
size_t charge, const CacheItemHelper* helper)>&
callback) {
BaseHyperClockCache<Table>* cache_ptr =
static_cast<BaseHyperClockCache<Table>*>(cache);
auto h = static_cast<const typename Table::HandleImpl*>(handle);
UniqueId64x2 unhashed;
auto hash_seed = cache_ptr->GetShard(h->GetHash()).GetTable().GetHashSeed();
callback(
ClockCacheShard<Table>::ReverseHash(h->hashed_key, &unhashed, hash_seed),
h->value, h->GetTotalCharge(), h->helper);
}
namespace {
// For each cache shard, estimate what the table load factor would be if

6
cache/clock_cache.h vendored
View file

@ -1128,6 +1128,12 @@ class BaseHyperClockCache : public ShardedCache<ClockCacheShard<Table>> {
const CacheItemHelper* GetCacheItemHelper(Handle* handle) const override;
void ApplyToHandle(
Cache* cache, Handle* handle,
const std::function<void(const Slice& key, Cache::ObjectPtr obj,
size_t charge, const CacheItemHelper* helper)>&
callback) override;
void ReportProblems(
const std::shared_ptr<Logger>& /*info_log*/) const override;
};

11
cache/lru_cache.cc vendored
View file

@ -677,6 +677,17 @@ const Cache::CacheItemHelper* LRUCache::GetCacheItemHelper(
return h->helper;
}
void LRUCache::ApplyToHandle(
Cache* cache, Handle* handle,
const std::function<void(const Slice& key, ObjectPtr value, size_t charge,
const CacheItemHelper* helper)>& callback) {
auto cache_ptr = static_cast<LRUCache*>(cache);
auto h = static_cast<const LRUHandle*>(handle);
callback(h->key(), h->value,
h->GetCharge(cache_ptr->GetShard(0).metadata_charge_policy_),
h->helper);
}
size_t LRUCache::TEST_GetLRUSize() {
return SumOverShards([](LRUCacheShard& cs) { return cs.TEST_GetLRUSize(); });
}

6
cache/lru_cache.h vendored
View file

@ -452,6 +452,12 @@ class LRUCache
size_t GetCharge(Handle* handle) const override;
const CacheItemHelper* GetCacheItemHelper(Handle* handle) const override;
void ApplyToHandle(
Cache* cache, Handle* handle,
const std::function<void(const Slice& key, ObjectPtr obj, size_t charge,
const CacheItemHelper* helper)>& callback)
override;
// Retrieves number of elements in LRU, for unit test purpose only.
size_t TEST_GetLRUSize();
// Retrieves high pri pool ratio.

View file

@ -253,6 +253,7 @@ TEST_F(DBTieredSecondaryCacheTest, BasicTest) {
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.compression = kLZ4Compression;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
// Disable paranoid_file_checks so that flush will not read back the newly
@ -364,6 +365,7 @@ TEST_F(DBTieredSecondaryCacheTest, BasicMultiGetTest) {
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.compression = kLZ4Compression;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.paranoid_file_checks = false;
@ -506,6 +508,7 @@ TEST_F(DBTieredSecondaryCacheTest, WaitAllTest) {
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.compression = kLZ4Compression;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.paranoid_file_checks = false;
@ -606,6 +609,7 @@ TEST_F(DBTieredSecondaryCacheTest, ReadyBeforeWaitAllTest) {
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.compression = kLZ4Compression;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.statistics = CreateDBStatistics();
@ -717,6 +721,7 @@ TEST_F(DBTieredSecondaryCacheTest, IterateTest) {
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.compression = kLZ4Compression;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.paranoid_file_checks = false;
@ -784,6 +789,7 @@ TEST_P(DBTieredAdmPolicyTest, CompressedOnlyTest) {
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.compression = kLZ4Compression;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
size_t comp_cache_usage = compressed_secondary_cache()->TEST_GetUsage();
@ -836,6 +842,7 @@ TEST_P(DBTieredAdmPolicyTest, CompressedCacheAdmission) {
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.compression = kLZ4Compression;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
size_t comp_cache_usage = compressed_secondary_cache()->TEST_GetUsage();
@ -937,6 +944,7 @@ TEST_F(DBTieredSecondaryCacheTest, FSBufferTest) {
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.compression = kLZ4Compression;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.statistics = CreateDBStatistics();
options.env = wrap_env.get();

View file

@ -1565,28 +1565,6 @@ Status ColumnFamilyData::SetOptions(
return s;
}
// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
return Env::WLTH_NOT_SET;
}
if (level == 0) {
return Env::WLTH_MEDIUM;
}
int base_level = current_->storage_info()->base_level();
// L1: medium, L2: long, ...
if (level - base_level >= 2) {
return Env::WLTH_EXTREME;
} else if (level < base_level) {
// There is no restriction which prevents level passed in to be smaller
// than base_level.
return Env::WLTH_MEDIUM;
}
return static_cast<Env::WriteLifeTimeHint>(
level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}
Status ColumnFamilyData::AddDirectories(
std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
Status s;

View file

@ -511,8 +511,6 @@ class ColumnFamilyData {
return initial_cf_options_;
}
Env::WriteLifeTimeHint CalculateSSTWriteHint(int level);
// created_dirs remembers directory created, so that we don't need to call
// the same data creation operation again.
Status AddDirectories(

View file

@ -686,12 +686,11 @@ bool Compaction::KeyRangeNotExistsBeyondOutputLevel(
};
// Mark (or clear) each file that is being compacted
void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
void Compaction::MarkFilesBeingCompacted(bool being_compacted) const {
for (size_t i = 0; i < num_input_levels(); i++) {
for (size_t j = 0; j < inputs_[i].size(); j++) {
assert(mark_as_compacted ? !inputs_[i][j]->being_compacted
: inputs_[i][j]->being_compacted);
inputs_[i][j]->being_compacted = mark_as_compacted;
assert(being_compacted != inputs_[i][j]->being_compacted);
inputs_[i][j]->being_compacted = being_compacted;
}
}
}
@ -735,7 +734,7 @@ uint64_t Compaction::CalculateTotalInputSize() const {
return size;
}
void Compaction::ReleaseCompactionFiles(Status status) {
void Compaction::ReleaseCompactionFiles(const Status& status) {
MarkFilesBeingCompacted(false);
cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
}

View file

@ -230,7 +230,7 @@ class Compaction {
// Delete this compaction from the list of running compactions.
//
// Requirement: DB mutex held
void ReleaseCompactionFiles(Status status);
void ReleaseCompactionFiles(const Status& status);
// Returns the summary of the compaction in "output" with maximum "len"
// in bytes. The caller is responsible for the memory management of
@ -435,13 +435,13 @@ class Compaction {
const int start_level,
const int output_level);
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool being_compacted) const;
private:
Status InitInputTableProperties();
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
// get the smallest and largest key present in files to be compacted
static void GetBoundaryKeys(VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs,

View file

@ -251,12 +251,13 @@ void CompactionJob::Prepare() {
// Generate file_levels_ for compaction before making Iterator
auto* c = compact_->compaction;
ColumnFamilyData* cfd = c->column_family_data();
[[maybe_unused]] ColumnFamilyData* cfd = c->column_family_data();
assert(cfd != nullptr);
assert(cfd->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);
const VersionStorageInfo* storage_info = c->input_version()->storage_info();
assert(storage_info);
assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
write_hint_ = cfd->CalculateSSTWriteHint(c->output_level());
write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
if (c->ShouldFormSubcompactions()) {
@ -297,8 +298,8 @@ void CompactionJob::Prepare() {
for (const auto& each_level : *c->inputs()) {
for (const auto& fmd : each_level.files) {
std::shared_ptr<const TableProperties> tp;
Status s =
cfd->current()->GetTableProperties(read_options, &tp, fmd, nullptr);
Status s = c->input_version()->GetTableProperties(read_options, &tp,
fmd, nullptr);
if (s.ok()) {
s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping);
}

View file

@ -133,7 +133,8 @@ CompactionPicker::CompactionPicker(const ImmutableOptions& ioptions,
CompactionPicker::~CompactionPicker() = default;
// Delete this compaction from the list of running compactions.
void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
void CompactionPicker::ReleaseCompactionFiles(Compaction* c,
const Status& status) {
UnregisterCompaction(c);
if (!status.ok()) {
c->ResetNextCompactionIndex();

View file

@ -104,7 +104,7 @@ class CompactionPicker {
// Free up the files that participated in a compaction
//
// Requirement: DB mutex held
void ReleaseCompactionFiles(Compaction* c, Status status);
void ReleaseCompactionFiles(Compaction* c, const Status& status);
// Returns true if any one of the specified files are being compacted
bool AreFilesInCompaction(const std::vector<FileMetaData*>& files);

View file

@ -261,11 +261,11 @@ Status CompactionServiceCompactionJob::Run() {
auto* c = compact_->compaction;
assert(c->column_family_data() != nullptr);
assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);
const VersionStorageInfo* storage_info = c->input_version()->storage_info();
assert(storage_info);
assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
write_hint_ =
c->column_family_data()->CalculateSSTWriteHint(c->output_level());
write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
Slice begin = compaction_input_.begin;

View file

@ -6146,7 +6146,7 @@ TEST_F(DBCompactionTest, CompactionLimiter) {
std::vector<std::string> pending_compaction_cfs;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SchedulePendingCompaction::cfd", [&](void* arg) {
"EnqueuePendingCompaction::cfd", [&](void* arg) {
const std::string& cf_name =
static_cast<ColumnFamilyData*>(arg)->GetName();
pending_compaction_cfs.emplace_back(cf_name);

View file

@ -473,7 +473,7 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {
if (s.ok()) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
SchedulePendingCompaction(cfd);
EnqueuePendingCompaction(cfd);
}
MaybeScheduleFlushOrCompaction();
}
@ -4282,7 +4282,7 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
->storage_info()
->BottommostFilesMarkedForCompaction()
.empty()) {
SchedulePendingCompaction(cfd);
EnqueuePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
cf_scheduled.push_back(cfd);
}
@ -5837,6 +5837,10 @@ Status DBImpl::IngestExternalFiles(
"allow_db_generated_files.");
}
}
if (ingest_opts.move_files && ingest_opts.link_files) {
return Status::InvalidArgument(
"`move_files` and `link_files` can not both be true.");
}
}
// TODO (yanqin) maybe handle the case in which column_families have

View file

@ -2216,10 +2216,27 @@ class DBImpl : public DB {
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req);
// Returns true if `req` is successfully enqueued.
bool SchedulePendingFlush(const FlushRequest& req);
// Below functions are for executing flush, compaction in the background. A
// dequeue is the communication channel between threads that asks for the work
// to be done and the available threads in the thread pool that pick it up to
// execute it. We use these terminologies to describe the state of the work
// and its transitions:
// 1) It becomes pending once it's successfully enqueued into the
// corresponding dequeue, a work in this state is also called unscheduled.
// Counter `unscheduled_*_` counts work in this state.
// 2) When `MaybeScheduleFlushOrCompaction` schedule a thread to run `BGWork*`
// for the work, it becomes scheduled
// Counter `bg_*_scheduled_` counts work in this state.
// 3) Once the thread start to execute `BGWork*`, the work is popped from the
// dequeue, it is now in running state
// Counter `num_running_*_` counts work in this state.
// 4) Eventually, the work is finished. We don't need to specifically track
// finished work.
void SchedulePendingCompaction(ColumnFamilyData* cfd);
// Returns true if `req` is successfully enqueued.
bool EnqueuePendingFlush(const FlushRequest& req);
void EnqueuePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id);
static void BGWorkCompaction(void* arg);
@ -2946,6 +2963,14 @@ DBOptions SanitizeOptions(const std::string& db, const DBOptions& src,
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options);
// Return a VersionEdit for the DB's recovery when the `memtables` of the
// specified column family are obsolete. Specifically, the min log number to
// keep, and the WAL files that can be deleted.
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
VersionSet* vset, const ColumnFamilyData& cfd,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker);
// Return the earliest log file to keep after the memtable flush is
// finalized.
// `cfd_to_flush` is the column family whose memtable (specified in

View file

@ -1880,7 +1880,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
Status status = versions_->LogAndApply(cfd, mutable_cf_options,
read_options, write_options, &edit,
&mutex_, directories_.GetDbDir());
c->MarkFilesBeingCompacted(false);
cfd->compaction_picker()->UnregisterCompaction(c.get());
c.reset();
@ -2377,7 +2377,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
ColumnFamilyData* loop_cfd =
req.cfd_to_max_mem_id_to_persist.begin()->first;
bool already_queued_for_flush = loop_cfd->queued_for_flush();
bool flush_req_enqueued = SchedulePendingFlush(req);
bool flush_req_enqueued = EnqueuePendingFlush(req);
if (already_queued_for_flush || flush_req_enqueued) {
loop_cfd->SetFlushSkipReschedule();
}
@ -2528,7 +2528,7 @@ Status DBImpl::AtomicFlushMemTables(
}
}
GenerateFlushRequest(cfds, flush_reason, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
MaybeScheduleFlushOrCompaction();
}
@ -2583,7 +2583,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, flush_reason, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
flush_memtable_ids.push_back(iter.second);
}
@ -2597,7 +2597,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
flush_reason,
{{cfd,
std::numeric_limits<uint64_t>::max() /* max_mem_id_to_persist */}}};
if (SchedulePendingFlush(flush_req)) {
if (EnqueuePendingFlush(flush_req)) {
cfd->SetFlushSkipReschedule();
};
}
@ -2950,6 +2950,7 @@ void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
cfd->Ref();
compaction_queue_.push_back(cfd);
cfd->set_queued_for_compaction(true);
++unscheduled_compactions_;
}
ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
@ -3005,7 +3006,7 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(
return cfd;
}
bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
bool DBImpl::EnqueuePendingFlush(const FlushRequest& flush_req) {
mutex_.AssertHeld();
bool enqueued = false;
if (reject_new_background_jobs_) {
@ -3041,16 +3042,15 @@ bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
return enqueued;
}
void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
void DBImpl::EnqueuePendingCompaction(ColumnFamilyData* cfd) {
mutex_.AssertHeld();
if (reject_new_background_jobs_) {
return;
}
if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
TEST_SYNC_POINT_CALLBACK("SchedulePendingCompaction::cfd",
TEST_SYNC_POINT_CALLBACK("EnqueuePendingCompaction::cfd",
static_cast<void*>(cfd));
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
}
}
@ -3218,7 +3218,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
#ifndef NDEBUG
flush_req.reschedule_count += 1;
#endif /* !NDEBUG */
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
*reason = flush_reason;
*flush_rescheduled_to_retain_udt = true;
return Status::TryAgain();
@ -3678,7 +3678,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
->ComputeCompactionScore(*(c->immutable_options()),
*(c->mutable_cf_options()));
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
c.reset();
// Don't need to sleep here, because BackgroundCallCompaction
@ -3707,7 +3706,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (cfd->NeedsCompaction()) {
// Yes, we need more compactions!
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
MaybeScheduleFlushOrCompaction();
}
}
@ -3997,7 +3995,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
*(c->mutable_cf_options()));
if (!cfd->queued_for_compaction()) {
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
}
}
}
@ -4269,7 +4266,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
// Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions.
SchedulePendingCompaction(cfd);
EnqueuePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
// Update max_total_in_memory_state_

View file

@ -47,7 +47,7 @@ Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
// compaction score
vstorage->ComputeCompactionScore(*cfd->ioptions(),
*cfd->GetLatestMutableCFOptions());
SchedulePendingCompaction(cfd);
EnqueuePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
}
return Status::OK();

View file

@ -722,6 +722,38 @@ void DBImpl::DeleteObsoleteFiles() {
mutex_.Lock();
}
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
VersionSet* vset, const ColumnFamilyData& cfd,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker) {
VersionEdit wal_deletion_edit;
uint64_t min_wal_number_to_keep = 0;
assert(edit_list.size() > 0);
if (vset->db_options()->allow_2pc) {
// Note that if mempurge is successful, the edit_list will
// not be applicable (contains info of new min_log number to keep,
// and level 0 file path of SST file created during normal flush,
// so both pieces of information are irrelevant after a successful
// mempurge operation).
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, cfd, edit_list, memtables, prep_tracker);
// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfd, edit_list);
}
wal_deletion_edit.SetMinLogNumberToKeep(min_wal_number_to_keep);
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion_edit.DeleteWalsBefore(min_wal_number_to_keep);
}
}
return wal_deletion_edit;
}
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<MemTable*>& memtables_to_flush) {
uint64_t min_log = 0;

View file

@ -1681,7 +1681,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
meta.oldest_ancester_time = current_time;
meta.epoch_number = cfd->NewEpochNumber();
{
auto write_hint = cfd->CalculateSSTWriteHint(0);
auto write_hint =
cfd->current()->storage_info()->CalculateSSTWriteHint(/*level=*/0);
mutex_.Unlock();
SequenceNumber earliest_write_conflict_snapshot;

View file

@ -1789,13 +1789,13 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
@ -1881,13 +1881,13 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager,
&flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
@ -2163,12 +2163,12 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
AssignAtomicFlushSeq(cfds);
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
} else {
for (auto* cfd : cfds) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
}
}
MaybeScheduleFlushOrCompaction();

View file

@ -895,6 +895,81 @@ TEST_P(DBIOCorruptionTest, ManifestCorruptionRetry) {
SyncPoint::GetInstance()->DisableProcessing();
}
TEST_P(DBIOCorruptionTest, FooterReadCorruptionRetry) {
Random rnd(300);
bool retry = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"ReadFooterFromFileInternal:0", [&](void* arg) {
Slice* data = static_cast<Slice*>(arg);
if (!retry) {
std::memcpy(const_cast<char*>(data->data()),
rnd.RandomString(static_cast<int>(data->size())).c_str(),
data->size());
retry = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("key1", "val1"));
Status s = Flush();
if (std::get<2>(GetParam())) {
ASSERT_OK(s);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
1);
std::string val;
ReadOptions ro;
ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
} else {
ASSERT_NOK(s);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
ASSERT_GT(stats()->getTickerCount(SST_FOOTER_CORRUPTION_COUNT), 0);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(DBIOCorruptionTest, TablePropertiesCorruptionRetry) {
Random rnd(300);
bool retry = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"ReadTablePropertiesHelper:0", [&](void* arg) {
Slice* data = static_cast<Slice*>(arg);
if (!retry) {
std::memcpy(const_cast<char*>(data->data()),
rnd.RandomString(static_cast<int>(data->size())).c_str(),
data->size());
retry = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("key1", "val1"));
Status s = Flush();
if (std::get<2>(GetParam())) {
ASSERT_OK(s);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
1);
std::string val;
ReadOptions ro;
ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
} else {
ASSERT_NOK(s);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// The parameters are - 1. Use FS provided buffer, 2. Use async IO ReadOption,
// 3. Retry with verify_and_reconstruct_read IOOption
INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,

View file

@ -113,7 +113,7 @@ Status ExternalSstFileIngestionJob::Prepare(
const std::string path_outside_db = f.external_file_path;
const std::string path_inside_db = TableFileName(
cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId());
if (ingestion_options_.move_files) {
if (ingestion_options_.move_files || ingestion_options_.link_files) {
status =
fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr);
if (status.ok()) {
@ -626,8 +626,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
DeleteInternalFiles();
consumed_seqno_count_ = 0;
files_overlap_ = false;
} else if (status.ok() && ingestion_options_.move_files &&
!ingestion_options_.allow_db_generated_files) {
} else if (status.ok() && ingestion_options_.move_files) {
// The files were moved and added successfully, remove original file links
for (IngestedFileInfo& f : files_to_ingest_) {
Status s = fs_->DeleteFile(f.external_file_path, io_opts, nullptr);

View file

@ -66,7 +66,7 @@ class ExternalSSTFileTestBase : public DBTestBase {
class ExternSSTFileLinkFailFallbackTest
: public ExternalSSTFileTestBase,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public ::testing::WithParamInterface<std::tuple<bool, bool, bool>> {
public:
ExternSSTFileLinkFailFallbackTest() {
fs_ = std::make_shared<ExternalSSTTestFS>(env_->GetFileSystem(), true);
@ -2210,7 +2210,8 @@ TEST_P(ExternSSTFileLinkFailFallbackTest, LinkFailFallBackExternalSst) {
DestroyAndReopen(options_);
const int kNumKeys = 10000;
IngestExternalFileOptions ifo;
ifo.move_files = true;
ifo.move_files = std::get<2>(GetParam());
ifo.link_files = !ifo.move_files;
ifo.failed_move_fall_back_to_copy = failed_move_fall_back_to_copy;
std::string file_path = sst_files_dir_ + "file1.sst";
@ -2251,6 +2252,13 @@ TEST_P(ExternSSTFileLinkFailFallbackTest, LinkFailFallBackExternalSst) {
ASSERT_EQ(0, bytes_copied);
ASSERT_EQ(file_size, bytes_moved);
ASSERT_FALSE(copyfile);
Status es = env_->FileExists(file_path);
if (ifo.move_files) {
ASSERT_TRUE(es.IsNotFound());
} else {
ASSERT_OK(es);
}
} else {
// Link operation fails.
ASSERT_EQ(0, bytes_moved);
@ -2269,6 +2277,11 @@ TEST_P(ExternSSTFileLinkFailFallbackTest, LinkFailFallBackExternalSst) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(ExternSSTFileLinkFailFallbackTest,
ExternSSTFileLinkFailFallbackTest,
testing::Combine(testing::Bool(), testing::Bool(),
testing::Bool()));
class TestIngestExternalFileListener : public EventListener {
public:
void OnExternalFileIngested(DB* /*db*/,
@ -3719,19 +3732,13 @@ TEST_F(ExternalSSTFileWithTimestampTest, TimestampsNotPersistedBasic) {
INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
testing::Combine(testing::Bool(), testing::Bool()));
INSTANTIATE_TEST_CASE_P(ExternSSTFileLinkFailFallbackTest,
ExternSSTFileLinkFailFallbackTest,
testing::Values(std::make_tuple(true, false),
std::make_tuple(true, true),
std::make_tuple(false, false)));
class IngestDBGeneratedFileTest
: public ExternalSSTFileTestBase,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public:
IngestDBGeneratedFileTest() {
ingest_opts.allow_db_generated_files = true;
ingest_opts.move_files = std::get<0>(GetParam());
ingest_opts.link_files = std::get<0>(GetParam());
ingest_opts.verify_checksums_before_ingest = std::get<1>(GetParam());
ingest_opts.snapshot_consistency = false;
}
@ -3744,10 +3751,10 @@ INSTANTIATE_TEST_CASE_P(BasicMultiConfig, IngestDBGeneratedFileTest,
testing::Combine(testing::Bool(), testing::Bool()));
TEST_P(IngestDBGeneratedFileTest, FailureCase) {
if (encrypted_env_ && ingest_opts.move_files) {
if (encrypted_env_ && ingest_opts.link_files) {
// FIXME: should fail ingestion or support this combination.
ROCKSDB_GTEST_SKIP(
"Encrypted env and move_files do not work together, as we reopen the "
"Encrypted env and link_files do not work together, as we reopen the "
"file after linking it which appends an extra encryption prefix.");
return;
}
@ -3943,7 +3950,7 @@ TEST_P(IngestDBGeneratedFileTest2, NotOverlapWithDB) {
ingest_opts.allow_global_seqno = std::get<1>(GetParam());
ingest_opts.allow_blocking_flush = std::get<2>(GetParam());
ingest_opts.fail_if_not_bottommost_level = std::get<3>(GetParam());
ingest_opts.move_files = std::get<4>(GetParam());
ingest_opts.link_files = std::get<4>(GetParam());
do {
SCOPED_TRACE("option_config_ = " + std::to_string(option_config_));

View file

@ -630,7 +630,7 @@ Status FlushJob::MemPurge() {
new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber());
// This addition will not trigger another flush, because
// we do not call SchedulePendingFlush().
// we do not call EnqueuePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
new_mem->Ref();
// Piggyback FlushJobInfo on the first flushed memtable.
@ -861,7 +861,7 @@ Status FlushJob::WriteLevel0Table() {
std::vector<BlobFileAddition> blob_file_additions;
{
auto write_hint = cfd_->CalculateSSTWriteHint(0);
auto write_hint = base_->storage_info()->CalculateSSTWriteHint(/*level=*/0);
Env::IOPriority io_priority = GetRateLimiterPriority();
db_mutex_->Unlock();
if (log_buffer_) {

View file

@ -582,37 +582,28 @@ Status MemTableList::TryInstallMemtableFlushResults(
// TODO(myabandeh): Not sure how batch_count could be 0 here.
if (batch_count > 0) {
uint64_t min_wal_number_to_keep = 0;
assert(edit_list.size() > 0);
if (vset->db_options()->allow_2pc) {
// Note that if mempurge is successful, the edit_list will
// not be applicable (contains info of new min_log number to keep,
// and level 0 file path of SST file created during normal flush,
// so both pieces of information are irrelevant after a successful
// mempurge operation).
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
VersionEdit edit;
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (memtables_to_flush.size() == memlist.size()) {
// TODO(yuzhangyu): remove this testing code once the
// `GetEditForDroppingCurrentVersion` API is used by the atomic data
// replacement. This function can get the same edits for wal related
// fields, and some duplicated fields as contained already in edit_list
// for column family's recovery.
edit = GetEditForDroppingCurrentVersion(cfd, vset, prep_tracker);
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
edit = GetDBRecoveryEditForObsoletingMemTables(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
}
VersionEdit wal_deletion;
wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep);
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (min_wal_number_to_keep >
vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
}
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:"
"AfterComputeMinWalToKeep",
nullptr);
}
edit_list.push_back(&wal_deletion);
#else
edit = GetDBRecoveryEditForObsoletingMemTables(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:"
"AfterComputeMinWalToKeep",
nullptr);
edit_list.push_back(&edit);
const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
to_delete, mu](const Status& status) {
@ -1026,4 +1017,38 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number,
ResetTrimHistoryNeeded();
}
VersionEdit MemTableList::GetEditForDroppingCurrentVersion(
const ColumnFamilyData* cfd, VersionSet* vset,
LogsWithPrepTracker* prep_tracker) const {
assert(cfd);
auto& memlist = current_->memlist_;
if (memlist.empty()) {
return VersionEdit();
}
uint64_t max_next_log_number = 0;
autovector<VersionEdit*> edit_list;
autovector<MemTable*> memtables_to_drop;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
memtables_to_drop.push_back(m);
max_next_log_number = std::max(m->GetNextLogNumber(), max_next_log_number);
}
// Check the obsoleted MemTables' impact on WALs related to DB's recovery (min
// log number to keep, a delta of WAL files to delete).
VersionEdit edit_with_log_number;
edit_with_log_number.SetPrevLogNumber(0);
edit_with_log_number.SetLogNumber(max_next_log_number);
edit_list.push_back(&edit_with_log_number);
VersionEdit edit = GetDBRecoveryEditForObsoletingMemTables(
vset, *cfd, edit_list, memtables_to_drop, prep_tracker);
// Set fields related to the column family's recovery.
edit.SetColumnFamily(cfd->GetID());
edit.SetPrevLogNumber(0);
edit.SetLogNumber(max_next_log_number);
return edit;
}
} // namespace ROCKSDB_NAMESPACE

View file

@ -447,6 +447,12 @@ class MemTableList {
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);
// This API is only used by atomic date replacement. To get an edit for
// dropping the current `MemTableListVersion`.
VersionEdit GetEditForDroppingCurrentVersion(
const ColumnFamilyData* cfd, VersionSet* vset,
LogsWithPrepTracker* prep_tracker) const;
private:
friend Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,

View file

@ -451,7 +451,8 @@ class Repairer {
meta.file_creation_time = current_time;
SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
auto write_hint = cfd->CalculateSSTWriteHint(0);
auto write_hint =
cfd->current()->storage_info()->CalculateSSTWriteHint(/*level=*/0);
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
auto range_del_iter = mem->NewRangeTombstoneIterator(

View file

@ -44,8 +44,8 @@
#include "table/compaction_merging_iterator.h"
#if USE_COROUTINES
#include "folly/experimental/coro/BlockingWait.h"
#include "folly/experimental/coro/Collect.h"
#include "folly/coro/BlockingWait.h"
#include "folly/coro/Collect.h"
#endif
#include "file/filename.h"
#include "file/random_access_file_reader.h"
@ -4919,6 +4919,27 @@ bool VersionStorageInfo::RangeMightExistAfterSortedRun(
return false;
}
Env::WriteLifeTimeHint VersionStorageInfo::CalculateSSTWriteHint(
int level) const {
if (compaction_style_ != kCompactionStyleLevel) {
return Env::WLTH_NOT_SET;
}
if (level == 0) {
return Env::WLTH_MEDIUM;
}
// L1: medium, L2: long, ...
if (level - base_level_ >= 2) {
return Env::WLTH_EXTREME;
} else if (level < base_level_) {
// There is no restriction which prevents level passed in to be smaller
// than base_level.
return Env::WLTH_MEDIUM;
}
return static_cast<Env::WriteLifeTimeHint>(
level - base_level_ + static_cast<int>(Env::WLTH_MEDIUM));
}
void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
std::vector<uint64_t>* live_blob_files) const {
assert(live_table_files);

View file

@ -49,8 +49,8 @@
#include "db/write_controller.h"
#include "env/file_system_tracer.h"
#if USE_COROUTINES
#include "folly/experimental/coro/BlockingWait.h"
#include "folly/experimental/coro/Collect.h"
#include "folly/coro/BlockingWait.h"
#include "folly/coro/Collect.h"
#endif
#include "monitoring/instrumented_mutex.h"
#include "options/db_options.h"
@ -626,6 +626,8 @@ class VersionStorageInfo {
const Slice& largest_user_key,
int last_level, int last_l0_idx);
Env::WriteLifeTimeHint CalculateSSTWriteHint(int level) const;
private:
void ComputeCompensatedSizes();
void UpdateNumNonEmptyLevels();

View file

@ -99,6 +99,7 @@ void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) {
size_added_by_compaction += filemeta->fd.GetFileSize();
}
}
assert(cur_compactions_reserved_size_ >= size_added_by_compaction);
cur_compactions_reserved_size_ -= size_added_by_compaction;
}
@ -450,7 +451,6 @@ void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
// File was added before, we will just update the size
total_files_size_ -= tracked_file->second;
total_files_size_ += file_size;
cur_compactions_reserved_size_ -= file_size;
} else {
total_files_size_ += file_size;
}

View file

@ -411,6 +411,14 @@ class Cache {
const CacheItemHelper* helper)>& callback,
const ApplyToAllEntriesOptions& opts) = 0;
// Apply a callback to a cache handle. The Cache must ensure the lifetime
// of the key passed to the callback is valid for the duration of the
// callback. The handle may not belong to the cache, but is guaranteed to
// be type compatible.
virtual void ApplyToHandle(
Cache* cache, Handle* handle,
const std::function<void(const Slice& key, ObjectPtr obj, size_t charge,
const CacheItemHelper* helper)>& callback) = 0;
// Remove all entries.
// Prerequisite: no entry is referenced.
virtual void EraseUnRefEntries() = 0;
@ -636,6 +644,15 @@ class CacheWrapper : public Cache {
target_->ApplyToAllEntries(callback, opts);
}
virtual void ApplyToHandle(
Cache* cache, Handle* handle,
const std::function<void(const Slice& key, ObjectPtr obj, size_t charge,
const CacheItemHelper* helper)>& callback)
override {
auto cache_ptr = static_cast<CacheWrapper*>(cache);
target_->ApplyToHandle(cache_ptr->target_.get(), handle, callback);
}
void EraseUnRefEntries() override { target_->EraseUnRefEntries(); }
void StartAsyncLookup(AsyncLookupHandle& async_handle) override {

View file

@ -56,7 +56,9 @@ struct ConfigOptions {
// setting
};
// When true, any unused options will be ignored and OK will be returned
// When true, any unused options will be ignored and OK will be returned.
// For options files that appear to be from the current version or earlier,
// unknown options are considered corruption regardless of this setting.
bool ignore_unknown_options = false;
// When true, any unsupported options will be ignored and OK will be returned

View file

@ -2131,10 +2131,16 @@ struct CompactRangeOptions {
// IngestExternalFileOptions is used by IngestExternalFile()
struct IngestExternalFileOptions {
// Can be set to true to move the files instead of copying them.
// Note that original file links will be removed after successful ingestion,
// unless `allow_db_generated_files` is true.
// The input files will be unlinked after successful ingestion.
// The implementation depends on hard links (LinkFile) instead of traditional
// move (RenameFile) to maximize the chances to restore to the original
// state upon failure.
bool move_files = false;
// If set to true, ingestion falls back to copy when move fails.
// Same as move_files except that input files will NOT be unlinked.
// Only one of `move_files` and `link_files` can be set at the same time.
bool link_files = false;
// If set to true, ingestion falls back to copy when hard linking fails.
// This applies to both `move_files` and `link_files`.
bool failed_move_fall_back_to_copy = true;
// If set to false, an ingested file keys could appear in existing snapshots
// that where created before the file was ingested.
@ -2209,8 +2215,6 @@ struct IngestExternalFileOptions {
// Enables ingestion of files not generated by SstFileWriter. When true:
// - Allows files to be ingested when their cf_id doesn't match the CF they
// are being ingested into.
// - Preserves original file links after successful ingestion when
// `move_files = true`.
// REQUIREMENTS:
// - Ingested files must not overlap with existing keys.
// - `write_global_seqno` must be false.

View file

@ -296,12 +296,13 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in,
return s;
}
// If the option file is not generated by a higher minor version,
// there shouldn't be any unknown option.
// If the option file is not generated by a higher version, unknown
// option should only mean corruption.
if (config_options.ignore_unknown_options &&
section == kOptionSectionVersion) {
if (db_version[0] < ROCKSDB_MAJOR || (db_version[0] == ROCKSDB_MAJOR &&
db_version[1] <= ROCKSDB_MINOR)) {
using VTuple = std::tuple<int, int, int>;
if (VTuple(db_version[0], db_version[1], db_version[2]) <=
VTuple(ROCKSDB_MAJOR, ROCKSDB_MINOR, ROCKSDB_PATCH)) {
config_options.ignore_unknown_options = false;
}
}

View file

@ -3449,44 +3449,8 @@ TEST_F(OptionsParserTest, DuplicateCFOptions) {
}
TEST_F(OptionsParserTest, IgnoreUnknownOptions) {
for (int case_id = 0; case_id < 5; case_id++) {
DBOptions db_opt;
db_opt.max_open_files = 12345;
db_opt.max_background_flushes = 301;
db_opt.max_total_wal_size = 1024;
ColumnFamilyOptions cf_opt;
std::string version_string;
bool should_ignore = true;
if (case_id == 0) {
// same version
should_ignore = false;
version_string = std::to_string(ROCKSDB_MAJOR) + "." +
std::to_string(ROCKSDB_MINOR) + ".0";
} else if (case_id == 1) {
// higher minor version
should_ignore = true;
version_string = std::to_string(ROCKSDB_MAJOR) + "." +
std::to_string(ROCKSDB_MINOR + 1) + ".0";
} else if (case_id == 2) {
// higher major version.
should_ignore = true;
version_string = std::to_string(ROCKSDB_MAJOR + 1) + ".0.0";
} else if (case_id == 3) {
// lower minor version
#if ROCKSDB_MINOR == 0
continue;
#else
version_string = std::to_string(ROCKSDB_MAJOR) + "." +
std::to_string(ROCKSDB_MINOR - 1) + ".0";
should_ignore = false;
#endif
} else {
// lower major version
should_ignore = false;
version_string = std::to_string(ROCKSDB_MAJOR - 1) + "." +
std::to_string(ROCKSDB_MINOR) + ".0";
}
auto testCase = [&](bool should_ignore, const std::string& version_string) {
SCOPED_TRACE(std::to_string(should_ignore) + ", " + version_string);
std::string options_file_content =
"# This is a testing option string.\n"
@ -3519,16 +3483,45 @@ TEST_F(OptionsParserTest, IgnoreUnknownOptions) {
RocksDBOptionsParser parser;
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(), false,
4096 /* readahead_size */));
Status parse_status = parser.Parse(kTestFileName, fs_.get(),
true /* ignore_unknown_options */,
4096 /* readahead_size */);
if (should_ignore) {
ASSERT_OK(parser.Parse(kTestFileName, fs_.get(),
true /* ignore_unknown_options */,
4096 /* readahead_size */));
ASSERT_OK(parse_status);
} else {
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(),
true /* ignore_unknown_options */,
4096 /* readahead_size */));
ASSERT_NOK(parse_status);
}
}
};
// Same version
testCase(false, GetRocksVersionAsString());
// Same except .0 patch
testCase(false, std::to_string(ROCKSDB_MAJOR) + "." +
std::to_string(ROCKSDB_MINOR) + ".0");
// Higher major version
testCase(true, std::to_string(ROCKSDB_MAJOR + 1) + "." +
std::to_string(ROCKSDB_MINOR) + ".0");
// Higher minor version
testCase(true, std::to_string(ROCKSDB_MAJOR) + "." +
std::to_string(ROCKSDB_MINOR + 1) + ".0");
// Higher patch version
testCase(true, std::to_string(ROCKSDB_MAJOR) + "." +
std::to_string(ROCKSDB_MINOR) + "." +
std::to_string(ROCKSDB_PATCH + 1));
// Lower major version
testCase(false, std::to_string(ROCKSDB_MAJOR - 1) + "." +
std::to_string(ROCKSDB_MINOR) + ".0");
#if ROCKSDB_MINOR > 0
// Lower minor version
testCase(false, std::to_string(ROCKSDB_MAJOR) + "." +
std::to_string(ROCKSDB_MINOR - 1) + ".0");
#endif
#if ROCKSDB_PATCH > 0
// Lower patch version
testCase(false, std::to_string(ROCKSDB_MAJOR) + "." +
std::to_string(ROCKSDB_MINOR - 1) + "." +
std::to_string(ROCKSDB_PATCH - 1));
#endif
}
TEST_F(OptionsParserTest, ParseVersion) {

View file

@ -680,26 +680,12 @@ Status BlockBasedTable::Open(
if (s.ok()) {
s = ReadFooterFromFile(opts, file.get(), *ioptions.fs,
prefetch_buffer.get(), file_size, &footer,
kBlockBasedTableMagicNumber);
}
// If the footer is corrupted and the FS supports checksum verification and
// correction, try reading the footer again
if (s.IsCorruption()) {
RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
if (CheckFSFeatureSupport(ioptions.fs.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
IOOptions retry_opts = opts;
retry_opts.verify_and_reconstruct_read = true;
s = ReadFooterFromFile(retry_opts, file.get(), *ioptions.fs,
prefetch_buffer.get(), file_size, &footer,
kBlockBasedTableMagicNumber);
RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_COUNT);
if (s.ok()) {
RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
}
}
kBlockBasedTableMagicNumber, ioptions.stats);
}
if (!s.ok()) {
if (s.IsCorruption()) {
RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
}
return s;
}
if (!IsSupportedFormatVersion(footer.format_version())) {

View file

@ -91,9 +91,24 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder {
uint64_t alt_hash = GetSliceHash64(alt);
std::optional<uint64_t> prev_key_hash;
std::optional<uint64_t> prev_alt_hash = hash_entries_info_.prev_alt_hash;
if (!hash_entries_info_.entries.empty()) {
prev_key_hash = hash_entries_info_.entries.back();
}
#ifdef ROCKSDB_VALGRIND_RUN
// Valgrind can report uninitialized FPs on std::optional usage. See e.g.
// https://stackoverflow.com/q/51616179
if (!prev_key_hash.has_value()) {
std::memset((void*)&prev_key_hash, 0, sizeof(prev_key_hash));
prev_key_hash.reset();
}
if (!prev_alt_hash.has_value()) {
std::memset((void*)&prev_alt_hash, 0, sizeof(prev_key_hash));
prev_alt_hash.reset();
}
#endif
// Add alt first, so that entries.back() always contains previous key
// ASSUMING a change from one alt to the next implies a change to
// corresponding key
@ -295,15 +310,6 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder {
bool detect_filter_construct_corruption_;
struct HashEntriesInfo {
#ifdef ROCKSDB_VALGRIND_RUN
HashEntriesInfo() {
// Valgrind can report uninitialized FPs on std::optional usage. See e.g.
// https://stackoverflow.com/q/51616179
std::memset((void*)&prev_alt_hash, 0, sizeof(prev_alt_hash));
prev_alt_hash = {};
}
#endif
// A deque avoids unnecessary copying of already-saved values
// and has near-minimal peak memory use.
std::deque<uint64_t> entries;

View file

@ -475,10 +475,12 @@ std::string Footer::ToString() const {
return result;
}
Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number) {
static Status ReadFooterFromFileInternal(const IOOptions& opts,
RandomAccessFileReader* file,
FileSystem& fs,
FilePrefetchBuffer* prefetch_buffer,
uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number) {
if (file_size < Footer::kMinEncodedLength) {
return Status::Corruption("file is too short (" +
std::to_string(file_size) +
@ -516,6 +518,8 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
}
}
TEST_SYNC_POINT_CALLBACK("ReadFooterFromFileInternal:0", &footer_input);
// Check that we actually read the whole footer from the file. It may be
// that size isn't correct.
if (footer_input.size() < Footer::kMinEncodedLength) {
@ -543,6 +547,30 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
return Status::OK();
}
Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number,
Statistics* stats) {
Status s =
ReadFooterFromFileInternal(opts, file, fs, prefetch_buffer, file_size,
footer, enforce_table_magic_number);
if (s.IsCorruption() &&
CheckFSFeatureSupport(&fs, FSSupportedOps::kVerifyAndReconstructRead)) {
IOOptions new_opts = opts;
new_opts.verify_and_reconstruct_read = true;
footer->Reset();
s = ReadFooterFromFileInternal(new_opts, file, fs, prefetch_buffer,
file_size, footer,
enforce_table_magic_number);
RecordTick(stats, FILE_READ_CORRUPTION_RETRY_COUNT);
if (s.ok()) {
RecordTick(stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
}
}
return s;
}
namespace {
// Custom handling for the last byte of a block, to avoid invoking streaming
// API to get an effective block checksum. This function is its own inverse

View file

@ -186,6 +186,16 @@ class Footer {
// Create empty. Populate using DecodeFrom.
Footer() {}
void Reset() {
table_magic_number_ = kNullTableMagicNumber;
format_version_ = kInvalidFormatVersion;
base_context_checksum_ = 0;
metaindex_handle_ = BlockHandle::NullBlockHandle();
index_handle_ = BlockHandle::NullBlockHandle();
checksum_type_ = kInvalidChecksumType;
block_trailer_size_ = 0;
}
// Deserialize a footer (populate fields) from `input` and check for various
// corruptions. `input_offset` is the offset within the target file of
// `input` buffer, which is needed for verifying format_version >= 6 footer.
@ -304,7 +314,8 @@ class FooterBuilder {
Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number = 0);
uint64_t enforce_table_magic_number = 0,
Statistics* stats = nullptr);
// Computes a checksum using the given ChecksumType. Sometimes we need to
// include one more input byte logically at the end but not part of the main

View file

@ -262,184 +262,232 @@ Status ReadTablePropertiesHelper(
MemoryAllocator* memory_allocator) {
assert(table_properties);
// If this is an external SST file ingested with write_global_seqno set to
// true, then we expect the checksum mismatch because checksum was written
// by SstFileWriter, but its global seqno in the properties block may have
// been changed during ingestion. For this reason, we initially read
// and process without checksum verification, then later try checksum
// verification so that if it fails, we can copy to a temporary buffer with
// global seqno set to its original value, i.e. 0, and attempt checksum
// verification again.
ReadOptions modified_ro = ro;
modified_ro.verify_checksums = false;
BlockContents block_contents;
BlockFetcher block_fetcher(file, prefetch_buffer, footer, modified_ro, handle,
&block_contents, ioptions, false /* decompress */,
false /*maybe_compressed*/, BlockType::kProperties,
UncompressionDict::GetEmptyDict(),
PersistentCacheOptions::kEmpty, memory_allocator);
Status s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
}
// Unfortunately, Block::size() might not equal block_contents.data.size(),
// and Block hides block_contents
uint64_t block_size = block_contents.data.size();
Block properties_block(std::move(block_contents));
std::unique_ptr<MetaBlockIter> iter(properties_block.NewMetaIterator());
std::unique_ptr<TableProperties> new_table_properties{new TableProperties};
// All pre-defined properties of type uint64_t
std::unordered_map<std::string, uint64_t*> predefined_uint64_properties = {
{TablePropertiesNames::kOriginalFileNumber,
&new_table_properties->orig_file_number},
{TablePropertiesNames::kDataSize, &new_table_properties->data_size},
{TablePropertiesNames::kIndexSize, &new_table_properties->index_size},
{TablePropertiesNames::kIndexPartitions,
&new_table_properties->index_partitions},
{TablePropertiesNames::kTopLevelIndexSize,
&new_table_properties->top_level_index_size},
{TablePropertiesNames::kIndexKeyIsUserKey,
&new_table_properties->index_key_is_user_key},
{TablePropertiesNames::kIndexValueIsDeltaEncoded,
&new_table_properties->index_value_is_delta_encoded},
{TablePropertiesNames::kFilterSize, &new_table_properties->filter_size},
{TablePropertiesNames::kRawKeySize, &new_table_properties->raw_key_size},
{TablePropertiesNames::kRawValueSize,
&new_table_properties->raw_value_size},
{TablePropertiesNames::kNumDataBlocks,
&new_table_properties->num_data_blocks},
{TablePropertiesNames::kNumEntries, &new_table_properties->num_entries},
{TablePropertiesNames::kNumFilterEntries,
&new_table_properties->num_filter_entries},
{TablePropertiesNames::kDeletedKeys,
&new_table_properties->num_deletions},
{TablePropertiesNames::kMergeOperands,
&new_table_properties->num_merge_operands},
{TablePropertiesNames::kNumRangeDeletions,
&new_table_properties->num_range_deletions},
{TablePropertiesNames::kFormatVersion,
&new_table_properties->format_version},
{TablePropertiesNames::kFixedKeyLen,
&new_table_properties->fixed_key_len},
{TablePropertiesNames::kColumnFamilyId,
&new_table_properties->column_family_id},
{TablePropertiesNames::kCreationTime,
&new_table_properties->creation_time},
{TablePropertiesNames::kOldestKeyTime,
&new_table_properties->oldest_key_time},
{TablePropertiesNames::kFileCreationTime,
&new_table_properties->file_creation_time},
{TablePropertiesNames::kSlowCompressionEstimatedDataSize,
&new_table_properties->slow_compression_estimated_data_size},
{TablePropertiesNames::kFastCompressionEstimatedDataSize,
&new_table_properties->fast_compression_estimated_data_size},
{TablePropertiesNames::kTailStartOffset,
&new_table_properties->tail_start_offset},
{TablePropertiesNames::kUserDefinedTimestampsPersisted,
&new_table_properties->user_defined_timestamps_persisted},
{TablePropertiesNames::kKeyLargestSeqno,
&new_table_properties->key_largest_seqno},
};
std::string last_key;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
s = iter->status();
if (!s.ok()) {
break;
Status s;
bool retry = false;
while (true) {
BlockContents block_contents;
size_t len = handle.size() + footer.GetBlockTrailerSize();
// If this is an external SST file ingested with write_global_seqno set to
// true, then we expect the checksum mismatch because checksum was written
// by SstFileWriter, but its global seqno in the properties block may have
// been changed during ingestion. For this reason, we initially read
// and process without checksum verification, then later try checksum
// verification so that if it fails, we can copy to a temporary buffer with
// global seqno set to its original value, i.e. 0, and attempt checksum
// verification again.
if (!retry) {
ReadOptions modified_ro = ro;
modified_ro.verify_checksums = false;
BlockFetcher block_fetcher(
file, prefetch_buffer, footer, modified_ro, handle, &block_contents,
ioptions, false /* decompress */, false /*maybe_compressed*/,
BlockType::kProperties, UncompressionDict::GetEmptyDict(),
PersistentCacheOptions::kEmpty, memory_allocator);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
}
assert(block_fetcher.GetBlockSizeWithTrailer() == len);
TEST_SYNC_POINT_CALLBACK("ReadTablePropertiesHelper:0",
&block_contents.data);
} else {
assert(s.IsCorruption());
// If retrying, use a stronger file system read to check and correct
// data corruption
IOOptions opts;
if (PrepareIOFromReadOptions(ro, ioptions.clock, opts) !=
IOStatus::OK()) {
return s;
}
opts.verify_and_reconstruct_read = true;
std::unique_ptr<char[]> data(new char[len]);
Slice result;
IOStatus io_s =
file->Read(opts, handle.offset(), len, &result, data.get(), nullptr);
RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_COUNT);
if (!io_s.ok()) {
ROCKS_LOG_INFO(ioptions.info_log,
"Reading properties block failed - %s",
io_s.ToString().c_str());
// Return the original corruption error as that's more serious
return s;
}
if (result.size() < len) {
return Status::Corruption("Reading properties block failed - " +
std::to_string(result.size()) +
" bytes read");
}
RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
block_contents = BlockContents(std::move(data), handle.size());
}
auto key = iter->key().ToString();
// properties block should be strictly sorted with no duplicate key.
if (!last_key.empty() &&
BytewiseComparator()->Compare(key, last_key) <= 0) {
s = Status::Corruption("properties unsorted");
break;
}
last_key = key;
uint64_t block_size = block_contents.data.size();
Block properties_block(std::move(block_contents));
// Unfortunately, Block::size() might not equal block_contents.data.size(),
// and Block hides block_contents
std::unique_ptr<MetaBlockIter> iter(properties_block.NewMetaIterator());
auto raw_val = iter->value();
auto pos = predefined_uint64_properties.find(key);
std::unique_ptr<TableProperties> new_table_properties{new TableProperties};
// All pre-defined properties of type uint64_t
std::unordered_map<std::string, uint64_t*> predefined_uint64_properties = {
{TablePropertiesNames::kOriginalFileNumber,
&new_table_properties->orig_file_number},
{TablePropertiesNames::kDataSize, &new_table_properties->data_size},
{TablePropertiesNames::kIndexSize, &new_table_properties->index_size},
{TablePropertiesNames::kIndexPartitions,
&new_table_properties->index_partitions},
{TablePropertiesNames::kTopLevelIndexSize,
&new_table_properties->top_level_index_size},
{TablePropertiesNames::kIndexKeyIsUserKey,
&new_table_properties->index_key_is_user_key},
{TablePropertiesNames::kIndexValueIsDeltaEncoded,
&new_table_properties->index_value_is_delta_encoded},
{TablePropertiesNames::kFilterSize, &new_table_properties->filter_size},
{TablePropertiesNames::kRawKeySize,
&new_table_properties->raw_key_size},
{TablePropertiesNames::kRawValueSize,
&new_table_properties->raw_value_size},
{TablePropertiesNames::kNumDataBlocks,
&new_table_properties->num_data_blocks},
{TablePropertiesNames::kNumEntries, &new_table_properties->num_entries},
{TablePropertiesNames::kNumFilterEntries,
&new_table_properties->num_filter_entries},
{TablePropertiesNames::kDeletedKeys,
&new_table_properties->num_deletions},
{TablePropertiesNames::kMergeOperands,
&new_table_properties->num_merge_operands},
{TablePropertiesNames::kNumRangeDeletions,
&new_table_properties->num_range_deletions},
{TablePropertiesNames::kFormatVersion,
&new_table_properties->format_version},
{TablePropertiesNames::kFixedKeyLen,
&new_table_properties->fixed_key_len},
{TablePropertiesNames::kColumnFamilyId,
&new_table_properties->column_family_id},
{TablePropertiesNames::kCreationTime,
&new_table_properties->creation_time},
{TablePropertiesNames::kOldestKeyTime,
&new_table_properties->oldest_key_time},
{TablePropertiesNames::kFileCreationTime,
&new_table_properties->file_creation_time},
{TablePropertiesNames::kSlowCompressionEstimatedDataSize,
&new_table_properties->slow_compression_estimated_data_size},
{TablePropertiesNames::kFastCompressionEstimatedDataSize,
&new_table_properties->fast_compression_estimated_data_size},
{TablePropertiesNames::kTailStartOffset,
&new_table_properties->tail_start_offset},
{TablePropertiesNames::kUserDefinedTimestampsPersisted,
&new_table_properties->user_defined_timestamps_persisted},
{TablePropertiesNames::kKeyLargestSeqno,
&new_table_properties->key_largest_seqno},
};
if (key == ExternalSstFilePropertyNames::kGlobalSeqno) {
new_table_properties->external_sst_file_global_seqno_offset =
handle.offset() + iter->ValueOffset();
}
std::string last_key;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
s = iter->status();
if (!s.ok()) {
break;
}
if (pos != predefined_uint64_properties.end()) {
if (key == TablePropertiesNames::kDeletedKeys ||
key == TablePropertiesNames::kMergeOperands) {
// Insert in user-collected properties for API backwards compatibility
auto key = iter->key().ToString();
// properties block should be strictly sorted with no duplicate key.
if (!last_key.empty() &&
BytewiseComparator()->Compare(key, last_key) <= 0) {
s = Status::Corruption("properties unsorted");
break;
}
last_key = key;
auto raw_val = iter->value();
auto pos = predefined_uint64_properties.find(key);
if (key == ExternalSstFilePropertyNames::kGlobalSeqno) {
new_table_properties->external_sst_file_global_seqno_offset =
handle.offset() + iter->ValueOffset();
}
if (pos != predefined_uint64_properties.end()) {
if (key == TablePropertiesNames::kDeletedKeys ||
key == TablePropertiesNames::kMergeOperands) {
// Insert in user-collected properties for API backwards compatibility
new_table_properties->user_collected_properties.insert(
{key, raw_val.ToString()});
}
// handle predefined rocksdb properties
uint64_t val;
if (!GetVarint64(&raw_val, &val)) {
// skip malformed value
auto error_msg =
"Detect malformed value in properties meta-block:"
"\tkey: " +
key + "\tval: " + raw_val.ToString();
ROCKS_LOG_ERROR(ioptions.logger, "%s", error_msg.c_str());
continue;
}
*(pos->second) = val;
} else if (key == TablePropertiesNames::kDbId) {
new_table_properties->db_id = raw_val.ToString();
} else if (key == TablePropertiesNames::kDbSessionId) {
new_table_properties->db_session_id = raw_val.ToString();
} else if (key == TablePropertiesNames::kDbHostId) {
new_table_properties->db_host_id = raw_val.ToString();
} else if (key == TablePropertiesNames::kFilterPolicy) {
new_table_properties->filter_policy_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kColumnFamilyName) {
new_table_properties->column_family_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kComparator) {
new_table_properties->comparator_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kMergeOperator) {
new_table_properties->merge_operator_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kPrefixExtractorName) {
new_table_properties->prefix_extractor_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kPropertyCollectors) {
new_table_properties->property_collectors_names = raw_val.ToString();
} else if (key == TablePropertiesNames::kCompression) {
new_table_properties->compression_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kCompressionOptions) {
new_table_properties->compression_options = raw_val.ToString();
} else if (key == TablePropertiesNames::kSequenceNumberTimeMapping) {
new_table_properties->seqno_to_time_mapping = raw_val.ToString();
} else {
// handle user-collected properties
new_table_properties->user_collected_properties.insert(
{key, raw_val.ToString()});
}
// handle predefined rocksdb properties
uint64_t val;
if (!GetVarint64(&raw_val, &val)) {
// skip malformed value
auto error_msg =
"Detect malformed value in properties meta-block:"
"\tkey: " +
key + "\tval: " + raw_val.ToString();
ROCKS_LOG_ERROR(ioptions.logger, "%s", error_msg.c_str());
continue;
}
// Modified version of BlockFetcher checksum verification
// (See write_global_seqno comment above)
if (s.ok() && footer.GetBlockTrailerSize() > 0) {
s = VerifyBlockChecksum(footer, properties_block.data(), block_size,
file->file_name(), handle.offset());
if (s.IsCorruption()) {
if (new_table_properties->external_sst_file_global_seqno_offset != 0) {
std::string tmp_buf(properties_block.data(), len);
uint64_t global_seqno_offset =
new_table_properties->external_sst_file_global_seqno_offset -
handle.offset();
EncodeFixed64(&tmp_buf[static_cast<size_t>(global_seqno_offset)], 0);
s = VerifyBlockChecksum(footer, tmp_buf.data(), block_size,
file->file_name(), handle.offset());
}
}
*(pos->second) = val;
} else if (key == TablePropertiesNames::kDbId) {
new_table_properties->db_id = raw_val.ToString();
} else if (key == TablePropertiesNames::kDbSessionId) {
new_table_properties->db_session_id = raw_val.ToString();
} else if (key == TablePropertiesNames::kDbHostId) {
new_table_properties->db_host_id = raw_val.ToString();
} else if (key == TablePropertiesNames::kFilterPolicy) {
new_table_properties->filter_policy_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kColumnFamilyName) {
new_table_properties->column_family_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kComparator) {
new_table_properties->comparator_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kMergeOperator) {
new_table_properties->merge_operator_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kPrefixExtractorName) {
new_table_properties->prefix_extractor_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kPropertyCollectors) {
new_table_properties->property_collectors_names = raw_val.ToString();
} else if (key == TablePropertiesNames::kCompression) {
new_table_properties->compression_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kCompressionOptions) {
new_table_properties->compression_options = raw_val.ToString();
} else if (key == TablePropertiesNames::kSequenceNumberTimeMapping) {
new_table_properties->seqno_to_time_mapping = raw_val.ToString();
}
// If we detected a corruption and the file system supports verification
// and reconstruction, retry the read
if (s.IsCorruption() && !retry &&
CheckFSFeatureSupport(ioptions.fs.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
retry = true;
} else {
// handle user-collected properties
new_table_properties->user_collected_properties.insert(
{key, raw_val.ToString()});
}
}
// Modified version of BlockFetcher checksum verification
// (See write_global_seqno comment above)
if (s.ok() && footer.GetBlockTrailerSize() > 0) {
s = VerifyBlockChecksum(footer, properties_block.data(), block_size,
file->file_name(), handle.offset());
if (s.IsCorruption()) {
if (new_table_properties->external_sst_file_global_seqno_offset != 0) {
std::string tmp_buf(properties_block.data(),
block_fetcher.GetBlockSizeWithTrailer());
uint64_t global_seqno_offset =
new_table_properties->external_sst_file_global_seqno_offset -
handle.offset();
EncodeFixed64(&tmp_buf[static_cast<size_t>(global_seqno_offset)], 0);
s = VerifyBlockChecksum(footer, tmp_buf.data(), block_size,
file->file_name(), handle.offset());
if (s.ok()) {
*table_properties = std::move(new_table_properties);
}
break;
}
}
if (s.ok()) {
*table_properties = std::move(new_table_properties);
}
return s;
}

View file

@ -12,8 +12,8 @@
#include "db/range_tombstone_fragmenter.h"
#if USE_COROUTINES
#include "folly/experimental/coro/Coroutine.h"
#include "folly/experimental/coro/Task.h"
#include "folly/coro/Coroutine.h"
#include "folly/coro/Task.h"
#endif
#include "rocksdb/slice_transform.h"
#include "rocksdb/table_reader_caller.h"

View file

@ -125,7 +125,7 @@ EOF
# To check for DB forward compatibility with loading options (old version
# reading data from new), as well as backward compatibility
declare -a db_forward_with_options_refs=("8.6.fb" "8.7.fb" "8.8.fb" "8.9.fb" "8.10.fb" "8.11.fb" "9.0.fb" "9.1.fb" "9.2.fb" "9.3.fb" "9.4.fb" "9.5.fb", "9.6.fb")
declare -a db_forward_with_options_refs=("8.6.fb" "8.7.fb" "8.8.fb" "8.9.fb" "8.10.fb" "8.11.fb" "9.0.fb" "9.1.fb" "9.2.fb" "9.3.fb" "9.4.fb" "9.5.fb" "9.6.fb")
# To check for DB forward compatibility without loading options (in addition
# to the "with loading options" set), as well as backward compatibility
declare -a db_forward_no_options_refs=() # N/A at the moment

View file

@ -923,16 +923,22 @@ def finalize_and_sanitize(src_params):
dest_params["prefixpercent"] = 0
dest_params["check_multiget_consistency"] = 0
dest_params["check_multiget_entity_consistency"] = 0
if dest_params.get("disable_wal") == 0 and dest_params.get("reopen") > 0:
# Reopen with WAL currently requires persisting WAL data before closing for reopen.
# Previous injected WAL write errors may not be cleared by the time of closing and ready
# for persisting WAL.
# To simplify, we disable any WAL write error injection.
# TODO(hx235): support WAL write error injection with reopen
# TODO(hx235): support excluding WAL from metadata write fault injection so we don't
# have to disable metadata write fault injection to other file
dest_params["exclude_wal_from_write_fault_injection"] = 1
dest_params["metadata_write_fault_one_in"] = 0
if dest_params.get("disable_wal") == 0:
if dest_params.get("reopen") > 0 or (dest_params.get("manual_wal_flush_one_in") and dest_params.get("column_families") != 1):
# Reopen with WAL currently requires persisting WAL data before closing for reopen.
# Previous injected WAL write errors may not be cleared by the time of closing and ready
# for persisting WAL.
# To simplify, we disable any WAL write error injection.
# TODO(hx235): support WAL write error injection with reopen
# TODO(hx235): support excluding WAL from metadata write fault injection so we don't
# have to disable metadata write fault injection to other file
#
# WAL write failure can drop buffered WAL data. This can cause
# inconsistency when one CF has a successful flush during auto
# recovery. Disable the fault injection in this path for now until
# we have a fix that allows auto recovery.
dest_params["exclude_wal_from_write_fault_injection"] = 1
dest_params["metadata_write_fault_one_in"] = 0
if dest_params.get("disable_wal") == 1:
# disableWAL and recycle_log_file_num options are not mutually
# compatible at the moment

View file

@ -1 +1 @@
* Support ingesting db generated files using hard link, i.e. IngestExternalFileOptions::move_files and IngestExternalFileOptions::allow_db_generated_files._
* Support ingesting db generated files using hard link, i.e. IngestExternalFileOptions::move_files/link_files and IngestExternalFileOptions::allow_db_generated_files.

View file

@ -0,0 +1 @@
* Add a new file ingestion option `IngestExternalFileOptions::link_files` to hard link input files and preserve original files links after ingestion.

View file

@ -0,0 +1 @@
* Fix a bug in CompactRange() where result files may not be compacted in any future compaction. This can only happen when users configure CompactRangeOptions::change_level to true and the change level step of manual compaction fails (#13009).

View file

@ -7,7 +7,7 @@
#if USE_COROUTINES
#include "file/random_access_file_reader.h"
#include "folly/experimental/coro/ViaIfAsync.h"
#include "folly/coro/ViaIfAsync.h"
#include "port/port.h"
#include "rocksdb/file_system.h"
#include "rocksdb/statistics.h"

View file

@ -5,8 +5,8 @@
// (found in the LICENSE.Apache file in the root directory).
#if defined(USE_COROUTINES)
#include "folly/experimental/coro/Coroutine.h"
#include "folly/experimental/coro/Task.h"
#include "folly/coro/Coroutine.h"
#include "folly/coro/Task.h"
#endif
#include "rocksdb/rocksdb_namespace.h"

View file

@ -506,10 +506,10 @@ IOStatus TestFSRandomAccessFile::ReadAsync(
s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr);
// TODO (low priority): fs_->ReadUnsyncedData()
} else {
// If theres no injected error, then cb will be called asynchronously when
// target_ actually finishes the read. But if theres an injected error, it
// If there's no injected error, then cb will be called asynchronously when
// target_ actually finishes the read. But if there's an injected error, it
// needs to immediately call cb(res, cb_arg) s since target_->ReadAsync()
// isnt invoked at all.
// isn't invoked at all.
res.status = res_status;
cb(res, cb_arg);
}