mirror of https://github.com/facebook/rocksdb.git
Counter for merge failure
Summary: With Merge returning bool, it can keep failing silently(eg. While faling to fetch timestamp in TTL). We need to detect this through a rocksdb counter which can get bumped whenever Merge returns false. This will also be super-useful for the mcrocksdb-counter service where Merge may fail. Added a counter NUMBER_MERGE_FAILURES and appropriately updated db/merge_helper.cc I felt that it would be better to directly add counter-bumping in Merge as a default function of MergeOperator class but user should not be aware of this, so this approach seems better to me. Test Plan: make all check Reviewers: dnicholas, haobo, dhruba, vamsi CC: leveldb Differential Revision: https://reviews.facebook.net/D12129
This commit is contained in:
parent
f5f1842282
commit
f1bf169484
|
@ -1915,7 +1915,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
|||
// object to minimize change to the existing flow. Turn out this
|
||||
// logic could also be nicely re-used for memtable flush purge
|
||||
// optimization in BuildTable.
|
||||
merge.MergeUntil(input.get(), prev_snapshot, bottommost_level);
|
||||
merge.MergeUntil(input.get(), prev_snapshot, bottommost_level,
|
||||
options_.statistics);
|
||||
current_entry_is_merging = true;
|
||||
if (merge.IsSuccess()) {
|
||||
// Successfully found Put/Delete/(end-of-key-range) while merging
|
||||
|
|
|
@ -169,6 +169,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
|||
assert(merge_operator);
|
||||
if (!merge_operator->Merge(key.user_key(), &v, *operands,
|
||||
value, logger.get())) {
|
||||
RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
|
||||
*s = Status::Corruption("Error: Could not perform merge.");
|
||||
}
|
||||
} else {
|
||||
|
@ -182,6 +183,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
|||
*s = Status::OK();
|
||||
if (!merge_operator->Merge(key.user_key(), nullptr, *operands,
|
||||
value, logger.get())) {
|
||||
RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
|
||||
*s = Status::Corruption("Error: Could not perform merge.");
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -8,10 +8,10 @@
|
|||
#include <string>
|
||||
#include <memory>
|
||||
#include <deque>
|
||||
#include "leveldb/db.h"
|
||||
#include "db/dbformat.h"
|
||||
#include "db/skiplist.h"
|
||||
#include "db/version_set.h"
|
||||
#include "leveldb/db.h"
|
||||
#include "leveldb/memtablerep.h"
|
||||
#include "util/arena_impl.h"
|
||||
|
||||
|
|
|
@ -21,17 +21,15 @@ using std::list;
|
|||
|
||||
// Increase reference count on all underling memtables
|
||||
void MemTableList::RefAll() {
|
||||
for (list<MemTable*>::iterator it = memlist_.begin();
|
||||
it != memlist_.end() ; ++it) {
|
||||
(*it)->Ref();
|
||||
for (auto &memtable : memlist_) {
|
||||
memtable->Ref();
|
||||
}
|
||||
}
|
||||
|
||||
// Drop reference count on all underling memtables
|
||||
void MemTableList::UnrefAll() {
|
||||
for (list<MemTable*>::iterator it = memlist_.begin();
|
||||
it != memlist_.end() ; ++it) {
|
||||
(*it)->Unref();
|
||||
for (auto &memtable : memlist_) {
|
||||
memtable->Unref();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -53,8 +51,7 @@ bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) {
|
|||
|
||||
// Returns the memtables that need to be flushed.
|
||||
void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) {
|
||||
for (list<MemTable*>::reverse_iterator it = memlist_.rbegin();
|
||||
it != memlist_.rend(); it++) {
|
||||
for (auto it = memlist_.rbegin(); it != memlist_.rend(); it++) {
|
||||
MemTable* m = *it;
|
||||
if (!m->flush_in_progress_) {
|
||||
assert(!m->flush_completed_);
|
||||
|
@ -184,9 +181,8 @@ void MemTableList::Add(MemTable* m) {
|
|||
// Returns an estimate of the number of bytes of data in use.
|
||||
size_t MemTableList::ApproximateMemoryUsage() {
|
||||
size_t size = 0;
|
||||
for (list<MemTable*>::iterator it = memlist_.begin();
|
||||
it != memlist_.end(); ++it) {
|
||||
size += (*it)->ApproximateMemoryUsage();
|
||||
for (auto &memtable : memlist_) {
|
||||
size += memtable->ApproximateMemoryUsage();
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
@ -197,9 +193,8 @@ size_t MemTableList::ApproximateMemoryUsage() {
|
|||
bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s,
|
||||
std::deque<std::string>* operands,
|
||||
const Options& options) {
|
||||
for (list<MemTable*>::iterator it = memlist_.begin();
|
||||
it != memlist_.end(); ++it) {
|
||||
if ((*it)->Get(key, value, s, operands, options)) {
|
||||
for (auto &memtable : memlist_) {
|
||||
if (memtable->Get(key, value, s, operands, options)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -207,9 +202,8 @@ bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s,
|
|||
}
|
||||
|
||||
void MemTableList::GetMemTables(std::vector<MemTable*>* output) {
|
||||
for (list<MemTable*>::iterator it = memlist_.begin();
|
||||
it != memlist_.end(); ++it) {
|
||||
output->push_back(*it);
|
||||
for (auto &memtable : memlist_) {
|
||||
output->push_back(memtable);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ namespace leveldb {
|
|||
// operands_ stores the list of merge operands encountered while merging.
|
||||
// keys_[i] corresponds to operands_[i] for each i.
|
||||
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
||||
bool at_bottom) {
|
||||
bool at_bottom, shared_ptr<Statistics> stats) {
|
||||
// Get a copy of the internal key, before it's invalidated by iter->Next()
|
||||
// Also maintain the list of merge operands seen.
|
||||
keys_.clear();
|
||||
|
@ -79,6 +79,8 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
|||
UpdateInternalKey(&key[0], key.size(),
|
||||
orig_ikey.sequence, orig_ikey.type);
|
||||
swap(operands_.back(), merge_result);
|
||||
} else {
|
||||
RecordTick(stats, NUMBER_MERGE_FAILURES);
|
||||
}
|
||||
|
||||
// move iter to the next entry (before doing anything else)
|
||||
|
@ -105,6 +107,8 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
|||
UpdateInternalKey(&key[0], key.size(),
|
||||
orig_ikey.sequence, orig_ikey.type);
|
||||
swap(operands_.back(), merge_result);
|
||||
} else {
|
||||
RecordTick(stats, NUMBER_MERGE_FAILURES);
|
||||
}
|
||||
|
||||
// move iter to the next entry
|
||||
|
@ -179,6 +183,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
|||
// The final value() is always stored in operands_.back()
|
||||
swap(operands_.back(),merge_result);
|
||||
} else {
|
||||
RecordTick(stats, NUMBER_MERGE_FAILURES);
|
||||
// Do nothing if not success_. Leave keys() and operands() as they are.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
#include "db/dbformat.h"
|
||||
#include "leveldb/slice.h"
|
||||
#include "leveldb/statistics.h"
|
||||
#include <string>
|
||||
#include <deque>
|
||||
|
||||
|
@ -40,7 +41,7 @@ class MergeHelper {
|
|||
// at_bottom: (IN) true if the iterator covers the bottem level, which means
|
||||
// we could reach the start of the history of this user key.
|
||||
void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
|
||||
bool at_bottom = false);
|
||||
bool at_bottom = false, shared_ptr<Statistics> stats=nullptr);
|
||||
|
||||
// Query the merge result
|
||||
// These are valid until the next MergeUntil call
|
||||
|
|
|
@ -245,6 +245,7 @@ struct Saver {
|
|||
std::deque<std::string>* merge_operands; // the merge operations encountered
|
||||
Logger* logger;
|
||||
bool didIO; // did we do any disk io?
|
||||
shared_ptr<Statistics> statistics;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -287,6 +288,7 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
|
|||
s->state = kFound;
|
||||
if (!s->merge_operator->Merge(s->user_key, &v, *ops,
|
||||
s->value, s->logger)) {
|
||||
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
|
||||
s->state = kCorrupt;
|
||||
}
|
||||
} else {
|
||||
|
@ -301,6 +303,7 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
|
|||
s->state = kFound;
|
||||
if (!s->merge_operator->Merge(s->user_key, nullptr, *ops,
|
||||
s->value, s->logger)) {
|
||||
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
|
||||
s->state = kCorrupt;
|
||||
}
|
||||
} else {
|
||||
|
@ -391,6 +394,7 @@ void Version::Get(const ReadOptions& options,
|
|||
saver.merge_operands = operands;
|
||||
saver.logger = logger.get();
|
||||
saver.didIO = false;
|
||||
saver.statistics = db_options.statistics;
|
||||
|
||||
stats->seek_file = nullptr;
|
||||
stats->seek_file_level = -1;
|
||||
|
@ -517,6 +521,7 @@ void Version::Get(const ReadOptions& options,
|
|||
value, logger.get())) {
|
||||
*status = Status::OK();
|
||||
} else {
|
||||
RecordTick(db_options.statistics, NUMBER_MERGE_FAILURES);
|
||||
*status = Status::Corruption("could not perform end-of-key merge for ",
|
||||
user_key);
|
||||
}
|
||||
|
|
|
@ -60,7 +60,9 @@ enum Tickers {
|
|||
|
||||
NUMBER_FILTERED_DELETES = 21,
|
||||
|
||||
TICKER_ENUM_MAX = 22
|
||||
NUMBER_MERGE_FAILURES = 22,
|
||||
|
||||
TICKER_ENUM_MAX = 23
|
||||
};
|
||||
|
||||
const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
|
||||
|
@ -85,7 +87,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
|
|||
{ NUMBER_MULTIGET_CALLS, "rocksdb.number.multiget.get" },
|
||||
{ NUMBER_MULTIGET_KEYS_READ, "rocksdb.number.multiget.keys.read" },
|
||||
{ NUMBER_MULTIGET_BYTES_READ, "rocksdb.number.multiget.bytes.read" },
|
||||
{ NUMBER_FILTERED_DELETES, "rocksdb.number.deletes.filtered" }
|
||||
{ NUMBER_FILTERED_DELETES, "rocksdb.number.deletes.filtered" },
|
||||
{ NUMBER_MERGE_FAILURES, "rocksdb.number.merge.failures" }
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -268,17 +268,16 @@ class TtlMergeOperator : public MergeOperator {
|
|||
if (existing_value && existing_value->size() < ts_len) {
|
||||
Log(logger, "Error: Could not remove timestamp from existing value.");
|
||||
return false;
|
||||
// TODO: Change Merge semantics and add a counter here
|
||||
}
|
||||
|
||||
// Extract time-stamp from each operand to be passed to user_merge_op_
|
||||
std::deque<std::string> operands_without_ts;
|
||||
for (auto it = operands.begin(); it != operands.end(); ++it) {
|
||||
if (it->size() < ts_len) {
|
||||
for (const auto &operand : operands) {
|
||||
if (operand.size() < ts_len) {
|
||||
Log(logger, "Error: Could not remove timestamp from operand value.");
|
||||
return false;
|
||||
}
|
||||
operands_without_ts.push_back(it->substr(0, it->size() - ts_len));
|
||||
operands_without_ts.push_back(operand.substr(0, operand.size() - ts_len));
|
||||
}
|
||||
|
||||
// Apply the user merge operator (store result in *new_value)
|
||||
|
@ -316,7 +315,6 @@ class TtlMergeOperator : public MergeOperator {
|
|||
if (left_operand.size() < ts_len || right_operand.size() < ts_len) {
|
||||
Log(logger, "Error: Could not remove timestamp from value.");
|
||||
return false;
|
||||
//TODO: Change Merge semantics and add a counter here
|
||||
}
|
||||
|
||||
// Apply the user partial-merge operator (store result in *new_value)
|
||||
|
|
Loading…
Reference in New Issue