mirror of https://github.com/facebook/rocksdb.git
GetEntity Support for ReadOnlyDB and SecondaryDB (#11799)
Summary: `GetEntity` API support for ReadOnly DB and Secondary DB. - Introduced `GetImpl()` with `GetImplOptions` in `db_impl_readonly` and refactored the current `Get()` logic into `GetImpl()` so that the lookup logic can be reused for `GetEntity()` (following the same pattern as `DBImpl::Get()` and `DBImpl::GetEntity()`). - Introduced `GetImpl()` with `GetImplOptions` in `db_impl_secondary` and refactored the current `GetImpl()` logic. This is to make `DBImplSecondary::Get/GetEntity` consistent with `DBImpl::Get/GetEntity` and `DBImplReadOnly::Get/GetEntity`. - `GetImpl()` in `db_impl` is now virtual. Both `db_impl_readonly` and `db_impl_secondary`'s `Get()` overrides are no longer needed since all three DBs now have the same `Get()`, which calls `GetImpl()` internally. - `GetImpl()` in `DBImplReadOnly` and `DBImplSecondary` now passes in `columns` instead of `nullptr` in lookup functions like `memtable->get()`. - Introduced `GetEntity()` API in `DBImplReadOnly` and `DBImplSecondary`, which simply calls `GetImpl()` with `columns` set in `GetImplOptions`. - Introduced `Env::IOActivity::kGetEntity` and set read_options.io_activity to `Env::IOActivity::kGetEntity` for `GetEntity()` operations (in db_impl). Pull Request resolved: https://github.com/facebook/rocksdb/pull/11799 Test Plan: **Unit Tests** - Added verification in `DBWideBasicTest::PutEntity` by reopening the DB as ReadOnly with the same setup.
- Added verification in `DBSecondaryTest::ReopenAsSecondary` by calling `PutEntity()` and `GetEntity()` on top of existing `Put()` and `Get()` - `make -j64 check` **Crash Tests** - `python3 tools/db_crashtest.py blackbox --max_key=25000000 --write_buffer_size=4194304 --max_bytes_for_level_base=2097152 --target_file_size_base=2097152 --periodic_compaction_seconds=0 --use_put_entity_one_in=10 --use_get_entity=1 --duration=60 --interval=10` - `python3 tools/db_crashtest.py blackbox --simple --max_key=25000000 --write_buffer_size=4194304 --max_bytes_for_level_base=2097152 --target_file_size_base=2097152 --periodic_compaction_seconds=0 --use_put_entity_one_in=10 --use_get_entity=1` - `python3 tools/db_crashtest.py blackbox --cf_consistency --max_key=25000000 --write_buffer_size=4194304 --max_bytes_for_level_base=2097152 --target_file_size_base=2097152 --periodic_compaction_seconds=0 --use_put_entity_one_in=10 --use_get_entity=1 --duration=60 --interval=10` Reviewed By: ltamasi Differential Revision: D49037040 Pulled By: jaykorean fbshipit-source-id: a0648253ded6e91af7953de364ed3c6bf163626b
This commit is contained in:
parent
47e023abbd
commit
f2b623bcc1
|
@ -1944,25 +1944,27 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status DBImpl::GetEntity(const ReadOptions& read_options,
|
Status DBImpl::GetEntity(const ReadOptions& _read_options,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
PinnableWideColumns* columns) {
|
PinnableWideColumns* columns) {
|
||||||
if (!column_family) {
|
if (!column_family) {
|
||||||
return Status::InvalidArgument(
|
return Status::InvalidArgument(
|
||||||
"Cannot call GetEntity without a column family handle");
|
"Cannot call GetEntity without a column family handle");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!columns) {
|
if (!columns) {
|
||||||
return Status::InvalidArgument(
|
return Status::InvalidArgument(
|
||||||
"Cannot call GetEntity without a PinnableWideColumns object");
|
"Cannot call GetEntity without a PinnableWideColumns object");
|
||||||
}
|
}
|
||||||
|
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
|
||||||
if (read_options.io_activity != Env::IOActivity::kUnknown) {
|
_read_options.io_activity != Env::IOActivity::kGetEntity) {
|
||||||
return Status::InvalidArgument(
|
return Status::InvalidArgument(
|
||||||
"Cannot call GetEntity with `ReadOptions::io_activity` != "
|
"Cannot call GetEntity with `ReadOptions::io_activity` != "
|
||||||
"`Env::IOActivity::kUnknown`");
|
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGetEntity`");
|
||||||
|
}
|
||||||
|
ReadOptions read_options(_read_options);
|
||||||
|
if (read_options.io_activity == Env::IOActivity::kUnknown) {
|
||||||
|
read_options.io_activity = Env::IOActivity::kGetEntity;
|
||||||
}
|
}
|
||||||
|
|
||||||
columns->Reset();
|
columns->Reset();
|
||||||
|
|
||||||
GetImplOptions get_impl_options;
|
GetImplOptions get_impl_options;
|
||||||
|
|
|
@ -643,8 +643,8 @@ class DBImpl : public DB {
|
||||||
// get_impl_options.key via get_impl_options.value
|
// get_impl_options.key via get_impl_options.value
|
||||||
// If get_impl_options.get_value = false get merge operands associated with
|
// If get_impl_options.get_value = false get merge operands associated with
|
||||||
// get_impl_options.key via get_impl_options.merge_operands
|
// get_impl_options.key via get_impl_options.merge_operands
|
||||||
Status GetImpl(const ReadOptions& options, const Slice& key,
|
virtual Status GetImpl(const ReadOptions& options, const Slice& key,
|
||||||
GetImplOptions& get_impl_options);
|
GetImplOptions& get_impl_options);
|
||||||
|
|
||||||
// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
|
// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
|
||||||
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
|
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
|
||||||
|
|
|
@ -29,41 +29,23 @@ DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
|
||||||
DBImplReadOnly::~DBImplReadOnly() {}
|
DBImplReadOnly::~DBImplReadOnly() {}
|
||||||
|
|
||||||
// Implementations of the DB interface
|
// Implementations of the DB interface
|
||||||
Status DBImplReadOnly::Get(const ReadOptions& read_options,
|
Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
const Slice& key,
|
||||||
PinnableSlice* pinnable_val) {
|
GetImplOptions& get_impl_options) {
|
||||||
return Get(read_options, column_family, key, pinnable_val,
|
assert(get_impl_options.value != nullptr ||
|
||||||
/*timestamp*/ nullptr);
|
get_impl_options.columns != nullptr);
|
||||||
}
|
assert(get_impl_options.column_family);
|
||||||
|
|
||||||
Status DBImplReadOnly::Get(const ReadOptions& _read_options,
|
Status s;
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
|
||||||
PinnableSlice* pinnable_val,
|
|
||||||
std::string* timestamp) {
|
|
||||||
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
|
|
||||||
_read_options.io_activity != Env::IOActivity::kGet) {
|
|
||||||
return Status::InvalidArgument(
|
|
||||||
"Can only call Get with `ReadOptions::io_activity` is "
|
|
||||||
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
|
|
||||||
}
|
|
||||||
ReadOptions read_options(_read_options);
|
|
||||||
if (read_options.io_activity == Env::IOActivity::kUnknown) {
|
|
||||||
read_options.io_activity = Env::IOActivity::kGet;
|
|
||||||
}
|
|
||||||
assert(pinnable_val != nullptr);
|
|
||||||
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
|
|
||||||
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
|
|
||||||
PERF_TIMER_GUARD(get_snapshot_time);
|
|
||||||
|
|
||||||
assert(column_family);
|
|
||||||
if (read_options.timestamp) {
|
if (read_options.timestamp) {
|
||||||
const Status s =
|
s = FailIfTsMismatchCf(get_impl_options.column_family,
|
||||||
FailIfTsMismatchCf(column_family, *(read_options.timestamp));
|
*(read_options.timestamp));
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
const Status s = FailIfCfHasTs(column_family);
|
s = FailIfCfHasTs(get_impl_options.column_family);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
@ -71,25 +53,32 @@ Status DBImplReadOnly::Get(const ReadOptions& _read_options,
|
||||||
|
|
||||||
// Clear the timestamps for returning results so that we can distinguish
|
// Clear the timestamps for returning results so that we can distinguish
|
||||||
// between tombstone or key that has never been written
|
// between tombstone or key that has never been written
|
||||||
if (timestamp) {
|
if (get_impl_options.timestamp) {
|
||||||
timestamp->clear();
|
get_impl_options.timestamp->clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
const Comparator* ucmp = column_family->GetComparator();
|
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
|
||||||
assert(ucmp);
|
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
|
||||||
std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
|
PERF_TIMER_GUARD(get_snapshot_time);
|
||||||
|
|
||||||
Status s;
|
const Comparator* ucmp = get_impl_options.column_family->GetComparator();
|
||||||
|
assert(ucmp);
|
||||||
|
std::string* ts =
|
||||||
|
ucmp->timestamp_size() > 0 ? get_impl_options.timestamp : nullptr;
|
||||||
SequenceNumber snapshot = versions_->LastSequence();
|
SequenceNumber snapshot = versions_->LastSequence();
|
||||||
GetWithTimestampReadCallback read_cb(snapshot);
|
GetWithTimestampReadCallback read_cb(snapshot);
|
||||||
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
|
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
|
||||||
|
get_impl_options.column_family);
|
||||||
auto cfd = cfh->cfd();
|
auto cfd = cfh->cfd();
|
||||||
if (tracer_) {
|
if (tracer_) {
|
||||||
InstrumentedMutexLock lock(&trace_mutex_);
|
InstrumentedMutexLock lock(&trace_mutex_);
|
||||||
if (tracer_) {
|
if (tracer_) {
|
||||||
tracer_->Get(column_family, key);
|
tracer_->Get(get_impl_options.column_family, key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// In read-only mode Get(), no super version operation is needed (i.e.
|
||||||
|
// GetAndRefSuperVersion and ReturnAndCleanupSuperVersion)
|
||||||
SuperVersion* super_version = cfd->GetSuperVersion();
|
SuperVersion* super_version = cfd->GetSuperVersion();
|
||||||
if (read_options.timestamp && read_options.timestamp->size() > 0) {
|
if (read_options.timestamp && read_options.timestamp->size() > 0) {
|
||||||
s = FailIfReadCollapsedHistory(cfd, super_version,
|
s = FailIfReadCollapsedHistory(cfd, super_version,
|
||||||
|
@ -102,29 +91,42 @@ Status DBImplReadOnly::Get(const ReadOptions& _read_options,
|
||||||
SequenceNumber max_covering_tombstone_seq = 0;
|
SequenceNumber max_covering_tombstone_seq = 0;
|
||||||
LookupKey lkey(key, snapshot, read_options.timestamp);
|
LookupKey lkey(key, snapshot, read_options.timestamp);
|
||||||
PERF_TIMER_STOP(get_snapshot_time);
|
PERF_TIMER_STOP(get_snapshot_time);
|
||||||
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
|
|
||||||
/*columns=*/nullptr, ts, &s, &merge_context,
|
// Look up starts here
|
||||||
&max_covering_tombstone_seq, read_options,
|
if (super_version->mem->Get(
|
||||||
false /* immutable_memtable */, &read_cb)) {
|
lkey,
|
||||||
pinnable_val->PinSelf();
|
get_impl_options.value ? get_impl_options.value->GetSelf() : nullptr,
|
||||||
|
get_impl_options.columns, ts, &s, &merge_context,
|
||||||
|
&max_covering_tombstone_seq, read_options,
|
||||||
|
false /* immutable_memtable */, &read_cb)) {
|
||||||
|
if (get_impl_options.value) {
|
||||||
|
get_impl_options.value->PinSelf();
|
||||||
|
}
|
||||||
RecordTick(stats_, MEMTABLE_HIT);
|
RecordTick(stats_, MEMTABLE_HIT);
|
||||||
} else {
|
} else {
|
||||||
PERF_TIMER_GUARD(get_from_output_files_time);
|
PERF_TIMER_GUARD(get_from_output_files_time);
|
||||||
PinnedIteratorsManager pinned_iters_mgr;
|
PinnedIteratorsManager pinned_iters_mgr;
|
||||||
super_version->current->Get(
|
super_version->current->Get(
|
||||||
read_options, lkey, pinnable_val, /*columns=*/nullptr, ts, &s,
|
read_options, lkey, get_impl_options.value, get_impl_options.columns,
|
||||||
&merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
|
ts, &s, &merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
|
||||||
/*value_found*/ nullptr,
|
/*value_found*/ nullptr,
|
||||||
/*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb,
|
/*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb,
|
||||||
/*is_blob*/ nullptr,
|
/*is_blob*/ nullptr,
|
||||||
/*do_merge*/ true);
|
/*do_merge*/ true);
|
||||||
RecordTick(stats_, MEMTABLE_MISS);
|
RecordTick(stats_, MEMTABLE_MISS);
|
||||||
}
|
}
|
||||||
RecordTick(stats_, NUMBER_KEYS_READ);
|
{
|
||||||
size_t size = pinnable_val->size();
|
RecordTick(stats_, NUMBER_KEYS_READ);
|
||||||
RecordTick(stats_, BYTES_READ, size);
|
size_t size = 0;
|
||||||
RecordInHistogram(stats_, BYTES_PER_READ, size);
|
if (get_impl_options.value) {
|
||||||
PERF_COUNTER_ADD(get_read_bytes, size);
|
size = get_impl_options.value->size();
|
||||||
|
} else if (get_impl_options.columns) {
|
||||||
|
size = get_impl_options.columns->serialized_size();
|
||||||
|
}
|
||||||
|
RecordTick(stats_, BYTES_READ, size);
|
||||||
|
RecordInHistogram(stats_, BYTES_PER_READ, size);
|
||||||
|
PERF_COUNTER_ADD(get_read_bytes, size);
|
||||||
|
}
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,13 +24,9 @@ class DBImplReadOnly : public DBImpl {
|
||||||
virtual ~DBImplReadOnly();
|
virtual ~DBImplReadOnly();
|
||||||
|
|
||||||
// Implementations of the DB interface
|
// Implementations of the DB interface
|
||||||
using DB::Get;
|
using DBImpl::GetImpl;
|
||||||
virtual Status Get(const ReadOptions& options,
|
Status GetImpl(const ReadOptions& options, const Slice& key,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
GetImplOptions& get_impl_options) override;
|
||||||
PinnableSlice* value) override;
|
|
||||||
Status Get(const ReadOptions& _read_options,
|
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
|
||||||
PinnableSlice* value, std::string* timestamp) override;
|
|
||||||
|
|
||||||
// TODO: Implement ReadOnly MultiGet?
|
// TODO: Implement ReadOnly MultiGet?
|
||||||
|
|
||||||
|
|
|
@ -339,113 +339,93 @@ Status DBImplSecondary::RecoverLogFiles(
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implementation of the DB interface
|
|
||||||
Status DBImplSecondary::Get(const ReadOptions& _read_options,
|
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
|
||||||
PinnableSlice* value) {
|
|
||||||
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
|
|
||||||
_read_options.io_activity != Env::IOActivity::kGet) {
|
|
||||||
return Status::InvalidArgument(
|
|
||||||
"Can only call Get with `ReadOptions::io_activity` is "
|
|
||||||
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
|
|
||||||
}
|
|
||||||
ReadOptions read_options(_read_options);
|
|
||||||
if (read_options.io_activity == Env::IOActivity::kUnknown) {
|
|
||||||
read_options.io_activity = Env::IOActivity::kGet;
|
|
||||||
}
|
|
||||||
return GetImpl(read_options, column_family, key, value,
|
|
||||||
/*timestamp*/ nullptr);
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBImplSecondary::Get(const ReadOptions& _read_options,
|
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
|
||||||
PinnableSlice* value, std::string* timestamp) {
|
|
||||||
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
|
|
||||||
_read_options.io_activity != Env::IOActivity::kGet) {
|
|
||||||
return Status::InvalidArgument(
|
|
||||||
"Can only call Get with `ReadOptions::io_activity` is "
|
|
||||||
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
|
|
||||||
}
|
|
||||||
ReadOptions read_options(_read_options);
|
|
||||||
if (read_options.io_activity == Env::IOActivity::kUnknown) {
|
|
||||||
read_options.io_activity = Env::IOActivity::kGet;
|
|
||||||
}
|
|
||||||
return GetImpl(read_options, column_family, key, value, timestamp);
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
|
Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
|
||||||
ColumnFamilyHandle* column_family,
|
const Slice& key,
|
||||||
const Slice& key, PinnableSlice* pinnable_val,
|
GetImplOptions& get_impl_options) {
|
||||||
std::string* timestamp) {
|
assert(get_impl_options.value != nullptr ||
|
||||||
assert(pinnable_val != nullptr);
|
get_impl_options.columns != nullptr);
|
||||||
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
|
assert(get_impl_options.column_family);
|
||||||
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
|
|
||||||
PERF_TIMER_GUARD(get_snapshot_time);
|
Status s;
|
||||||
|
|
||||||
assert(column_family);
|
|
||||||
if (read_options.timestamp) {
|
if (read_options.timestamp) {
|
||||||
const Status s =
|
s = FailIfTsMismatchCf(get_impl_options.column_family,
|
||||||
FailIfTsMismatchCf(column_family, *(read_options.timestamp));
|
*(read_options.timestamp));
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
const Status s = FailIfCfHasTs(column_family);
|
s = FailIfCfHasTs(get_impl_options.column_family);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear the timestamp for returning results so that we can distinguish
|
// Clear the timestamps for returning results so that we can distinguish
|
||||||
// between tombstone or key that has never been written later.
|
// between tombstone or key that has never been written
|
||||||
if (timestamp) {
|
if (get_impl_options.timestamp) {
|
||||||
timestamp->clear();
|
get_impl_options.timestamp->clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
|
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
|
||||||
ColumnFamilyData* cfd = cfh->cfd();
|
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
|
||||||
|
PERF_TIMER_GUARD(get_snapshot_time);
|
||||||
|
|
||||||
|
const Comparator* ucmp = get_impl_options.column_family->GetComparator();
|
||||||
|
assert(ucmp);
|
||||||
|
std::string* ts =
|
||||||
|
ucmp->timestamp_size() > 0 ? get_impl_options.timestamp : nullptr;
|
||||||
|
SequenceNumber snapshot = versions_->LastSequence();
|
||||||
|
GetWithTimestampReadCallback read_cb(snapshot);
|
||||||
|
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
|
||||||
|
get_impl_options.column_family);
|
||||||
|
auto cfd = cfh->cfd();
|
||||||
if (tracer_) {
|
if (tracer_) {
|
||||||
InstrumentedMutexLock lock(&trace_mutex_);
|
InstrumentedMutexLock lock(&trace_mutex_);
|
||||||
if (tracer_) {
|
if (tracer_) {
|
||||||
tracer_->Get(column_family, key);
|
tracer_->Get(get_impl_options.column_family, key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Acquire SuperVersion
|
// Acquire SuperVersion
|
||||||
SuperVersion* super_version = GetAndRefSuperVersion(cfd);
|
SuperVersion* super_version = GetAndRefSuperVersion(cfd);
|
||||||
if (read_options.timestamp && read_options.timestamp->size() > 0) {
|
if (read_options.timestamp && read_options.timestamp->size() > 0) {
|
||||||
const Status s = FailIfReadCollapsedHistory(cfd, super_version,
|
s = FailIfReadCollapsedHistory(cfd, super_version,
|
||||||
*(read_options.timestamp));
|
*(read_options.timestamp));
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
ReturnAndCleanupSuperVersion(cfd, super_version);
|
ReturnAndCleanupSuperVersion(cfd, super_version);
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SequenceNumber snapshot = versions_->LastSequence();
|
|
||||||
GetWithTimestampReadCallback read_cb(snapshot);
|
|
||||||
MergeContext merge_context;
|
MergeContext merge_context;
|
||||||
SequenceNumber max_covering_tombstone_seq = 0;
|
SequenceNumber max_covering_tombstone_seq = 0;
|
||||||
Status s;
|
|
||||||
LookupKey lkey(key, snapshot, read_options.timestamp);
|
LookupKey lkey(key, snapshot, read_options.timestamp);
|
||||||
PERF_TIMER_STOP(get_snapshot_time);
|
PERF_TIMER_STOP(get_snapshot_time);
|
||||||
|
|
||||||
bool done = false;
|
bool done = false;
|
||||||
const Comparator* ucmp = column_family->GetComparator();
|
|
||||||
assert(ucmp);
|
// Look up starts here
|
||||||
std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
|
if (super_version->mem->Get(
|
||||||
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
|
lkey,
|
||||||
/*columns=*/nullptr, ts, &s, &merge_context,
|
get_impl_options.value ? get_impl_options.value->GetSelf() : nullptr,
|
||||||
&max_covering_tombstone_seq, read_options,
|
get_impl_options.columns, ts, &s, &merge_context,
|
||||||
false /* immutable_memtable */, &read_cb)) {
|
&max_covering_tombstone_seq, read_options,
|
||||||
|
false /* immutable_memtable */, &read_cb)) {
|
||||||
done = true;
|
done = true;
|
||||||
pinnable_val->PinSelf();
|
if (get_impl_options.value) {
|
||||||
|
get_impl_options.value->PinSelf();
|
||||||
|
}
|
||||||
RecordTick(stats_, MEMTABLE_HIT);
|
RecordTick(stats_, MEMTABLE_HIT);
|
||||||
} else if ((s.ok() || s.IsMergeInProgress()) &&
|
} else if ((s.ok() || s.IsMergeInProgress()) &&
|
||||||
super_version->imm->Get(
|
super_version->imm->Get(
|
||||||
lkey, pinnable_val->GetSelf(), /*columns=*/nullptr, ts, &s,
|
lkey,
|
||||||
&merge_context, &max_covering_tombstone_seq, read_options,
|
get_impl_options.value ? get_impl_options.value->GetSelf()
|
||||||
&read_cb)) {
|
: nullptr,
|
||||||
|
get_impl_options.columns, ts, &s, &merge_context,
|
||||||
|
&max_covering_tombstone_seq, read_options, &read_cb)) {
|
||||||
done = true;
|
done = true;
|
||||||
pinnable_val->PinSelf();
|
if (get_impl_options.value) {
|
||||||
|
get_impl_options.value->PinSelf();
|
||||||
|
}
|
||||||
RecordTick(stats_, MEMTABLE_HIT);
|
RecordTick(stats_, MEMTABLE_HIT);
|
||||||
}
|
}
|
||||||
if (!done && !s.ok() && !s.IsMergeInProgress()) {
|
if (!done && !s.ok() && !s.IsMergeInProgress()) {
|
||||||
|
@ -456,8 +436,8 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
|
||||||
PERF_TIMER_GUARD(get_from_output_files_time);
|
PERF_TIMER_GUARD(get_from_output_files_time);
|
||||||
PinnedIteratorsManager pinned_iters_mgr;
|
PinnedIteratorsManager pinned_iters_mgr;
|
||||||
super_version->current->Get(
|
super_version->current->Get(
|
||||||
read_options, lkey, pinnable_val, /*columns=*/nullptr, ts, &s,
|
read_options, lkey, get_impl_options.value, get_impl_options.columns,
|
||||||
&merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
|
ts, &s, &merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
|
||||||
/*value_found*/ nullptr,
|
/*value_found*/ nullptr,
|
||||||
/*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb, /*is_blob*/ nullptr,
|
/*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb, /*is_blob*/ nullptr,
|
||||||
/*do_merge*/ true);
|
/*do_merge*/ true);
|
||||||
|
@ -467,7 +447,12 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
|
||||||
PERF_TIMER_GUARD(get_post_process_time);
|
PERF_TIMER_GUARD(get_post_process_time);
|
||||||
ReturnAndCleanupSuperVersion(cfd, super_version);
|
ReturnAndCleanupSuperVersion(cfd, super_version);
|
||||||
RecordTick(stats_, NUMBER_KEYS_READ);
|
RecordTick(stats_, NUMBER_KEYS_READ);
|
||||||
size_t size = pinnable_val->size();
|
size_t size = 0;
|
||||||
|
if (get_impl_options.value) {
|
||||||
|
size = get_impl_options.value->size();
|
||||||
|
} else if (get_impl_options.columns) {
|
||||||
|
size = get_impl_options.columns->serialized_size();
|
||||||
|
}
|
||||||
RecordTick(stats_, BYTES_READ, size);
|
RecordTick(stats_, BYTES_READ, size);
|
||||||
RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
|
RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
|
||||||
PERF_COUNTER_ADD(get_read_bytes, size);
|
PERF_COUNTER_ADD(get_read_bytes, size);
|
||||||
|
|
|
@ -85,8 +85,6 @@ class DBImplSecondary : public DBImpl {
|
||||||
bool error_if_data_exists_in_wals, uint64_t* = nullptr,
|
bool error_if_data_exists_in_wals, uint64_t* = nullptr,
|
||||||
RecoveryContext* recovery_ctx = nullptr) override;
|
RecoveryContext* recovery_ctx = nullptr) override;
|
||||||
|
|
||||||
// Implementations of the DB interface.
|
|
||||||
using DB::Get;
|
|
||||||
// Can return IOError due to files being deleted by the primary. To avoid
|
// Can return IOError due to files being deleted by the primary. To avoid
|
||||||
// IOError in this case, application can coordinate between primary and
|
// IOError in this case, application can coordinate between primary and
|
||||||
// secondaries so that primary will not delete files that are currently being
|
// secondaries so that primary will not delete files that are currently being
|
||||||
|
@ -96,17 +94,9 @@ class DBImplSecondary : public DBImpl {
|
||||||
// workaround, the secondaries can be opened with `max_open_files=-1` so that
|
// workaround, the secondaries can be opened with `max_open_files=-1` so that
|
||||||
// it eagerly keeps all table files open and is able to access the contents of
|
// it eagerly keeps all table files open and is able to access the contents of
|
||||||
// deleted files via prior open fd.
|
// deleted files via prior open fd.
|
||||||
Status Get(const ReadOptions& _read_options,
|
using DBImpl::GetImpl;
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
Status GetImpl(const ReadOptions& options, const Slice& key,
|
||||||
PinnableSlice* value) override;
|
GetImplOptions& get_impl_options) override;
|
||||||
|
|
||||||
Status Get(const ReadOptions& _read_options,
|
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
|
||||||
PinnableSlice* value, std::string* timestamp) override;
|
|
||||||
|
|
||||||
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
|
|
||||||
const Slice& key, PinnableSlice* value,
|
|
||||||
std::string* timestamp);
|
|
||||||
|
|
||||||
using DBImpl::NewIterator;
|
using DBImpl::NewIterator;
|
||||||
// Operations on the created iterators can return IOError due to files being
|
// Operations on the created iterators can return IOError due to files being
|
||||||
|
|
|
@ -164,12 +164,22 @@ TEST_F(DBSecondaryTest, ReopenAsSecondary) {
|
||||||
Reopen(options);
|
Reopen(options);
|
||||||
ASSERT_OK(Put("foo", "foo_value"));
|
ASSERT_OK(Put("foo", "foo_value"));
|
||||||
ASSERT_OK(Put("bar", "bar_value"));
|
ASSERT_OK(Put("bar", "bar_value"));
|
||||||
|
WideColumns columns{{kDefaultWideColumnName, "attr_default_val"},
|
||||||
|
{"attr_name1", "attr_value_1"},
|
||||||
|
{"attr_name2", "attr_value_2"}};
|
||||||
|
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), "baz",
|
||||||
|
columns));
|
||||||
ASSERT_OK(dbfull()->Flush(FlushOptions()));
|
ASSERT_OK(dbfull()->Flush(FlushOptions()));
|
||||||
Close();
|
Close();
|
||||||
|
|
||||||
ASSERT_OK(ReopenAsSecondary(options));
|
ASSERT_OK(ReopenAsSecondary(options));
|
||||||
ASSERT_EQ("foo_value", Get("foo"));
|
ASSERT_EQ("foo_value", Get("foo"));
|
||||||
ASSERT_EQ("bar_value", Get("bar"));
|
ASSERT_EQ("bar_value", Get("bar"));
|
||||||
|
PinnableWideColumns result;
|
||||||
|
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), "baz",
|
||||||
|
&result));
|
||||||
|
ASSERT_EQ(result.columns(), columns);
|
||||||
|
|
||||||
ReadOptions ropts;
|
ReadOptions ropts;
|
||||||
ropts.verify_checksums = true;
|
ropts.verify_checksums = true;
|
||||||
auto db1 = static_cast<DBImplSecondary*>(db_);
|
auto db1 = static_cast<DBImplSecondary*>(db_);
|
||||||
|
@ -182,13 +192,16 @@ TEST_F(DBSecondaryTest, ReopenAsSecondary) {
|
||||||
ASSERT_EQ("bar", iter->key().ToString());
|
ASSERT_EQ("bar", iter->key().ToString());
|
||||||
ASSERT_EQ("bar_value", iter->value().ToString());
|
ASSERT_EQ("bar_value", iter->value().ToString());
|
||||||
} else if (1 == count) {
|
} else if (1 == count) {
|
||||||
|
ASSERT_EQ("baz", iter->key().ToString());
|
||||||
|
ASSERT_EQ(columns, iter->columns());
|
||||||
|
} else if (2 == count) {
|
||||||
ASSERT_EQ("foo", iter->key().ToString());
|
ASSERT_EQ("foo", iter->key().ToString());
|
||||||
ASSERT_EQ("foo_value", iter->value().ToString());
|
ASSERT_EQ("foo_value", iter->value().ToString());
|
||||||
}
|
}
|
||||||
++count;
|
++count;
|
||||||
}
|
}
|
||||||
delete iter;
|
delete iter;
|
||||||
ASSERT_EQ(2, count);
|
ASSERT_EQ(3, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DBSecondaryTest, SimpleInternalCompaction) {
|
TEST_F(DBSecondaryTest, SimpleInternalCompaction) {
|
||||||
|
|
|
@ -208,6 +208,11 @@ TEST_F(DBWideBasicTest, PutEntity) {
|
||||||
ASSERT_OK(Flush());
|
ASSERT_OK(Flush());
|
||||||
|
|
||||||
verify();
|
verify();
|
||||||
|
|
||||||
|
// Reopen as Readonly DB and verify
|
||||||
|
Close();
|
||||||
|
ASSERT_OK(ReadOnlyReopen(options));
|
||||||
|
verify();
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DBWideBasicTest, PutEntityColumnFamily) {
|
TEST_F(DBWideBasicTest, PutEntityColumnFamily) {
|
||||||
|
|
|
@ -1060,7 +1060,11 @@ void StressTest::OperateDb(ThreadState* thread) {
|
||||||
|
|
||||||
i += batch_size - 1;
|
i += batch_size - 1;
|
||||||
} else if (FLAGS_use_get_entity) {
|
} else if (FLAGS_use_get_entity) {
|
||||||
|
ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
|
||||||
|
ThreadStatusUtil::SetThreadOperation(
|
||||||
|
ThreadStatus::OperationType::OP_GETENTITY);
|
||||||
TestGetEntity(thread, read_opts, rand_column_families, rand_keys);
|
TestGetEntity(thread, read_opts, rand_column_families, rand_keys);
|
||||||
|
ThreadStatusUtil::ResetThreadStatus();
|
||||||
} else if (FLAGS_use_multiget) {
|
} else if (FLAGS_use_multiget) {
|
||||||
// Leave room for one more iteration of the loop with a single key
|
// Leave room for one more iteration of the loop with a single key
|
||||||
// batch. This is to ensure that each thread does exactly the same
|
// batch. This is to ensure that each thread does exactly the same
|
||||||
|
|
|
@ -446,6 +446,8 @@ class Env : public Customizable {
|
||||||
kDBIterator = 5,
|
kDBIterator = 5,
|
||||||
kVerifyDBChecksum = 6,
|
kVerifyDBChecksum = 6,
|
||||||
kVerifyFileChecksums = 7,
|
kVerifyFileChecksums = 7,
|
||||||
|
kGetEntity = 8,
|
||||||
|
kMultiGetEntity = 9,
|
||||||
kUnknown, // Keep last for easy array of non-unknowns
|
kUnknown, // Keep last for easy array of non-unknowns
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,8 @@ struct ThreadStatus {
|
||||||
OP_DBITERATOR,
|
OP_DBITERATOR,
|
||||||
OP_VERIFY_DB_CHECKSUM,
|
OP_VERIFY_DB_CHECKSUM,
|
||||||
OP_VERIFY_FILE_CHECKSUMS,
|
OP_VERIFY_FILE_CHECKSUMS,
|
||||||
|
OP_GETENTITY,
|
||||||
|
OP_MULTIGETENTITY,
|
||||||
NUM_OP_TYPES
|
NUM_OP_TYPES
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,10 @@ Env::IOActivity ThreadStatusUtil::TEST_GetExpectedIOActivity(
|
||||||
return Env::IOActivity::kVerifyDBChecksum;
|
return Env::IOActivity::kVerifyDBChecksum;
|
||||||
case ThreadStatus::OperationType::OP_VERIFY_FILE_CHECKSUMS:
|
case ThreadStatus::OperationType::OP_VERIFY_FILE_CHECKSUMS:
|
||||||
return Env::IOActivity::kVerifyFileChecksums;
|
return Env::IOActivity::kVerifyFileChecksums;
|
||||||
|
case ThreadStatus::OperationType::OP_GETENTITY:
|
||||||
|
return Env::IOActivity::kGetEntity;
|
||||||
|
case ThreadStatus::OperationType::OP_MULTIGETENTITY:
|
||||||
|
return Env::IOActivity::kMultiGetEntity;
|
||||||
default:
|
default:
|
||||||
return Env::IOActivity::kUnknown;
|
return Env::IOActivity::kUnknown;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Add `GetEntity()` API for ReadOnly DB and Secondary DB.
|
|
@ -45,6 +45,8 @@ static OperationInfo global_operation_table[] = {
|
||||||
{ThreadStatus::OP_DBITERATOR, "DBIterator"},
|
{ThreadStatus::OP_DBITERATOR, "DBIterator"},
|
||||||
{ThreadStatus::OP_VERIFY_DB_CHECKSUM, "VerifyDBChecksum"},
|
{ThreadStatus::OP_VERIFY_DB_CHECKSUM, "VerifyDBChecksum"},
|
||||||
{ThreadStatus::OP_VERIFY_FILE_CHECKSUMS, "VerifyFileChecksums"},
|
{ThreadStatus::OP_VERIFY_FILE_CHECKSUMS, "VerifyFileChecksums"},
|
||||||
|
{ThreadStatus::OP_GETENTITY, "GetEntity"},
|
||||||
|
{ThreadStatus::OP_MULTIGETENTITY, "MultiGetEntity"},
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue