mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-26 07:30:54 +00:00
Update all blob db TTL and timestamps to uint64_t
Summary: The current blob db implementation use mix of int32_t, uint32_t and uint64_t for TTL and expiration. Update all timestamps to uint64_t for consistency. Closes https://github.com/facebook/rocksdb/pull/2683 Differential Revision: D5557103 Pulled By: yiwu-arbug fbshipit-source-id: e4eab2691629a755e614e8cf1eed9c3a681d0c42
This commit is contained in:
parent
5883a1ae24
commit
92afe830f9
|
@ -50,7 +50,7 @@ struct BlobDBOptions {
|
|||
// first bucket is 1471542000 - 1471542600
|
||||
// second bucket is 1471542600 - 1471543200
|
||||
// and so on
|
||||
uint32_t ttl_range_secs = 3600;
|
||||
uint64_t ttl_range_secs = 3600;
|
||||
|
||||
// at what bytes will the blob files be synced to blob log.
|
||||
uint64_t bytes_per_sync = 0;
|
||||
|
@ -97,21 +97,21 @@ class BlobDB : public StackableDB {
|
|||
|
||||
virtual Status PutWithTTL(const WriteOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
const Slice& value, int32_t ttl) = 0;
|
||||
const Slice& value, uint64_t ttl) = 0;
|
||||
|
||||
virtual Status PutWithTTL(const WriteOptions& options, const Slice& key,
|
||||
const Slice& value, int32_t ttl) {
|
||||
const Slice& value, uint64_t ttl) {
|
||||
return PutWithTTL(options, DefaultColumnFamily(), key, value, ttl);
|
||||
}
|
||||
|
||||
// Put with expiration. Key with expiration time equal to -1
|
||||
// means the key don't expire.
|
||||
// Put with expiration. Key with expiration time equal to
|
||||
// std::numeric_limits<uint64_t>::max() means the key don't expire.
|
||||
virtual Status PutUntil(const WriteOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
const Slice& value, int32_t expiration) = 0;
|
||||
const Slice& value, uint64_t expiration) = 0;
|
||||
|
||||
virtual Status PutUntil(const WriteOptions& options, const Slice& key,
|
||||
const Slice& value, int32_t expiration) {
|
||||
const Slice& value, uint64_t expiration) {
|
||||
return PutUntil(options, DefaultColumnFamily(), key, value, expiration);
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@
|
|||
namespace {
|
||||
int kBlockBasedTableVersionFormat = 2;
|
||||
|
||||
void extendTTL(rocksdb::blob_db::ttlrange_t* ttl_range, uint32_t ttl) {
|
||||
void extendTTL(rocksdb::blob_db::ttlrange_t* ttl_range, uint64_t ttl) {
|
||||
ttl_range->first = std::min(ttl_range->first, ttl);
|
||||
ttl_range->second = std::max(ttl_range->second, ttl);
|
||||
}
|
||||
|
@ -489,9 +489,8 @@ Status BlobDBImpl::OpenAllFiles() {
|
|||
ttl_range.first + (uint32_t)bdb_options_.ttl_range_secs);
|
||||
bfptr->set_ttl_range(ttl_range);
|
||||
|
||||
std::time_t epoch_now = std::chrono::system_clock::to_time_t(
|
||||
std::chrono::system_clock::now());
|
||||
if (ttl_range.second < epoch_now) {
|
||||
uint64_t now = EpochNow();
|
||||
if (ttl_range.second < now) {
|
||||
Status fstatus = CreateWriterLocked(bfptr);
|
||||
if (fstatus.ok()) fstatus = bfptr->WriteFooterAndCloseLocked();
|
||||
if (!fstatus.ok()) {
|
||||
|
@ -503,7 +502,7 @@ Status BlobDBImpl::OpenAllFiles() {
|
|||
} else {
|
||||
ROCKS_LOG_ERROR(db_options_.info_log,
|
||||
"Blob File Closed: %s now: %d ttl_range: (%d, %d)",
|
||||
bfpath.c_str(), epoch_now, ttl_range.first,
|
||||
bfpath.c_str(), now, ttl_range.first,
|
||||
ttl_range.second);
|
||||
}
|
||||
} else {
|
||||
|
@ -591,7 +590,7 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
|
|||
}
|
||||
|
||||
std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
|
||||
uint32_t expiration) const {
|
||||
uint64_t expiration) const {
|
||||
if (open_blob_files_.empty()) return nullptr;
|
||||
|
||||
std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
|
||||
|
@ -684,7 +683,8 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
|
|||
return bfile;
|
||||
}
|
||||
|
||||
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint32_t expiration) {
|
||||
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
|
||||
assert(expiration != kNoExpiration);
|
||||
uint64_t epoch_read = 0;
|
||||
std::shared_ptr<BlobFile> bfile;
|
||||
{
|
||||
|
@ -698,9 +698,9 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint32_t expiration) {
|
|||
return bfile;
|
||||
}
|
||||
|
||||
uint32_t exp_low =
|
||||
uint64_t exp_low =
|
||||
(expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
|
||||
uint32_t exp_high = exp_low + bdb_options_.ttl_range_secs;
|
||||
uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
|
||||
ttlrange_t ttl_guess = std::make_pair(exp_low, exp_high);
|
||||
|
||||
bfile = NewBlobFile("SelectBlobFileTTL");
|
||||
|
@ -758,7 +758,7 @@ Status BlobDBImpl::Put(const WriteOptions& options,
|
|||
const Slice& value) {
|
||||
std::string new_value;
|
||||
Slice value_slice;
|
||||
int32_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
|
||||
uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
|
||||
return PutUntil(options, column_family, key, value_slice, expiration);
|
||||
}
|
||||
|
||||
|
@ -808,11 +808,11 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
|
|||
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
|
||||
const Slice& value_slice) override {
|
||||
Slice value_unc;
|
||||
int32_t expiration =
|
||||
uint64_t expiration =
|
||||
impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_);
|
||||
|
||||
std::shared_ptr<BlobFile> bfile =
|
||||
(expiration != -1)
|
||||
(expiration != kNoExpiration)
|
||||
? impl_->SelectBlobFileTTL(expiration)
|
||||
: ((last_file_) ? last_file_ : impl_->SelectBlobFile());
|
||||
if (last_file_ && last_file_ != bfile) {
|
||||
|
@ -840,8 +840,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
|
|||
sequence_++;
|
||||
}
|
||||
|
||||
if (expiration != -1) {
|
||||
extendTTL(&(bfile->ttl_range_), (uint32_t)expiration);
|
||||
if (expiration != kNoExpiration) {
|
||||
extendTTL(&(bfile->ttl_range_), expiration);
|
||||
}
|
||||
|
||||
if (!st.ok()) {
|
||||
|
@ -935,9 +935,10 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
|
|||
Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
|
||||
ColumnFamilyHandle* column_family,
|
||||
const Slice& key, const Slice& value,
|
||||
int32_t ttl) {
|
||||
return PutUntil(options, column_family, key, value,
|
||||
static_cast<int32_t>(EpochNow()) + ttl);
|
||||
uint64_t ttl) {
|
||||
uint64_t now = EpochNow();
|
||||
assert(std::numeric_limits<uint64_t>::max() - now > ttl);
|
||||
return PutUntil(options, column_family, key, value, now + ttl);
|
||||
}
|
||||
|
||||
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
||||
|
@ -952,15 +953,15 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
|||
return *compression_output;
|
||||
}
|
||||
|
||||
// TODO(yiwu): We should use uint64_t for expiration.
|
||||
Status BlobDBImpl::PutUntil(const WriteOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
const Slice& value_unc, int32_t expiration) {
|
||||
const Slice& value_unc, uint64_t expiration) {
|
||||
MutexLock l(&write_mutex_);
|
||||
UpdateWriteOptions(options);
|
||||
|
||||
std::shared_ptr<BlobFile> bfile =
|
||||
(expiration != -1) ? SelectBlobFileTTL(expiration) : SelectBlobFile();
|
||||
std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
|
||||
? SelectBlobFileTTL(expiration)
|
||||
: SelectBlobFile();
|
||||
|
||||
if (!bfile) return Status::NotFound("Blob file not found");
|
||||
|
||||
|
@ -1020,29 +1021,27 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
|
|||
bfile->DumpState().c_str());
|
||||
}
|
||||
|
||||
if (expiration != -1) extendTTL(&(bfile->ttl_range_), (uint32_t)expiration);
|
||||
if (expiration != kNoExpiration) {
|
||||
extendTTL(&(bfile->ttl_range_), expiration);
|
||||
}
|
||||
|
||||
CloseIf(bfile);
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
// TODO(yiwu): We should return uint64_t after updating the rest of the code
|
||||
// to use uint64_t for expiration.
|
||||
int32_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
|
||||
Slice* value_slice,
|
||||
std::string* new_value) {
|
||||
uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
|
||||
Slice* value_slice,
|
||||
std::string* new_value) {
|
||||
uint64_t expiration = kNoExpiration;
|
||||
bool has_expiration = false;
|
||||
bool value_changed = false;
|
||||
if (ttl_extractor_ != nullptr) {
|
||||
bool has_ttl = ttl_extractor_->ExtractExpiration(
|
||||
has_expiration = ttl_extractor_->ExtractExpiration(
|
||||
key, value, EpochNow(), &expiration, new_value, &value_changed);
|
||||
if (!has_ttl) {
|
||||
expiration = kNoExpiration;
|
||||
}
|
||||
}
|
||||
*value_slice = value_changed ? Slice(*new_value) : value;
|
||||
return (expiration == kNoExpiration) ? -1 : static_cast<int32_t>(expiration);
|
||||
return has_expiration ? expiration : kNoExpiration;
|
||||
}
|
||||
|
||||
Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
||||
|
@ -1847,11 +1846,11 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||
// Ideally we should hold the lock during the entire function,
|
||||
// but under the asusmption that this is only called when a
|
||||
// file is Immutable, we can reduce the critical section
|
||||
bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, std::time_t tt,
|
||||
bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
|
||||
uint64_t last_id, std::string* reason) {
|
||||
if (bfile->HasTTL()) {
|
||||
ttlrange_t ttl_range = bfile->GetTTLRange();
|
||||
if (tt > ttl_range.second) {
|
||||
if (now > ttl_range.second) {
|
||||
*reason = "entire file ttl expired";
|
||||
return true;
|
||||
}
|
||||
|
@ -2057,8 +2056,7 @@ void BlobDBImpl::FilterSubsetOfFiles(
|
|||
// 100.0 / 15.0 = 7
|
||||
uint64_t next_epoch_increment = static_cast<uint64_t>(
|
||||
std::ceil(100 / static_cast<double>(kGCFilePercentage)));
|
||||
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
|
||||
std::time_t tt = std::chrono::system_clock::to_time_t(now);
|
||||
uint64_t now = EpochNow();
|
||||
|
||||
size_t files_processed = 0;
|
||||
for (auto bfile : blob_files) {
|
||||
|
@ -2081,18 +2079,20 @@ void BlobDBImpl::FilterSubsetOfFiles(
|
|||
if (bfile->Obsolete() || !bfile->Immutable()) continue;
|
||||
|
||||
std::string reason;
|
||||
bool shouldgc = ShouldGCFile(bfile, tt, last_id, &reason);
|
||||
bool shouldgc = ShouldGCFile(bfile, now, last_id, &reason);
|
||||
if (!shouldgc) {
|
||||
ROCKS_LOG_DEBUG(db_options_.info_log,
|
||||
"File has been skipped for GC ttl %s %d %d reason='%s'",
|
||||
bfile->PathName().c_str(), tt,
|
||||
"File has been skipped for GC ttl %s %" PRIu64 " %" PRIu64
|
||||
" reason='%s'",
|
||||
bfile->PathName().c_str(), now,
|
||||
bfile->GetTTLRange().second, reason.c_str());
|
||||
continue;
|
||||
}
|
||||
|
||||
ROCKS_LOG_INFO(db_options_.info_log,
|
||||
"File has been chosen for GC ttl %s %d %d reason='%s'",
|
||||
bfile->PathName().c_str(), tt, bfile->GetTTLRange().second,
|
||||
"File has been chosen for GC ttl %s %" PRIu64 " %" PRIu64
|
||||
" reason='%s'",
|
||||
bfile->PathName().c_str(), now, bfile->GetTTLRange().second,
|
||||
reason.c_str());
|
||||
to_process->push_back(bfile);
|
||||
}
|
||||
|
|
|
@ -202,9 +202,6 @@ class BlobDBImpl : public BlobDB {
|
|||
// how often to schedule check seq files period
|
||||
static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000;
|
||||
|
||||
static constexpr uint64_t kNoExpiration =
|
||||
std::numeric_limits<uint64_t>::max();
|
||||
|
||||
using rocksdb::StackableDB::Put;
|
||||
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
|
||||
const Slice& key, const Slice& value) override;
|
||||
|
@ -238,12 +235,12 @@ class BlobDBImpl : public BlobDB {
|
|||
using BlobDB::PutWithTTL;
|
||||
Status PutWithTTL(const WriteOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
const Slice& value, int32_t ttl) override;
|
||||
const Slice& value, uint64_t ttl) override;
|
||||
|
||||
using BlobDB::PutUntil;
|
||||
Status PutUntil(const WriteOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
const Slice& value_unc, int32_t expiration) override;
|
||||
const Slice& value_unc, uint64_t expiration) override;
|
||||
|
||||
Status LinkToBaseDB(DB* db) override;
|
||||
|
||||
|
@ -290,7 +287,7 @@ class BlobDBImpl : public BlobDB {
|
|||
// has expired or if threshold of the file has been evicted
|
||||
// tt - current time
|
||||
// last_id - the id of the non-TTL file to evict
|
||||
bool ShouldGCFile(std::shared_ptr<BlobFile> bfile, std::time_t tt,
|
||||
bool ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
|
||||
uint64_t last_id, std::string* reason);
|
||||
|
||||
// collect all the blob log files from the blob directory
|
||||
|
@ -299,8 +296,8 @@ class BlobDBImpl : public BlobDB {
|
|||
// appends a task into timer queue to close the file
|
||||
void CloseIf(const std::shared_ptr<BlobFile>& bfile);
|
||||
|
||||
int32_t ExtractExpiration(const Slice& key, const Slice& value,
|
||||
Slice* value_slice, std::string* new_value);
|
||||
uint64_t ExtractExpiration(const Slice& key, const Slice& value,
|
||||
Slice* value_slice, std::string* new_value);
|
||||
|
||||
Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
||||
const std::string& headerbuf, const Slice& key,
|
||||
|
@ -311,12 +308,12 @@ class BlobDBImpl : public BlobDB {
|
|||
|
||||
// find an existing blob log file based on the expiration unix epoch
|
||||
// if such a file does not exist, return nullptr
|
||||
std::shared_ptr<BlobFile> SelectBlobFileTTL(uint32_t expiration);
|
||||
std::shared_ptr<BlobFile> SelectBlobFileTTL(uint64_t expiration);
|
||||
|
||||
// find an existing blob log file to append the value to
|
||||
std::shared_ptr<BlobFile> SelectBlobFile();
|
||||
|
||||
std::shared_ptr<BlobFile> FindBlobFileLocked(uint32_t expiration) const;
|
||||
std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
|
||||
|
||||
void UpdateWriteOptions(const WriteOptions& options);
|
||||
|
||||
|
|
|
@ -63,7 +63,7 @@ class BlobDBTest : public testing::Test {
|
|||
}
|
||||
}
|
||||
|
||||
void PutRandomWithTTL(const std::string &key, int32_t ttl, Random *rnd,
|
||||
void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
|
||||
std::map<std::string, std::string> *data = nullptr) {
|
||||
int len = rnd->Next() % kMaxBlobSize + 1;
|
||||
std::string value = test::RandomHumanReadableString(rnd, len);
|
||||
|
@ -74,7 +74,7 @@ class BlobDBTest : public testing::Test {
|
|||
}
|
||||
}
|
||||
|
||||
void PutRandomUntil(const std::string &key, int32_t expiration, Random *rnd,
|
||||
void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
|
||||
std::map<std::string, std::string> *data = nullptr) {
|
||||
int len = rnd->Next() % kMaxBlobSize + 1;
|
||||
std::string value = test::RandomHumanReadableString(rnd, len);
|
||||
|
@ -136,7 +136,7 @@ class BlobDBTest : public testing::Test {
|
|||
|
||||
Random rnd(301);
|
||||
for (size_t i = 0; i < 100000; i++) {
|
||||
int32_t ttl = rnd.Next() % 86400;
|
||||
uint64_t ttl = rnd.Next() % 86400;
|
||||
PutRandomWithTTL("key" + ToString(i % 500), ttl, &rnd, nullptr);
|
||||
}
|
||||
|
||||
|
@ -175,7 +175,7 @@ TEST_F(BlobDBTest, PutWithTTL) {
|
|||
std::map<std::string, std::string> data;
|
||||
mock_env_->set_now_micros(50 * 1000000);
|
||||
for (size_t i = 0; i < 100; i++) {
|
||||
int32_t ttl = rnd.Next() % 100;
|
||||
uint64_t ttl = rnd.Next() % 100;
|
||||
PutRandomWithTTL("key" + ToString(i), ttl, &rnd,
|
||||
(ttl < 50 ? nullptr : &data));
|
||||
}
|
||||
|
@ -204,7 +204,7 @@ TEST_F(BlobDBTest, PutUntil) {
|
|||
std::map<std::string, std::string> data;
|
||||
mock_env_->set_now_micros(50 * 1000000);
|
||||
for (size_t i = 0; i < 100; i++) {
|
||||
int32_t expiration = rnd.Next() % 100 + 50;
|
||||
uint64_t expiration = rnd.Next() % 100 + 50;
|
||||
PutRandomUntil("key" + ToString(i), expiration, &rnd,
|
||||
(expiration < 100 ? nullptr : &data));
|
||||
}
|
||||
|
|
|
@ -102,8 +102,8 @@ Status BlobDumpTool::DumpBlobLogHeader(uint64_t* offset) {
|
|||
return s;
|
||||
}
|
||||
fprintf(stdout, "Blob log header:\n");
|
||||
fprintf(stdout, " Magic Number : %u\n", header.magic_number());
|
||||
fprintf(stdout, " Version : %d\n", header.version());
|
||||
fprintf(stdout, " Magic Number : %" PRIu32 "\n", header.magic_number());
|
||||
fprintf(stdout, " Version : %" PRIu32 "\n", header.version());
|
||||
CompressionType compression = header.compression();
|
||||
std::string compression_str;
|
||||
if (!GetStringFromCompressionType(&compression_str, compression).ok()) {
|
||||
|
@ -175,13 +175,13 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
|
|||
}
|
||||
uint32_t key_size = record.GetKeySize();
|
||||
uint64_t blob_size = record.GetBlobSize();
|
||||
fprintf(stdout, " key size : %d\n", key_size);
|
||||
fprintf(stdout, " key size : %" PRIu32 "\n", key_size);
|
||||
fprintf(stdout, " blob size : %" PRIu64 "\n", record.GetBlobSize());
|
||||
fprintf(stdout, " TTL : %u\n", record.GetTTL());
|
||||
fprintf(stdout, " TTL : %" PRIu64 "\n", record.GetTTL());
|
||||
fprintf(stdout, " time : %" PRIu64 "\n", record.GetTimeVal());
|
||||
fprintf(stdout, " type : %d, %d\n", record.type(), record.subtype());
|
||||
fprintf(stdout, " header CRC : %u\n", record.header_checksum());
|
||||
fprintf(stdout, " CRC : %u\n", record.checksum());
|
||||
fprintf(stdout, " header CRC : %" PRIu32 "\n", record.header_checksum());
|
||||
fprintf(stdout, " CRC : %" PRIu32 "\n", record.checksum());
|
||||
uint32_t header_crc =
|
||||
crc32c::Extend(0, slice.data(), slice.size() - 2 * sizeof(uint32_t));
|
||||
*offset += BlobLogRecord::kHeaderSize;
|
||||
|
@ -213,7 +213,7 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
|
|||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
fprintf(stdout, " footer CRC : %u\n", record.footer_checksum());
|
||||
fprintf(stdout, " footer CRC : %" PRIu32 "\n", record.footer_checksum());
|
||||
fprintf(stdout, " sequence : %" PRIu64 "\n", record.GetSN());
|
||||
*offset += key_size + blob_size + BlobLogRecord::kFooterSize;
|
||||
return s;
|
||||
|
|
|
@ -94,8 +94,8 @@ std::string BlobFile::DumpState() const {
|
|||
"path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " gc_epoch: %" PRIu64
|
||||
" file_size: %" PRIu64 " deleted_count: %" PRIu64
|
||||
" deleted_size: %" PRIu64
|
||||
" closed: %d can_be_deleted: %d ttl_range: (%d, %d)"
|
||||
" sn_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d",
|
||||
" closed: %d can_be_deleted: %d ttl_range: (%" PRIu64 ", %" PRIu64
|
||||
") sn_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d",
|
||||
path_to_dir_.c_str(), file_number_, blob_count_.load(),
|
||||
gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_,
|
||||
closed_.load(), can_be_deleted_.load(), ttl_range_.first,
|
||||
|
|
|
@ -61,8 +61,8 @@ Status BlobLogFooter::DecodeFrom(const Slice& input) {
|
|||
}
|
||||
|
||||
ttlrange_t temp_ttl;
|
||||
if (!GetFixed32(&slice, &temp_ttl.first) ||
|
||||
!GetFixed32(&slice, &temp_ttl.second)) {
|
||||
if (!GetFixed64(&slice, &temp_ttl.first) ||
|
||||
!GetFixed64(&slice, &temp_ttl.second)) {
|
||||
return Status::Corruption("Invalid Blob Footer: ttl_range");
|
||||
}
|
||||
if (has_ttl) {
|
||||
|
@ -108,11 +108,11 @@ void BlobLogFooter::EncodeTo(std::string* dst) const {
|
|||
bool has_ts = HasTimestamp();
|
||||
|
||||
if (has_ttl) {
|
||||
PutFixed32(dst, ttl_range_.get()->first);
|
||||
PutFixed32(dst, ttl_range_.get()->second);
|
||||
PutFixed64(dst, ttl_range_.get()->first);
|
||||
PutFixed64(dst, ttl_range_.get()->second);
|
||||
} else {
|
||||
PutFixed32(dst, 0);
|
||||
PutFixed32(dst, 0);
|
||||
PutFixed64(dst, 0);
|
||||
PutFixed64(dst, 0);
|
||||
}
|
||||
PutFixed64(dst, sn_range_.first);
|
||||
PutFixed64(dst, sn_range_.second);
|
||||
|
@ -149,11 +149,11 @@ void BlobLogHeader::EncodeTo(std::string* dst) const {
|
|||
PutFixed32(dst, val);
|
||||
|
||||
if (has_ttl) {
|
||||
PutFixed32(dst, ttl_guess_.get()->first);
|
||||
PutFixed32(dst, ttl_guess_.get()->second);
|
||||
PutFixed64(dst, ttl_guess_.get()->first);
|
||||
PutFixed64(dst, ttl_guess_.get()->second);
|
||||
} else {
|
||||
PutFixed32(dst, 0);
|
||||
PutFixed32(dst, 0);
|
||||
PutFixed64(dst, 0);
|
||||
PutFixed64(dst, 0);
|
||||
}
|
||||
|
||||
if (has_ts) {
|
||||
|
@ -199,11 +199,13 @@ Status BlobLogHeader::DecodeFrom(const Slice& input) {
|
|||
}
|
||||
|
||||
ttlrange_t temp_ttl;
|
||||
if (!GetFixed32(&slice, &temp_ttl.first) ||
|
||||
!GetFixed32(&slice, &temp_ttl.second)) {
|
||||
if (!GetFixed64(&slice, &temp_ttl.first) ||
|
||||
!GetFixed64(&slice, &temp_ttl.second)) {
|
||||
return Status::Corruption("Invalid Blob Log Header: ttl");
|
||||
}
|
||||
if (has_ttl) set_ttl_guess(temp_ttl);
|
||||
if (has_ttl) {
|
||||
set_ttl_guess(temp_ttl);
|
||||
}
|
||||
|
||||
tsrange_t temp_ts;
|
||||
if (!GetFixed64(&slice, &temp_ts.first) ||
|
||||
|
@ -265,7 +267,7 @@ Status BlobLogRecord::DecodeHeaderFrom(const Slice& hdrslice) {
|
|||
if (!GetFixed64(&input, &blob_size_)) {
|
||||
return Status::Corruption("Invalid Blob Record Header: blob_size");
|
||||
}
|
||||
if (!GetFixed32(&input, &ttl_val_)) {
|
||||
if (!GetFixed64(&input, &ttl_val_)) {
|
||||
return Status::Corruption("Invalid Blob Record Header: ttl_val");
|
||||
}
|
||||
if (!GetFixed64(&input, &time_val_)) {
|
||||
|
|
|
@ -25,6 +25,8 @@ namespace blob_db {
|
|||
class BlobFile;
|
||||
class BlobDBImpl;
|
||||
|
||||
constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
|
||||
|
||||
enum RecordType : uint8_t {
|
||||
// Zero is reserved for preallocated files
|
||||
kFullType = 0,
|
||||
|
@ -46,9 +48,9 @@ extern const uint32_t kMagicNumber;
|
|||
|
||||
class Reader;
|
||||
|
||||
typedef std::pair<uint32_t, uint32_t> ttlrange_t;
|
||||
typedef std::pair<uint64_t, uint64_t> tsrange_t;
|
||||
typedef std::pair<rocksdb::SequenceNumber, rocksdb::SequenceNumber> snrange_t;
|
||||
using ttlrange_t = std::pair<uint64_t, uint64_t>;
|
||||
using tsrange_t = std::pair<uint64_t, uint64_t>;
|
||||
using snrange_t = std::pair<rocksdb::SequenceNumber, rocksdb::SequenceNumber>;
|
||||
|
||||
class BlobLogHeader {
|
||||
friend class BlobFile;
|
||||
|
@ -71,8 +73,8 @@ class BlobLogHeader {
|
|||
void set_ts_guess(const tsrange_t& ts) { ts_guess_.reset(new tsrange_t(ts)); }
|
||||
|
||||
public:
|
||||
// magic number + version + flags + ttl guess + timestamp range = 36
|
||||
static const size_t kHeaderSize = 4 + 4 + 4 + 4 * 2 + 8 * 2;
|
||||
// magic number + version + flags + ttl guess + timestamp range = 44
|
||||
static const size_t kHeaderSize = 4 + 4 + 4 + 8 * 2 + 8 * 2;
|
||||
|
||||
void EncodeTo(std::string* dst) const;
|
||||
|
||||
|
@ -100,9 +102,9 @@ class BlobLogHeader {
|
|||
return *ts_guess_;
|
||||
}
|
||||
|
||||
bool HasTTL() const { return !!ttl_guess_; }
|
||||
bool HasTTL() const { return ttl_guess_ != nullptr; }
|
||||
|
||||
bool HasTimestamp() const { return !!ts_guess_; }
|
||||
bool HasTimestamp() const { return ts_guess_ != nullptr; }
|
||||
|
||||
BlobLogHeader& operator=(BlobLogHeader&& in) noexcept;
|
||||
};
|
||||
|
@ -128,11 +130,11 @@ class BlobLogFooter {
|
|||
|
||||
// footer size = 4 byte magic number
|
||||
// 8 bytes count
|
||||
// 4, 4 - ttl range
|
||||
// 8, 8 - ttl range
|
||||
// 8, 8 - sn range
|
||||
// 8, 8 - ts range
|
||||
// = 56
|
||||
static const size_t kFooterSize = 4 + 4 + 8 + (4 * 2) + (8 * 2) + (8 * 2);
|
||||
// = 64
|
||||
static const size_t kFooterSize = 4 + 4 + 8 + (8 * 2) + (8 * 2) + (8 * 2);
|
||||
|
||||
bool HasTTL() const { return !!ttl_range_; }
|
||||
|
||||
|
@ -185,7 +187,7 @@ class BlobLogRecord {
|
|||
uint32_t key_size_;
|
||||
uint64_t blob_size_;
|
||||
uint64_t time_val_;
|
||||
uint32_t ttl_val_;
|
||||
uint64_t ttl_val_;
|
||||
SequenceNumber sn_;
|
||||
uint32_t footer_cksum_;
|
||||
char type_;
|
||||
|
@ -209,11 +211,12 @@ class BlobLogRecord {
|
|||
public:
|
||||
// Header is
|
||||
// Key Length ( 4 bytes ),
|
||||
// Blob Length ( 8 bytes), timestamp/ttl (8 bytes),
|
||||
// Blob Length ( 8 bytes),
|
||||
// ttl (8 bytes), timestamp (8 bytes),
|
||||
// type (1 byte), subtype (1 byte)
|
||||
// header checksum (4 bytes), blob checksum (4 bytes),
|
||||
// = 34
|
||||
static const size_t kHeaderSize = 4 + 4 + 4 + 8 + 4 + 8 + 1 + 1;
|
||||
// = 42
|
||||
static const size_t kHeaderSize = 4 + 4 + 8 + 8 + 4 + 8 + 1 + 1;
|
||||
|
||||
static const size_t kFooterSize = 8 + 4;
|
||||
|
||||
|
@ -234,7 +237,7 @@ class BlobLogRecord {
|
|||
return ttl_val_ != std::numeric_limits<uint32_t>::max();
|
||||
}
|
||||
|
||||
uint32_t GetTTL() const { return ttl_val_; }
|
||||
uint64_t GetTTL() const { return ttl_val_; }
|
||||
|
||||
uint64_t GetTimeVal() const { return time_val_; }
|
||||
|
||||
|
|
|
@ -8,7 +8,6 @@
|
|||
#include "utilities/blob_db/blob_log_writer.h"
|
||||
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include "rocksdb/env.h"
|
||||
#include "util/coding.h"
|
||||
|
@ -72,7 +71,7 @@ Status Writer::AppendFooter(const BlobLogFooter& footer) {
|
|||
|
||||
Status Writer::AddRecord(const Slice& key, const Slice& val,
|
||||
uint64_t* key_offset, uint64_t* blob_offset,
|
||||
uint32_t ttl) {
|
||||
uint64_t ttl) {
|
||||
assert(block_offset_ != 0);
|
||||
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtFooter);
|
||||
|
||||
|
@ -96,26 +95,23 @@ Status Writer::AddRecord(const Slice& key, const Slice& val,
|
|||
}
|
||||
|
||||
void Writer::ConstructBlobHeader(std::string* headerbuf, const Slice& key,
|
||||
const Slice& val, int32_t ttl, int64_t ts) {
|
||||
const Slice& val, uint64_t ttl, int64_t ts) {
|
||||
headerbuf->reserve(BlobLogRecord::kHeaderSize);
|
||||
|
||||
uint32_t key_size = static_cast<uint32_t>(key.size());
|
||||
PutFixed32(headerbuf, key_size);
|
||||
PutFixed64(headerbuf, val.size());
|
||||
|
||||
uint32_t ttl_write = (ttl != -1) ? static_cast<uint32_t>(ttl)
|
||||
: std::numeric_limits<uint32_t>::max();
|
||||
PutFixed32(headerbuf, ttl_write);
|
||||
|
||||
uint64_t ts_write = (ts != -1) ? static_cast<uint64_t>(ts)
|
||||
: std::numeric_limits<uint64_t>::max();
|
||||
PutFixed64(headerbuf, ts_write);
|
||||
PutFixed64(headerbuf, ttl);
|
||||
PutFixed64(headerbuf, ts);
|
||||
|
||||
RecordType t = kFullType;
|
||||
headerbuf->push_back(static_cast<char>(t));
|
||||
|
||||
RecordSubType st = kRegularType;
|
||||
if (ttl != -1) st = kTTLType;
|
||||
if (ttl != kNoExpiration) {
|
||||
st = kTTLType;
|
||||
}
|
||||
headerbuf->push_back(static_cast<char>(st));
|
||||
|
||||
uint32_t header_crc = 0;
|
||||
|
|
|
@ -41,13 +41,13 @@ class Writer {
|
|||
~Writer();
|
||||
|
||||
static void ConstructBlobHeader(std::string* headerbuf, const Slice& key,
|
||||
const Slice& val, int32_t ttl, int64_t ts);
|
||||
const Slice& val, uint64_t ttl, int64_t ts);
|
||||
|
||||
Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset,
|
||||
uint64_t* blob_offset);
|
||||
|
||||
Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset,
|
||||
uint64_t* blob_offset, uint32_t ttl);
|
||||
uint64_t* blob_offset, uint64_t ttl);
|
||||
|
||||
Status EmitPhysicalRecord(const std::string& headerbuf, const Slice& key,
|
||||
const Slice& val, uint64_t* key_offset,
|
||||
|
|
Loading…
Reference in a new issue