Change the behavior of manual flush to not retain UDT (#12737)

Summary:
When user-defined timestamps in Memtable only feature is enabled, all scheduled flushes go through a check to see if it's eligible to be rescheduled to retain user-defined timestamps. However when the user makes a manual flush request, their intention is for all the in memory data to be persisted into SST files as soon as possible. These two sides have some conflict of interest, the user can implement some workaround like https://github.com/facebook/rocksdb/issues/12631 to explicitly mark which one takes precedence. The implementation for this can be nuanced since the user needs to be aware of all the scenarios that can trigger a manual flush and handle the concurrency well etc.

In this PR, we update the default behavior to give a manual flush precedence when it is requested: the user-defined timestamps rescheduling mechanism is turned off for that flush. Likewise, all error-recovery-triggered flushes skip the rescheduling mechanism.
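
Below is a minimal sketch, from the application's point of view, of what the new default looks like, mirroring the ManualFlushSkipRetainUDTTest.ManualFlush test added in this PR. It assumes a DB opened with BytewiseComparatorWithU64Ts() and persist_user_defined_timestamps = false; the database path and the local EncodeU64Timestamp encoder are illustrative, and error handling is trimmed to assertions.

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

#include "rocksdb/comparator.h"
#include "rocksdb/db.h"

using namespace ROCKSDB_NAMESPACE;

// Fixed64 little-endian encoding of a uint64_t timestamp (assumes a
// little-endian host), matching the built-in U64 timestamp comparator.
static std::string EncodeU64Timestamp(uint64_t ts) {
  std::string buf(sizeof(ts), '\0');
  std::memcpy(&buf[0], &ts, sizeof(ts));
  return buf;
}

int main() {
  Options options;
  options.create_if_missing = true;
  options.comparator = BytewiseComparatorWithU64Ts();
  // UDT in memtable only mode: timestamps are stripped before data reaches
  // SST files.
  options.persist_user_defined_timestamps = false;

  DB* db = nullptr;
  Status s = DB::Open(options, "/tmp/udt_manual_flush_demo", &db);
  assert(s.ok());

  // Write a key at timestamp 1 while the cutoff (full_history_ts_low) is
  // still below it; an automatic flush would normally be rescheduled to
  // retain this UDT.
  s = db->Put(WriteOptions(), db->DefaultColumnFamily(), "foo",
              EncodeU64Timestamp(1), "v1");
  assert(s.ok());

  // With this change, a manual flush is not rescheduled: it proceeds right
  // away and advances full_history_ts_low past the flushed timestamps.
  s = db->Flush(FlushOptions(), db->DefaultColumnFamily());
  assert(s.ok());

  std::string ts_low;
  s = db->GetFullHistoryTsLow(db->DefaultColumnFamily(), &ts_low);
  assert(s.ok());
  // ts_low is now the encoded value 2 (write_ts + 1), per the new behavior.
  assert(ts_low == EncodeU64Timestamp(2));

  delete db;
  return 0;
}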

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12737

Test Plan: Add unit tests

Reviewed By: ajkr

Differential Revision: D58538246

Pulled By: jowlyzhang

fbshipit-source-id: 0b9b3d1af3e8d882f2d6a2406adda19324ba0694
Yu Zhang 2024-06-13 13:18:10 -07:00
parent aa44f959a6
commit b01951ed91
5 changed files with 265 additions and 217 deletions

db/column_family.cc

@@ -538,6 +538,7 @@ ColumnFamilyData::ColumnFamilyData(
refs_(0),
initialized_(false),
dropped_(false),
flush_skip_reschedule_(false),
internal_comparator_(cf_options.comparator),
initial_cf_options_(SanitizeOptions(db_options, cf_options)),
ioptions_(db_options, initial_cf_options_),
@@ -1607,6 +1608,19 @@ FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
return data_dirs_[path_id].get();
}
void ColumnFamilyData::SetFlushSkipReschedule() {
const Comparator* ucmp = user_comparator();
const size_t ts_sz = ucmp->timestamp_size();
if (ts_sz == 0 || ioptions_.persist_user_defined_timestamps) {
return;
}
flush_skip_reschedule_.store(true);
}
bool ColumnFamilyData::GetAndClearFlushSkipReschedule() {
return flush_skip_reschedule_.exchange(false);
}
bool ColumnFamilyData::ShouldPostponeFlushToRetainUDT(
uint64_t max_memtable_id) {
const Comparator* ucmp = user_comparator();

db/column_family.h

@@ -329,6 +329,10 @@ class ColumnFamilyData {
void SetDropped();
bool IsDropped() const { return dropped_.load(std::memory_order_relaxed); }
void SetFlushSkipReschedule();
bool GetAndClearFlushSkipReschedule();
// thread-safe
int NumberLevels() const { return ioptions_.num_levels; }
@@ -592,6 +596,15 @@ class ColumnFamilyData {
std::atomic<bool> initialized_;
std::atomic<bool> dropped_; // true if client dropped it
// When the user-defined timestamps in memtable only feature is enabled, this
// flag indicates a successfully requested flush that should skip being
// rescheduled and has not yet undergone the rescheduling check.
// This flag is cleared when a check skips rescheduling a FlushRequest.
// With this flag, automatic flushes in regular cases can continue to
// retain UDTs by getting rescheduled as usual while manual flushes and
// error recovery flushes will proceed without getting rescheduled.
std::atomic<bool> flush_skip_reschedule_;
const InternalKeyComparator internal_comparator_;
InternalTblPropCollFactories internal_tbl_prop_coll_factories_;

db/column_family_test.cc

@@ -35,12 +35,11 @@
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
namespace {
std::string EncodeAsUint64(uint64_t v) {
std::string dst;
PutFixed64(&dst, v);
return dst;
std::string EncodeAsUint64(uint64_t number) {
std::string result;
PutFixed64(&result, number);
return result;
}
} // namespace
@@ -3612,7 +3611,9 @@ TEST(ColumnFamilyTest, ValidateMemtableKVChecksumOption) {
}
// Tests the flushing behavior of a column family to retain user-defined
// timestamp when `persist_user_defined_timestamp` is false.
// timestamp when `persist_user_defined_timestamp` is false. Auto flush makes
// some effort to retain user-defined timestamps, while manual flush skips
// retaining UDTs.
class ColumnFamilyRetainUDTTest : public ColumnFamilyTestBase {
public:
ColumnFamilyRetainUDTTest() : ColumnFamilyTestBase(kLatestFormatVersion) {}
@@ -3630,6 +3631,27 @@ class ColumnFamilyRetainUDTTest : public ColumnFamilyTestBase {
return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(ts),
Slice(value));
}
std::string Get(int cf, const std::string& key, const std::string& read_ts) {
ReadOptions ropts;
Slice timestamp = read_ts;
ropts.timestamp = &timestamp;
std::string value;
Status s = db_->Get(ropts, handles_[cf], Slice(key), &value);
if (s.IsNotFound()) {
return "NOT_FOUND";
} else if (s.ok()) {
return value;
}
return "";
}
void CheckEffectiveCutoffTime(uint64_t expected_cutoff) {
std::string effective_full_history_ts_low;
EXPECT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
EXPECT_EQ(EncodeAsUint64(expected_cutoff), effective_full_history_ts_low);
}
};
class TestTsComparator : public Comparator {
@@ -3673,7 +3695,9 @@ TEST_F(ColumnFamilyRetainUDTTest, SanityCheck) {
Close();
}
TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {
class AutoFlushRetainUDTTest : public ColumnFamilyRetainUDTTest {};
TEST_F(AutoFlushRetainUDTTest, FullHistoryTsLowNotSet) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
@@ -3684,23 +3708,21 @@ TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {
SyncPoint::GetInstance()->EnableProcessing();
Open();
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
// No `full_history_ts_low` explicitly set by user, flush is continued
// No `full_history_ts_low` explicitly set by user, auto flush is continued
// without checking if its UDTs expired.
ASSERT_OK(Flush(0));
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
CheckEffectiveCutoffTime(2);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
TEST_F(AutoFlushRetainUDTTest, AllKeysExpired) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
@@ -3712,21 +3734,20 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
Open();
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
// All keys expired w.r.t the configured `full_history_ts_low`, flush continue
// without the need for a re-schedule.
ASSERT_OK(Flush(0));
// All keys expired w.r.t. the configured `full_history_ts_low`, so auto flush
// continues without the need for a re-schedule.
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// `full_history_ts_low` stays unchanged after flush.
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
CheckEffectiveCutoffTime(3);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffInMemtableSealCb) {
TEST_F(AutoFlushRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
@@ -3739,198 +3760,22 @@ TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffInMemtableSealCb) {
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
// Not all keys expired, but flush is continued without a re-schedule because
// of risk of write stall.
ASSERT_OK(Flush(0));
// Not all keys expired, but auto flush continues without a re-schedule
// because of the risk of a write stall.
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
CheckEffectiveCutoffTime(2);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// The user selectively increase cutoff timestamp in the `OnMemtableSealed`
// callback when it is invoked during a manual flush. It's suitable for when the
// user does not know an effective new cutoff timestamp and the callback will
// provide this info.
// The caveat of this approach is that the user need to track when manual flush
// is ongoing. In this example listener, the `manual_flush_count_` variable is
// for this purpose, it's designed to be a counter to allow concurrent manual
// flush to control the increase cutoff timestamp behavior independently.
// Also, a lot of operations can indirectly cause a manual flush, such as
// manual compaction/file ingestion. And the user needs to
// explicitly track each of such operation. So this callback is not ideal. Check
// out below `ManualFlushScheduledEventListener` for a different approach.
class MemtableSealEventListener : public EventListener {
private:
DB* db_;
std::vector<ColumnFamilyHandle*> handles_;
std::atomic<int> manual_flush_count_{0};
public:
std::atomic<int> memtable_seal_count_{0};
std::atomic<int> increase_cutoff_count_{0};
void OnMemTableSealed(const MemTableInfo& info) override {
memtable_seal_count_.fetch_add(1);
if (manual_flush_count_.load() == 0) {
return;
}
if (!info.newest_udt.empty()) {
uint64_t int_newest_udt = 0;
Slice udt_slice = info.newest_udt;
Status s = DecodeU64Ts(udt_slice, &int_newest_udt);
if (!s.ok()) {
return;
}
// An error indicates others have already set the cutoff to a higher
// point, so it's OK to proceed.
db_->IncreaseFullHistoryTsLow(handles_[0],
EncodeAsUint64(int_newest_udt + 1))
.PermitUncheckedError();
increase_cutoff_count_.fetch_add(1);
}
}
void PopulateDBAndHandles(DB* db, std::vector<ColumnFamilyHandle*> handles) {
db_ = db;
handles_ = handles;
}
void MarkManualFlushStart() { manual_flush_count_.fetch_add(1); }
void MarkManualFlushEnd() { manual_flush_count_.fetch_sub(1); }
};
TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffOnMemtableSealedCb) {
std::shared_ptr<MemtableSealEventListener> listener =
std::make_shared<MemtableSealEventListener>();
db_options_.listeners.push_back(listener);
const int kNumEntriesPerMemTable = 2;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumEntriesPerMemTable - 1));
// Make sure there is no memory pressure to not retain udts.
column_family_options_.max_write_buffer_number = 8;
Open();
listener->PopulateDBAndHandles(db_, handles_);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(Put(0, "bar", EncodeAsUint64(2), "v1"));
ASSERT_OK(Put(0, "baz", EncodeAsUint64(2), "v1"));
// Event listener not attempt to increase cutoff timestamp if there is no
// manual flush going on.
ASSERT_EQ(listener->memtable_seal_count_.load(), 1);
ASSERT_EQ(listener->increase_cutoff_count_.load(), 0);
// Created the first memtable and scheduled it for flush.
ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
listener->MarkManualFlushStart();
// Cutoff increased to 3 in `OnMemTableSealed` callback.
ASSERT_OK(dbfull()->Flush(FlushOptions(), handles_[0]));
listener->MarkManualFlushEnd();
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
// Cutoff increased to 5 in `OnMemtableSealed` callback.
listener->MarkManualFlushStart();
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[0], nullptr,
nullptr));
listener->MarkManualFlushEnd();
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);
// There are two attempts to increase cutoff timestamp, one for each manual
// compaction.
ASSERT_EQ(listener->increase_cutoff_count_.load(), 2);
Close();
}
// The user explicitly increase cutoff timestamp in the `OnManualFlushScheduled`
// callback. It's suitable for when the user already knows an effective cutoff
// timestamp to let the flush proceed.
class ManualFlushScheduledEventListener : public EventListener {
private:
std::vector<ColumnFamilyHandle*> handles_;
// this is a workaround to get a meaningful cutoff timestamp to use.
std::atomic<uint64_t> counter{0};
public:
void OnManualFlushScheduled(
DB* db, const std::vector<ManualFlushInfo>& manual_flush_info) override {
// This vector should always be 1 for non atomic flush case.
EXPECT_EQ(manual_flush_info.size(), 1);
EXPECT_EQ(manual_flush_info[0].cf_name, kDefaultColumnFamilyName);
if (counter.load() == 0) {
EXPECT_EQ(manual_flush_info[0].flush_reason, FlushReason::kManualFlush);
// An error indicates others have already set the cutoff to a higher
// point, so it's OK to proceed.
db->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3))
.PermitUncheckedError();
} else if (counter.load() == 1) {
EXPECT_EQ(manual_flush_info[0].flush_reason,
FlushReason::kManualCompaction);
db->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(5))
.PermitUncheckedError();
}
counter.fetch_add(1);
}
void PopulateHandles(std::vector<ColumnFamilyHandle*> handles) {
handles_ = handles;
}
};
TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffOnManualFlushScheduledCb) {
std::shared_ptr<ManualFlushScheduledEventListener> listener =
std::make_shared<ManualFlushScheduledEventListener>();
db_options_.listeners.push_back(listener);
const int kNumEntriesPerMemTable = 2;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumEntriesPerMemTable - 1));
// Make sure there is no memory pressure to not retain udts.
column_family_options_.max_write_buffer_number = 8;
Open();
listener->PopulateHandles(handles_);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(Put(0, "bar", EncodeAsUint64(2), "v1"));
ASSERT_OK(Put(0, "baz", EncodeAsUint64(2), "v1"));
// Created the first memtable and scheduled it for flush.
ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
// Cutoff increased to 3 in the `OnManualFlushScheduled` callback.
ASSERT_OK(dbfull()->Flush(FlushOptions(), handles_[0]));
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
// Cutoff increased to 5 in the `OnManualFlushScheduled` callback.
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[0], nullptr,
nullptr));
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);
Close();
}
TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
std::string cutoff_ts;
TEST_F(AutoFlushRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
std::atomic<int> local_counter{1};
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
// Increasing full_history_ts_low so all keys expired after the initial
@@ -3943,6 +3788,7 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(2, reschedule_count);
local_counter.fetch_add(1);
});
SyncPoint::GetInstance()->EnableProcessing();
@@ -3952,19 +3798,177 @@
// Not all keys expired, and there is no risk of write stall. Flush is
// rescheduled. The actual flush happens after `full_history_ts_low` is
// increased to mark all keys expired.
ASSERT_OK(Flush(0));
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// Make sure callback is not skipped.
ASSERT_EQ(2, local_counter);
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` stays unchanged.
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
CheckEffectiveCutoffTime(3);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
class ManualFlushSkipRetainUDTTest : public ColumnFamilyRetainUDTTest {
public:
// Write an entry with a timestamp that is not expired w.r.t. the cutoff
// timestamp, and make sure automatic flush would be rescheduled to retain UDT.
void CheckAutomaticFlushRetainUDT(uint64_t write_ts) {
std::atomic<int> local_counter{1};
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
// Increasing full_history_ts_low so all keys expired after the
// initial FlushRequest is rescheduled
ASSERT_OK(db_->IncreaseFullHistoryTsLow(
handles_[0], EncodeAsUint64(write_ts + 1)));
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(2, reschedule_count);
local_counter.fetch_add(1);
});
SyncPoint::GetInstance()->EnableProcessing();
EXPECT_OK(Put(0, "foo", EncodeAsUint64(write_ts),
"foo" + std::to_string(write_ts)));
EXPECT_OK(dbfull()->TEST_SwitchWAL());
EXPECT_OK(dbfull()->TEST_WaitForFlushMemTable());
// Make sure callback is not skipped.
EXPECT_EQ(2, local_counter);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
};
TEST_F(ManualFlushSkipRetainUDTTest, ManualFlush) {
Open();
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(0)));
// Manual flush proceeds without trying to retain UDT.
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(Flush(0));
CheckEffectiveCutoffTime(2);
CheckAutomaticFlushRetainUDT(3);
Close();
}
TEST_F(ManualFlushSkipRetainUDTTest, ManualCompaction) {
Open();
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(0)));
// Manual compaction proceeds without trying to retain UDT.
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v2"));
ASSERT_OK(
db_->CompactRange(CompactRangeOptions(), handles_[0], nullptr, nullptr));
CheckEffectiveCutoffTime(2);
CheckAutomaticFlushRetainUDT(3);
Close();
}
TEST_F(ManualFlushSkipRetainUDTTest, BulkLoading) {
Open();
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(0)));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
// Test flush behavior in bulk loading scenarios.
Options options(db_options_, column_family_options_);
std::string sst_files_dir = dbname_ + "/sst_files/";
ASSERT_OK(DestroyDir(env_, sst_files_dir));
ASSERT_OK(env_->CreateDir(sst_files_dir));
SstFileWriter sst_file_writer(EnvOptions(), options);
std::string file1 = sst_files_dir + "file1.sst";
ASSERT_OK(sst_file_writer.Open(file1));
ASSERT_OK(sst_file_writer.Put("foo", EncodeAsUint64(0), "v2"));
ExternalSstFileInfo file1_info;
ASSERT_OK(sst_file_writer.Finish(&file1_info));
// Bulk loading in UDT mode doesn't support external file key range overlap
// with DB key range.
ASSERT_TRUE(db_->IngestExternalFile({file1}, IngestExternalFileOptions())
.IsInvalidArgument());
std::string file2 = sst_files_dir + "file2.sst";
ASSERT_OK(sst_file_writer.Open(file2));
ASSERT_OK(sst_file_writer.Put("bar", EncodeAsUint64(0), "val"));
ExternalSstFileInfo file2_info;
ASSERT_OK(sst_file_writer.Finish(&file2_info));
// A successful bulk load doesn't trigger any flush. As a result, the
// effective cutoff timestamp is also unchanged.
ASSERT_OK(db_->IngestExternalFile({file2}, IngestExternalFileOptions()));
ASSERT_EQ(Get(0, "foo", EncodeAsUint64(1)), "v1");
ASSERT_EQ(Get(0, "bar", EncodeAsUint64(0)), "val");
CheckEffectiveCutoffTime(0);
CheckAutomaticFlushRetainUDT(1);
Close();
}
TEST_F(ManualFlushSkipRetainUDTTest, AutomaticFlushQueued) {
// TODO(yuzhangyu): this is where the write for no write stall happens.
// We need to solve that and remove this.
column_family_options_.max_write_buffer_number = 3;
Open();
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(0)));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(dbfull()->TEST_SwitchWAL());
CheckEffectiveCutoffTime(0);
ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v2"));
ASSERT_OK(Flush(0));
CheckEffectiveCutoffTime(3);
CheckAutomaticFlushRetainUDT(4);
Close();
}
TEST_F(ManualFlushSkipRetainUDTTest, ConcurrentManualFlushes) {
Open();
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(0)));
std::vector<ROCKSDB_NAMESPACE::port::Thread> manual_flush_tds;
std::atomic<int> next_ts{0};
std::mutex mtx;
std::condition_variable cv;
auto manual_flush = [&](int write_ts) {
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock,
[&write_ts, &next_ts] { return write_ts == next_ts.load(); });
ASSERT_OK(Put(0, "foo" + std::to_string(write_ts),
EncodeAsUint64(write_ts),
"val_" + std::to_string(write_ts)));
next_ts.fetch_add(1);
cv.notify_all();
}
if (write_ts % 2 == 0) {
ASSERT_OK(Flush(0));
} else {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[0], nullptr,
nullptr));
}
};
for (int write_ts = 0; write_ts < 10; write_ts++) {
manual_flush_tds.emplace_back(manual_flush, write_ts);
}
for (auto& td : manual_flush_tds) {
td.join();
}
CheckEffectiveCutoffTime(10);
CheckAutomaticFlushRetainUDT(11);
Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

db/db_impl/db_impl.h

@@ -2184,7 +2184,8 @@ class DBImpl : public DB {
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req);
void SchedulePendingFlush(const FlushRequest& req);
// Returns true if `req` is successfully enqueued.
bool SchedulePendingFlush(const FlushRequest& req);
void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,

db/db_impl/db_impl_compaction_flush.cc

@@ -87,6 +87,9 @@ bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
mutex_.AssertHeld();
assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
ColumnFamilyData* cfd = flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
if (cfd->GetAndClearFlushSkipReschedule()) {
return false;
}
uint64_t max_memtable_id =
flush_req.cfd_to_max_mem_id_to_persist.begin()->second;
if (cfd->IsDropped() ||
@@ -2431,7 +2434,14 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}
for (const auto& req : flush_reqs) {
SchedulePendingFlush(req);
assert(req.cfd_to_max_mem_id_to_persist.size() == 1);
ColumnFamilyData* loop_cfd =
req.cfd_to_max_mem_id_to_persist.begin()->first;
bool already_queued_for_flush = loop_cfd->queued_for_flush();
bool flush_req_enqueued = SchedulePendingFlush(req);
if (already_queued_for_flush || flush_req_enqueued) {
loop_cfd->SetFlushSkipReschedule();
}
}
MaybeScheduleFlushOrCompaction();
}
@@ -2647,7 +2657,9 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
flush_reason,
{{cfd,
std::numeric_limits<uint64_t>::max() /* max_mem_id_to_persist */}}};
SchedulePendingFlush(flush_req);
if (SchedulePendingFlush(flush_req)) {
cfd->SetFlushSkipReschedule();
};
}
}
MaybeScheduleFlushOrCompaction();
@@ -3053,13 +3065,14 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(
return cfd;
}
void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
mutex_.AssertHeld();
bool enqueued = false;
if (reject_new_background_jobs_) {
return;
return enqueued;
}
if (flush_req.cfd_to_max_mem_id_to_persist.empty()) {
return;
return enqueued;
}
if (!immutable_db_options_.atomic_flush) {
// For the non-atomic flush case, we never schedule multiple column
@@ -3074,6 +3087,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
cfd->set_queued_for_flush(true);
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
enqueued = true;
}
} else {
for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
@@ -3082,7 +3096,9 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
}
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
enqueued = true;
}
return enqueued;
}
void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {