mirror of
https://github.com/facebook/rocksdb.git
synced 2024-12-04 20:02:50 +00:00
BlobDB: Remove the need to get sequence number per write
Summary: Previously we store sequence number range of each blob files, and use the sequence number range to check if the file can be possibly visible by a snapshot. But it adds complexity to the code, since the sequence number is only available after a write. (The current implementation get sequence number by calling GetLatestSequenceNumber(), which is wrong.) With the patch, we are not storing sequence number range, and check if snapshot_sequence < obsolete_sequence to decide if the file is visible by a snapshot (previously we check if first_sequence <= snapshot_sequence < obsolete_sequence). Closes https://github.com/facebook/rocksdb/pull/3274 Differential Revision: D6571497 Pulled By: yiwu-arbug fbshipit-source-id: ca06479dc1fcd8782f6525b62b7762cd47d61909
This commit is contained in:
parent
f7285ce5be
commit
1e21dcc833
|
@ -1682,12 +1682,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
|
|||
delete casted_s;
|
||||
}
|
||||
|
||||
bool DBImpl::HasActiveSnapshotInRange(SequenceNumber lower_bound,
|
||||
SequenceNumber upper_bound) {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
return snapshots_.HasSnapshotInRange(lower_bound, upper_bound);
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
|
||||
TablePropertiesCollection* props) {
|
||||
|
|
|
@ -229,10 +229,6 @@ class DBImpl : public DB {
|
|||
|
||||
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
|
||||
|
||||
// Whether there is an active snapshot in range [lower_bound, upper_bound).
|
||||
bool HasActiveSnapshotInRange(SequenceNumber lower_bound,
|
||||
SequenceNumber upper_bound);
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
using DB::ResetStats;
|
||||
virtual Status ResetStats() override;
|
||||
|
|
|
@ -39,52 +39,6 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
|
|||
ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
|
||||
}
|
||||
|
||||
// Sequence number should be return through input write batch.
|
||||
TEST_P(DBWriteTest, ReturnSeuqneceNumber) {
|
||||
Random rnd(4422);
|
||||
Open();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
WriteBatch batch;
|
||||
batch.Put("key" + ToString(i), test::RandomHumanReadableString(&rnd, 10));
|
||||
ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(),
|
||||
WriteBatchInternal::Sequence(&batch));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) {
|
||||
constexpr size_t kThreads = 16;
|
||||
constexpr size_t kNumKeys = 1000;
|
||||
Open();
|
||||
ASSERT_EQ(0, dbfull()->GetLatestSequenceNumber());
|
||||
// Check each sequence is used once and only once.
|
||||
std::vector<std::atomic_flag> flags(kNumKeys * kThreads + 1);
|
||||
for (size_t i = 0; i < flags.size(); i++) {
|
||||
flags[i].clear();
|
||||
}
|
||||
auto writer = [&](size_t id) {
|
||||
Random rnd(4422 + static_cast<uint32_t>(id));
|
||||
for (size_t k = 0; k < kNumKeys; k++) {
|
||||
WriteBatch batch;
|
||||
batch.Put("key" + ToString(id) + "-" + ToString(k),
|
||||
test::RandomHumanReadableString(&rnd, 10));
|
||||
ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
|
||||
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
|
||||
ASSERT_GT(sequence, 0);
|
||||
ASSERT_LE(sequence, kNumKeys * kThreads);
|
||||
// The sequence isn't consumed by someone else.
|
||||
ASSERT_FALSE(flags[sequence].test_and_set());
|
||||
}
|
||||
};
|
||||
std::vector<port::Thread> threads;
|
||||
for (size_t i = 0; i < kThreads; i++) {
|
||||
threads.emplace_back(writer, i);
|
||||
}
|
||||
for (size_t i = 0; i < kThreads; i++) {
|
||||
threads[i].join();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
|
||||
constexpr int kNumThreads = 5;
|
||||
std::unique_ptr<FaultInjectionTestEnv> mock_env(
|
||||
|
|
|
@ -108,22 +108,6 @@ class SnapshotList {
|
|||
return ret;
|
||||
}
|
||||
|
||||
// Whether there is an active snapshot in range [lower_bound, upper_bound).
|
||||
bool HasSnapshotInRange(SequenceNumber lower_bound,
|
||||
SequenceNumber upper_bound) {
|
||||
if (empty()) {
|
||||
return false;
|
||||
}
|
||||
const SnapshotImpl* s = &list_;
|
||||
while (s->next_ != &list_) {
|
||||
if (s->next_->number_ >= lower_bound) {
|
||||
return s->next_->number_ < upper_bound;
|
||||
}
|
||||
s = s->next_;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// get the sequence number of the most recent snapshot
|
||||
SequenceNumber GetNewest() {
|
||||
if (empty()) {
|
||||
|
|
|
@ -571,18 +571,14 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
|
|||
const WriteOptions& options_;
|
||||
BlobDBImpl* blob_db_impl_;
|
||||
uint32_t default_cf_id_;
|
||||
SequenceNumber sequence_;
|
||||
WriteBatch batch_;
|
||||
|
||||
public:
|
||||
BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
|
||||
uint32_t default_cf_id, SequenceNumber seq)
|
||||
uint32_t default_cf_id)
|
||||
: options_(options),
|
||||
blob_db_impl_(blob_db_impl),
|
||||
default_cf_id_(default_cf_id),
|
||||
sequence_(seq) {}
|
||||
|
||||
SequenceNumber sequence() { return sequence_; }
|
||||
default_cf_id_(default_cf_id) {}
|
||||
|
||||
WriteBatch* batch() { return &batch_; }
|
||||
|
||||
|
@ -597,8 +593,7 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
|
|||
uint64_t expiration =
|
||||
blob_db_impl_->ExtractExpiration(key, value, &value_slice, &new_value);
|
||||
Status s = blob_db_impl_->PutBlobValue(options_, key, value_slice,
|
||||
expiration, sequence_, &batch_);
|
||||
sequence_++;
|
||||
expiration, &batch_);
|
||||
return s;
|
||||
}
|
||||
|
||||
|
@ -609,7 +604,6 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
|
|||
"Blob DB doesn't support non-default column family.");
|
||||
}
|
||||
Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
|
||||
sequence_++;
|
||||
return s;
|
||||
}
|
||||
|
||||
|
@ -621,7 +615,6 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
|
|||
}
|
||||
Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
|
||||
begin_key, end_key);
|
||||
sequence_++;
|
||||
return s;
|
||||
}
|
||||
|
||||
|
@ -643,12 +636,8 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
|
|||
RecordTick(statistics_, BLOB_DB_NUM_WRITE);
|
||||
uint32_t default_cf_id =
|
||||
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
|
||||
// TODO(yiwu): In case there are multiple writers the latest sequence would
|
||||
// not be the actually sequence we are writting. Need to get the sequence
|
||||
// from write batch after DB write instead.
|
||||
SequenceNumber current_seq = GetLatestSequenceNumber() + 1;
|
||||
Status s;
|
||||
BlobInserter blob_inserter(options, this, default_cf_id, current_seq);
|
||||
BlobInserter blob_inserter(options, this, default_cf_id);
|
||||
{
|
||||
// Release write_mutex_ before DB write to avoid race condition with
|
||||
// flush begin listener, which also require write_mutex_ to sync
|
||||
|
@ -693,6 +682,8 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
|
|||
|
||||
if (bdb_options_.enable_garbage_collection) {
|
||||
// add deleted key to list of keys that have been deleted for book-keeping
|
||||
SequenceNumber current_seq =
|
||||
WriteBatchInternal::Sequence(blob_inserter.batch());
|
||||
DeleteBookkeeper delete_bookkeeper(this, current_seq);
|
||||
s = updates->Iterate(&delete_bookkeeper);
|
||||
}
|
||||
|
@ -761,11 +752,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
|
|||
// flush begin listener, which also require write_mutex_ to sync
|
||||
// blob files.
|
||||
MutexLock l(&write_mutex_);
|
||||
// TODO(yiwu): In case there are multiple writers the latest sequence would
|
||||
// not be the actually sequence we are writting. Need to get the sequence
|
||||
// from write batch after DB write instead.
|
||||
SequenceNumber sequence = GetLatestSequenceNumber() + 1;
|
||||
s = PutBlobValue(options, key, value, expiration, sequence, &batch);
|
||||
s = PutBlobValue(options, key, value, expiration, &batch);
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = db_->Write(options, &batch);
|
||||
|
@ -776,7 +763,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
|
|||
|
||||
Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
|
||||
const Slice& value, uint64_t expiration,
|
||||
SequenceNumber sequence, WriteBatch* batch) {
|
||||
WriteBatch* batch) {
|
||||
Status s;
|
||||
std::string index_entry;
|
||||
uint32_t column_family_id =
|
||||
|
@ -817,7 +804,6 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
|
|||
}
|
||||
|
||||
if (s.ok()) {
|
||||
bfile->ExtendSequenceRange(sequence);
|
||||
if (expiration != kNoExpiration) {
|
||||
bfile->ExtendExpirationRange(expiration);
|
||||
}
|
||||
|
@ -898,7 +884,7 @@ bool BlobDBImpl::EvictOldestBlobFile() {
|
|||
total_blob_space_.load(), bdb_options_.blob_dir_size,
|
||||
oldest_file->BlobFileNumber(), expiration_range.first,
|
||||
expiration_range.second);
|
||||
oldest_file->MarkObsolete(oldest_file->GetSequenceRange().second);
|
||||
oldest_file->MarkObsolete(GetLatestSequenceNumber());
|
||||
obsolete_files_.push_back(oldest_file);
|
||||
oldest_file_evicted_.store(true);
|
||||
RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
|
||||
|
@ -1271,9 +1257,26 @@ Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
|
|||
bool BlobDBImpl::VisibleToActiveSnapshot(
|
||||
const std::shared_ptr<BlobFile>& bfile) {
|
||||
assert(bfile->Obsolete());
|
||||
SequenceNumber first_sequence = bfile->GetSequenceRange().first;
|
||||
|
||||
// We check whether the oldest snapshot is no less than the last sequence
|
||||
// by the time the blob file become obsolete. If so, the blob file is not
|
||||
// visible to all existing snapshots.
|
||||
//
|
||||
// If we keep track of the earliest sequence of the keys in the blob file,
|
||||
// we could instead check if there's a snapshot falls in range
|
||||
// [earliest_sequence, obsolete_sequence). But doing so will make the
|
||||
// implementation more complicated.
|
||||
SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
|
||||
return db_impl_->HasActiveSnapshotInRange(first_sequence, obsolete_sequence);
|
||||
SequenceNumber oldest_snapshot = 0;
|
||||
{
|
||||
// Need to lock DBImpl mutex before access snapshot list.
|
||||
InstrumentedMutexLock l(db_impl_->mutex());
|
||||
auto snapshots = db_impl_->snapshots();
|
||||
if (!snapshots.empty()) {
|
||||
oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
|
||||
}
|
||||
}
|
||||
return oldest_snapshot < obsolete_sequence;
|
||||
}
|
||||
|
||||
bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
|
||||
|
@ -1757,8 +1760,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||
&rewrite_batch, &callback);
|
||||
}
|
||||
if (rewrite_status.ok()) {
|
||||
newfile->ExtendSequenceRange(
|
||||
WriteBatchInternal::Sequence(&rewrite_batch));
|
||||
gc_stats->num_keys_relocated++;
|
||||
gc_stats->bytes_relocated += record.record_size();
|
||||
} else if (rewrite_status.IsBusy()) {
|
||||
|
@ -1775,10 +1776,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||
} // end of ReadRecord loop
|
||||
|
||||
if (s.ok()) {
|
||||
SequenceNumber obsolete_sequence =
|
||||
newfile == nullptr ? bfptr->GetSequenceRange().second + 1
|
||||
: newfile->GetSequenceRange().second;
|
||||
bfptr->MarkObsolete(obsolete_sequence);
|
||||
bfptr->MarkObsolete(GetLatestSequenceNumber());
|
||||
if (!first_gc) {
|
||||
WriteLock wl(&mutex_);
|
||||
obsolete_files_.push_back(bfptr);
|
||||
|
|
|
@ -284,7 +284,7 @@ class BlobDBImpl : public BlobDB {
|
|||
|
||||
Status PutBlobValue(const WriteOptions& options, const Slice& key,
|
||||
const Slice& value, uint64_t expiration,
|
||||
SequenceNumber sequence, WriteBatch* batch);
|
||||
WriteBatch* batch);
|
||||
|
||||
Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
||||
const std::string& headerbuf, const Slice& key,
|
||||
|
|
|
@ -901,20 +901,18 @@ TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
|
|||
ASSERT_EQ(1, gc_stats.blob_count);
|
||||
if (delete_key) {
|
||||
ASSERT_EQ(0, gc_stats.num_keys_relocated);
|
||||
ASSERT_EQ(bfile->GetSequenceRange().second + 1,
|
||||
bfile->GetObsoleteSequence());
|
||||
} else {
|
||||
ASSERT_EQ(1, gc_stats.num_keys_relocated);
|
||||
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
|
||||
bfile->GetObsoleteSequence());
|
||||
}
|
||||
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
|
||||
bfile->GetObsoleteSequence());
|
||||
if (i == 3) {
|
||||
snapshot = blob_db_->GetSnapshot();
|
||||
}
|
||||
size_t num_files = delete_key ? 3 : 4;
|
||||
ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||
blob_db_impl()->TEST_DeleteObsoleteFiles();
|
||||
if (i == 0 || i == 3 || (i == 2 && delete_key)) {
|
||||
if (i == 3) {
|
||||
// The snapshot shouldn't see data in bfile
|
||||
ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||
blob_db_->ReleaseSnapshot(snapshot);
|
||||
|
@ -1111,10 +1109,6 @@ TEST_F(BlobDBTest, InlineSmallValues) {
|
|||
Open(bdb_options, options);
|
||||
std::map<std::string, std::string> data;
|
||||
std::map<std::string, KeyVersion> versions;
|
||||
SequenceNumber first_non_ttl_seq = kMaxSequenceNumber;
|
||||
SequenceNumber first_ttl_seq = kMaxSequenceNumber;
|
||||
SequenceNumber last_non_ttl_seq = 0;
|
||||
SequenceNumber last_ttl_seq = 0;
|
||||
for (size_t i = 0; i < 1000; i++) {
|
||||
bool is_small_value = rnd.Next() % 2;
|
||||
bool has_ttl = rnd.Next() % 2;
|
||||
|
@ -1134,15 +1128,6 @@ TEST_F(BlobDBTest, InlineSmallValues) {
|
|||
versions[key] =
|
||||
KeyVersion(key, value, sequence,
|
||||
(is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
|
||||
if (!is_small_value) {
|
||||
if (!has_ttl) {
|
||||
first_non_ttl_seq = std::min(first_non_ttl_seq, sequence);
|
||||
last_non_ttl_seq = std::max(last_non_ttl_seq, sequence);
|
||||
} else {
|
||||
first_ttl_seq = std::min(first_ttl_seq, sequence);
|
||||
last_ttl_seq = std::max(last_ttl_seq, sequence);
|
||||
}
|
||||
}
|
||||
}
|
||||
VerifyDB(data);
|
||||
VerifyBaseDB(versions);
|
||||
|
@ -1159,11 +1144,7 @@ TEST_F(BlobDBTest, InlineSmallValues) {
|
|||
ttl_file = blob_files[1];
|
||||
}
|
||||
ASSERT_FALSE(non_ttl_file->HasTTL());
|
||||
ASSERT_EQ(first_non_ttl_seq, non_ttl_file->GetSequenceRange().first);
|
||||
ASSERT_EQ(last_non_ttl_seq, non_ttl_file->GetSequenceRange().second);
|
||||
ASSERT_TRUE(ttl_file->HasTTL());
|
||||
ASSERT_EQ(first_ttl_seq, ttl_file->GetSequenceRange().first);
|
||||
ASSERT_EQ(last_ttl_seq, ttl_file->GetSequenceRange().second);
|
||||
}
|
||||
|
||||
TEST_F(BlobDBTest, CompactionFilterNotSupported) {
|
||||
|
|
|
@ -142,8 +142,6 @@ Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size,
|
|||
fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.blob_count);
|
||||
fprintf(stdout, " Expiration Range : %s\n",
|
||||
GetString(footer.expiration_range).c_str());
|
||||
fprintf(stdout, " Sequence Range : %s\n",
|
||||
GetString(footer.sequence_range).c_str());
|
||||
return s;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,6 @@ BlobFile::BlobFile()
|
|||
obsolete_(false),
|
||||
gc_once_after_open_(false),
|
||||
expiration_range_({0, 0}),
|
||||
sequence_range_({kMaxSequenceNumber, 0}),
|
||||
last_access_(-1),
|
||||
last_fsync_(0),
|
||||
header_valid_(false),
|
||||
|
@ -67,7 +66,6 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
|
|||
obsolete_(false),
|
||||
gc_once_after_open_(false),
|
||||
expiration_range_({0, 0}),
|
||||
sequence_range_({kMaxSequenceNumber, 0}),
|
||||
last_access_(-1),
|
||||
last_fsync_(0),
|
||||
header_valid_(false),
|
||||
|
@ -116,12 +114,11 @@ std::string BlobFile::DumpState() const {
|
|||
" file_size: %" PRIu64 " deleted_count: %" PRIu64
|
||||
" deleted_size: %" PRIu64
|
||||
" closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64
|
||||
") sequence_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d",
|
||||
"), writer: %d reader: %d",
|
||||
path_to_dir_.c_str(), file_number_, blob_count_.load(),
|
||||
gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_,
|
||||
closed_.load(), obsolete_.load(), expiration_range_.first,
|
||||
expiration_range_.second, sequence_range_.first,
|
||||
sequence_range_.second, (!!log_writer_), (!!ra_file_reader_));
|
||||
expiration_range_.second, (!!log_writer_), (!!ra_file_reader_));
|
||||
return str;
|
||||
}
|
||||
|
||||
|
@ -144,8 +141,6 @@ Status BlobFile::WriteFooterAndCloseLocked() {
|
|||
footer.expiration_range = expiration_range_;
|
||||
}
|
||||
|
||||
footer.sequence_range = sequence_range_;
|
||||
|
||||
// this will close the file and reset the Writable File Pointer.
|
||||
Status s = log_writer_->AppendFooter(footer);
|
||||
if (s.ok()) {
|
||||
|
@ -185,7 +180,6 @@ Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
|
|||
last_fsync_.store(file_size_);
|
||||
blob_count_ = footer.blob_count;
|
||||
expiration_range_ = footer.expiration_range;
|
||||
sequence_range_ = footer.sequence_range;
|
||||
closed_ = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
|
|
@ -84,8 +84,6 @@ class BlobFile {
|
|||
|
||||
ExpirationRange expiration_range_;
|
||||
|
||||
SequenceRange sequence_range_;
|
||||
|
||||
// Sequential/Append writer for blobs
|
||||
std::shared_ptr<Writer> log_writer_;
|
||||
|
||||
|
@ -177,17 +175,6 @@ class BlobFile {
|
|||
expiration_range_.second = std::max(expiration_range_.second, expiration);
|
||||
}
|
||||
|
||||
SequenceRange GetSequenceRange() const { return sequence_range_; }
|
||||
|
||||
void SetSequenceRange(SequenceRange sequence_range) {
|
||||
sequence_range_ = sequence_range;
|
||||
}
|
||||
|
||||
void ExtendSequenceRange(SequenceNumber sequence) {
|
||||
sequence_range_.first = std::min(sequence_range_.first, sequence);
|
||||
sequence_range_.second = std::max(sequence_range_.second, sequence);
|
||||
}
|
||||
|
||||
bool HasTTL() const { return has_ttl_; }
|
||||
|
||||
void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }
|
||||
|
|
|
@ -67,8 +67,6 @@ void BlobLogFooter::EncodeTo(std::string* dst) {
|
|||
PutFixed64(dst, blob_count);
|
||||
PutFixed64(dst, expiration_range.first);
|
||||
PutFixed64(dst, expiration_range.second);
|
||||
PutFixed64(dst, sequence_range.first);
|
||||
PutFixed64(dst, sequence_range.second);
|
||||
crc = crc32c::Value(dst->c_str(), dst->size());
|
||||
crc = crc32c::Mask(crc);
|
||||
PutFixed32(dst, crc);
|
||||
|
@ -82,14 +80,12 @@ Status BlobLogFooter::DecodeFrom(Slice src) {
|
|||
"Unexpected blob file footer size");
|
||||
}
|
||||
uint32_t src_crc = 0;
|
||||
src_crc = crc32c::Value(src.data(), BlobLogFooter::kSize - 4);
|
||||
src_crc = crc32c::Value(src.data(), BlobLogFooter::kSize - sizeof(uint32_t));
|
||||
src_crc = crc32c::Mask(src_crc);
|
||||
uint32_t magic_number;
|
||||
if (!GetFixed32(&src, &magic_number) || !GetFixed64(&src, &blob_count) ||
|
||||
!GetFixed64(&src, &expiration_range.first) ||
|
||||
!GetFixed64(&src, &expiration_range.second) ||
|
||||
!GetFixed64(&src, &sequence_range.first) ||
|
||||
!GetFixed64(&src, &sequence_range.second) || !GetFixed32(&src, &crc)) {
|
||||
!GetFixed64(&src, &expiration_range.second) || !GetFixed32(&src, &crc)) {
|
||||
return Status::Corruption(kErrorMessage, "Error decoding content");
|
||||
}
|
||||
if (magic_number != kMagicNumber) {
|
||||
|
|
|
@ -24,7 +24,6 @@ constexpr uint32_t kVersion1 = 1;
|
|||
constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
|
||||
|
||||
using ExpirationRange = std::pair<uint64_t, uint64_t>;
|
||||
using SequenceRange = std::pair<uint64_t, uint64_t>;
|
||||
|
||||
// Format of blob log file header (30 bytes):
|
||||
//
|
||||
|
@ -53,24 +52,23 @@ struct BlobLogHeader {
|
|||
Status DecodeFrom(Slice slice);
|
||||
};
|
||||
|
||||
// Format of blob log file footer (48 bytes):
|
||||
// Format of blob log file footer (32 bytes):
|
||||
//
|
||||
// +--------------+------------+-------------------+-------------------+------------+
|
||||
// | magic number | blob count | expiration range | sequence range | footer CRC |
|
||||
// +--------------+------------+-------------------+-------------------+------------+
|
||||
// | Fixed32 | Fixed64 | Fixed64 + Fixed64 | Fixed64 + Fixed64 | Fixed32 |
|
||||
// +--------------+------------+-------------------+-------------------+------------+
|
||||
// +--------------+------------+-------------------+------------+
|
||||
// | magic number | blob count | expiration range | footer CRC |
|
||||
// +--------------+------------+-------------------+------------+
|
||||
// | Fixed32 | Fixed64 | Fixed64 + Fixed64 | Fixed32 |
|
||||
// +--------------+------------+-------------------+------------+
|
||||
//
|
||||
// The footer will be presented only when the blob file is properly closed.
|
||||
//
|
||||
// Unlike the same field in file header, expiration range in the footer is the
|
||||
// range of smallest and largest expiration of the data in this file.
|
||||
struct BlobLogFooter {
|
||||
static constexpr size_t kSize = 48;
|
||||
static constexpr size_t kSize = 32;
|
||||
|
||||
uint64_t blob_count = 0;
|
||||
ExpirationRange expiration_range = std::make_pair(0, 0);
|
||||
SequenceRange sequence_range = std::make_pair(0, 0);
|
||||
uint32_t crc = 0;
|
||||
|
||||
void EncodeTo(std::string* dst);
|
||||
|
|
Loading…
Reference in a new issue