Run clang-format on utilities/ (except utilities/transactions/) (#10853)

Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/10853

Test Plan: `make check`

Reviewed By: siying

Differential Revision: D40651315

Pulled By: ltamasi

fbshipit-source-id: 8b270ff4777a06464be86e376c2a680427866a46
This commit is contained in:
Levi Tamasi 2022-10-24 16:38:09 -07:00 committed by Facebook GitHub Bot
parent 966cd42c7d
commit 4d9cb433fa
62 changed files with 748 additions and 844 deletions

View File

@ -88,9 +88,7 @@ const std::string kSharedChecksumDirSlash = kSharedChecksumDirName + "/";
void BackupStatistics::IncrementNumberSuccessBackup() {
number_success_backup++;
}
void BackupStatistics::IncrementNumberFailBackup() {
number_fail_backup++;
}
void BackupStatistics::IncrementNumberFailBackup() { number_fail_backup++; }
uint32_t BackupStatistics::GetNumberSuccessBackup() const {
return number_success_backup;
@ -399,12 +397,8 @@ class BackupEngineImpl {
timestamp_ = /* something clearly fabricated */ 1;
}
}
int64_t GetTimestamp() const {
return timestamp_;
}
uint64_t GetSize() const {
return size_;
}
int64_t GetTimestamp() const { return timestamp_; }
uint64_t GetSize() const { return size_; }
uint32_t GetNumberFiles() const {
return static_cast<uint32_t>(files_.size());
}
@ -506,12 +500,11 @@ class BackupEngineImpl {
bool include_file_details) const;
inline std::string GetAbsolutePath(
const std::string &relative_path = "") const {
const std::string& relative_path = "") const {
assert(relative_path.size() == 0 || relative_path[0] != '/');
return options_.backup_dir + "/" + relative_path;
}
inline std::string GetPrivateFileRel(BackupID backup_id,
bool tmp = false,
inline std::string GetPrivateFileRel(BackupID backup_id, bool tmp = false,
const std::string& file = "") const {
assert(file.size() == 0 || file[0] != '/');
return kPrivateDirSlash + std::to_string(backup_id) + (tmp ? ".tmp" : "") +
@ -727,12 +720,12 @@ class BackupEngineImpl {
std::string dst_path;
std::string dst_relative;
BackupAfterCopyOrCreateWorkItem()
: shared(false),
needed_to_copy(false),
backup_env(nullptr),
dst_path_tmp(""),
dst_path(""),
dst_relative("") {}
: shared(false),
needed_to_copy(false),
backup_env(nullptr),
dst_path_tmp(""),
dst_path(""),
dst_relative("") {}
BackupAfterCopyOrCreateWorkItem(
BackupAfterCopyOrCreateWorkItem&& o) noexcept {
@ -832,8 +825,8 @@ class BackupEngineImpl {
std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
std::map<BackupID, std::pair<IOStatus, std::unique_ptr<BackupMeta>>>
corrupt_backups_;
std::unordered_map<std::string,
std::shared_ptr<FileInfo>> backuped_file_infos_;
std::unordered_map<std::string, std::shared_ptr<FileInfo>>
backuped_file_infos_;
std::atomic<bool> stop_backup_;
// options data
@ -1044,8 +1037,8 @@ IOStatus BackupEngineImpl::Initialize() {
options_.max_valid_backups_to_open = std::numeric_limits<int32_t>::max();
ROCKS_LOG_WARN(
options_.info_log,
"`max_valid_backups_to_open` is not set to the default value. Ignoring "
"its value since BackupEngine is not read-only.");
"`max_valid_backups_to_open` is not set to the default value. "
"Ignoring its value since BackupEngine is not read-only.");
}
// gather the list of directories that we need to create
@ -1147,8 +1140,7 @@ IOStatus BackupEngineImpl::Initialize() {
// load the backups if any, until valid_backups_to_open of the latest
// non-corrupted backups have been successfully opened.
int valid_backups_to_open = options_.max_valid_backups_to_open;
for (auto backup_iter = backups_.rbegin();
backup_iter != backups_.rend();
for (auto backup_iter = backups_.rbegin(); backup_iter != backups_.rend();
++backup_iter) {
assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
if (latest_backup_id_ == 0) {

View File

@ -23,7 +23,7 @@ struct TEST_BackupMetaSchemaOptions {
// unpublished schema version 2, for the life of this object (not backup_dir).
// TEST_BackupMetaSchemaOptions offers some customization for testing.
void TEST_SetBackupMetaSchemaOptions(
BackupEngine *engine, const TEST_BackupMetaSchemaOptions &options);
BackupEngine* engine, const TEST_BackupMetaSchemaOptions& options);
// Modifies the BackupEngine(Impl) to use specified clocks for backup and
// restore rate limiters created by default if not specified by users for

View File

@ -63,8 +63,11 @@ class DummyDB : public StackableDB {
public:
/* implicit */
DummyDB(const Options& options, const std::string& dbname)
: StackableDB(nullptr), options_(options), dbname_(dbname),
deletions_enabled_(true), sequence_number_(0) {}
: StackableDB(nullptr),
options_(options),
dbname_(dbname),
deletions_enabled_(true),
sequence_number_(0) {}
SequenceNumber GetLatestSequenceNumber() const override {
return ++sequence_number_;
@ -139,7 +142,7 @@ class DummyDB : public StackableDB {
std::string dbname_;
bool deletions_enabled_;
mutable SequenceNumber sequence_number_;
}; // DummyDB
}; // DummyDB
class TestFs : public FileSystemWrapper {
public:
@ -545,7 +548,7 @@ class FileManager : public EnvWrapper {
private:
Random rnd_;
}; // FileManager
}; // FileManager
// utility functions
namespace {
@ -608,8 +611,8 @@ class BackupEngineTest : public testing::Test {
kShareWithChecksum,
};
const std::vector<ShareOption> kAllShareOptions = {
kNoShare, kShareNoChecksum, kShareWithChecksum};
const std::vector<ShareOption> kAllShareOptions = {kNoShare, kShareNoChecksum,
kShareWithChecksum};
BackupEngineTest() {
// set up files
@ -632,7 +635,7 @@ class BackupEngineTest : public testing::Test {
// set up db options
options_.create_if_missing = true;
options_.paranoid_checks = true;
options_.write_buffer_size = 1 << 17; // 128KB
options_.write_buffer_size = 1 << 17; // 128KB
options_.wal_dir = dbname_;
options_.enable_blob_files = true;
@ -3540,107 +3543,106 @@ TEST_F(BackupEngineTest, Concurrency) {
std::array<std::thread, 4> restore_verify_threads;
for (uint32_t i = 0; i < read_threads.size(); ++i) {
uint32_t sleep_micros = rng() % 100000;
read_threads[i] =
std::thread([this, i, sleep_micros, &db_opts, &be_opts,
&restore_verify_threads, &limiter] {
test_db_env_->SleepForMicroseconds(sleep_micros);
read_threads[i] = std::thread([this, i, sleep_micros, &db_opts, &be_opts,
&restore_verify_threads, &limiter] {
test_db_env_->SleepForMicroseconds(sleep_micros);
// Whether to also re-open the BackupEngine, potentially seeing
// additional backups
bool reopen = i == 3;
// Whether we are going to restore "latest"
bool latest = i > 1;
// Whether to also re-open the BackupEngine, potentially seeing
// additional backups
bool reopen = i == 3;
// Whether we are going to restore "latest"
bool latest = i > 1;
BackupEngine* my_be;
if (reopen) {
ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be));
} else {
my_be = backup_engine_.get();
}
BackupEngine* my_be;
if (reopen) {
ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be));
} else {
my_be = backup_engine_.get();
}
// Verify metadata (we don't receive updates from concurrently
// creating a new backup)
std::vector<BackupInfo> infos;
my_be->GetBackupInfo(&infos);
const uint32_t count = static_cast<uint32_t>(infos.size());
infos.clear();
if (reopen) {
ASSERT_GE(count, 2U);
ASSERT_LE(count, 4U);
fprintf(stderr, "Reopen saw %u backups\n", count);
} else {
ASSERT_EQ(count, 2U);
}
std::vector<BackupID> ids;
my_be->GetCorruptedBackups(&ids);
ASSERT_EQ(ids.size(), 0U);
// Verify metadata (we don't receive updates from concurrently
// creating a new backup)
std::vector<BackupInfo> infos;
my_be->GetBackupInfo(&infos);
const uint32_t count = static_cast<uint32_t>(infos.size());
infos.clear();
if (reopen) {
ASSERT_GE(count, 2U);
ASSERT_LE(count, 4U);
fprintf(stderr, "Reopen saw %u backups\n", count);
} else {
ASSERT_EQ(count, 2U);
}
std::vector<BackupID> ids;
my_be->GetCorruptedBackups(&ids);
ASSERT_EQ(ids.size(), 0U);
// (Eventually, see below) Restore one of the backups, or "latest"
std::string restore_db_dir = dbname_ + "/restore" + std::to_string(i);
DestroyDir(test_db_env_.get(), restore_db_dir).PermitUncheckedError();
BackupID to_restore;
if (latest) {
to_restore = count;
} else {
to_restore = i + 1;
}
// (Eventually, see below) Restore one of the backups, or "latest"
std::string restore_db_dir = dbname_ + "/restore" + std::to_string(i);
DestroyDir(test_db_env_.get(), restore_db_dir).PermitUncheckedError();
BackupID to_restore;
if (latest) {
to_restore = count;
} else {
to_restore = i + 1;
}
// Open restored DB to verify its contents, but test atomic restore
// by doing it async and ensuring we either get OK or InvalidArgument
restore_verify_threads[i] =
std::thread([this, &db_opts, restore_db_dir, to_restore] {
DB* restored;
Status s;
for (;;) {
s = DB::Open(db_opts, restore_db_dir, &restored);
if (s.IsInvalidArgument()) {
// Restore hasn't finished
test_db_env_->SleepForMicroseconds(1000);
continue;
} else {
// We should only get InvalidArgument if restore is
// incomplete, or OK if complete
ASSERT_OK(s);
break;
}
}
int factor = std::min(static_cast<int>(to_restore), max_factor);
AssertExists(restored, 0, factor * keys_iteration);
AssertEmpty(restored, factor * keys_iteration,
(factor + 1) * keys_iteration);
delete restored;
});
// Open restored DB to verify its contents, but test atomic restore
// by doing it async and ensuring we either get OK or InvalidArgument
restore_verify_threads[i] =
std::thread([this, &db_opts, restore_db_dir, to_restore] {
DB* restored;
Status s;
for (;;) {
s = DB::Open(db_opts, restore_db_dir, &restored);
if (s.IsInvalidArgument()) {
// Restore hasn't finished
test_db_env_->SleepForMicroseconds(1000);
continue;
} else {
// We should only get InvalidArgument if restore is
// incomplete, or OK if complete
ASSERT_OK(s);
break;
}
}
int factor = std::min(static_cast<int>(to_restore), max_factor);
AssertExists(restored, 0, factor * keys_iteration);
AssertEmpty(restored, factor * keys_iteration,
(factor + 1) * keys_iteration);
delete restored;
});
// (Ok now) Restore one of the backups, or "latest"
if (latest) {
ASSERT_OK(my_be->RestoreDBFromLatestBackup(restore_db_dir,
restore_db_dir));
} else {
ASSERT_OK(my_be->VerifyBackup(to_restore, true));
ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir,
restore_db_dir));
}
// (Ok now) Restore one of the backups, or "latest"
if (latest) {
ASSERT_OK(
my_be->RestoreDBFromLatestBackup(restore_db_dir, restore_db_dir));
} else {
ASSERT_OK(my_be->VerifyBackup(to_restore, true));
ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir,
restore_db_dir));
}
// Test for race condition in reconfiguring limiter
// FIXME: this could set to a different value in all threads, except
// GenericRateLimiter::SetBytesPerSecond has a write-write race
// reported by TSAN
if (i == 0) {
limiter->SetBytesPerSecond(2000000000);
}
// Test for race condition in reconfiguring limiter
// FIXME: this could set to a different value in all threads, except
// GenericRateLimiter::SetBytesPerSecond has a write-write race
// reported by TSAN
if (i == 0) {
limiter->SetBytesPerSecond(2000000000);
}
// Re-verify metadata (we don't receive updates from concurrently
// creating a new backup)
my_be->GetBackupInfo(&infos);
ASSERT_EQ(infos.size(), count);
my_be->GetCorruptedBackups(&ids);
ASSERT_EQ(ids.size(), 0);
// fprintf(stderr, "Finished read thread\n");
// Re-verify metadata (we don't receive updates from concurrently
// creating a new backup)
my_be->GetBackupInfo(&infos);
ASSERT_EQ(infos.size(), count);
my_be->GetCorruptedBackups(&ids);
ASSERT_EQ(ids.size(), 0);
// fprintf(stderr, "Finished read thread\n");
if (reopen) {
delete my_be;
}
});
if (reopen) {
delete my_be;
}
});
}
BackupEngine* alt_be;
@ -4196,7 +4198,7 @@ TEST_F(BackupEngineTest, FileTemperatures) {
}
}
} // anon namespace
} // namespace
} // namespace ROCKSDB_NAMESPACE

View File

@ -155,8 +155,7 @@ class BlobDB : public StackableDB {
using ROCKSDB_NAMESPACE::StackableDB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<Slice>& keys,
const ReadOptions& options, const std::vector<Slice>& keys,
std::vector<std::string>* values) override = 0;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
@ -179,8 +178,8 @@ class BlobDB : public StackableDB {
PinnableSlice* /*values*/, Status* statuses,
const bool /*sorted_input*/ = false) override {
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = Status::NotSupported(
"Blob DB doesn't support batched MultiGet");
statuses[i] =
Status::NotSupported("Blob DB doesn't support batched MultiGet");
}
}

View File

@ -6,6 +6,7 @@
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db_impl.h"
#include <algorithm>
#include <cinttypes>
#include <iomanip>
@ -1023,9 +1024,8 @@ Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
return PutUntil(options, key, value, kNoExpiration);
}
Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
const Slice& key, const Slice& value,
uint64_t ttl) {
Status BlobDBImpl::PutWithTTL(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
uint64_t now = EpochNow();
uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
return PutUntil(options, key, value, expiration);
@ -1385,9 +1385,9 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
return s;
}
std::vector<Status> BlobDBImpl::MultiGet(
const ReadOptions& read_options,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
std::vector<Status> BlobDBImpl::MultiGet(const ReadOptions& read_options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) {
StopWatch multiget_sw(clock_, statistics_, BLOB_DB_MULTIGET_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
// Get a snapshot to avoid blob file get deleted between we

View File

@ -124,8 +124,7 @@ class BlobDBImpl : public BlobDB {
using BlobDB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& read_options,
const std::vector<Slice>& keys,
const ReadOptions& read_options, const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
using BlobDB::Write;

View File

@ -58,8 +58,7 @@ class BlobDBTest : public testing::Test {
};
BlobDBTest()
: dbname_(test::PerThreadDBPath("blob_db_test")),
blob_db_(nullptr) {
: dbname_(test::PerThreadDBPath("blob_db_test")), blob_db_(nullptr) {
mock_clock_ = std::make_shared<MockSystemClock>(SystemClock::Default());
mock_env_.reset(new CompositeEnvWrapper(Env::Default(), mock_clock_));
fault_injection_env_.reset(new FaultInjectionTestEnv(Env::Default()));
@ -209,7 +208,7 @@ class BlobDBTest : public testing::Test {
void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
// Verify normal Get
auto* cfh = db->DefaultColumnFamily();
auto *cfh = db->DefaultColumnFamily();
for (auto &p : data) {
PinnableSlice value_slice;
ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
@ -2391,7 +2390,7 @@ TEST_F(BlobDBTest, SyncBlobFileBeforeCloseIOError) {
} // namespace ROCKSDB_NAMESPACE
// A black-box test for the ttl wrapper around rocksdb
int main(int argc, char** argv) {
int main(int argc, char **argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();

View File

@ -226,7 +226,9 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
DumpSlice(Slice(slice.data(), static_cast<size_t>(key_size)), show_key);
if (show_blob != DisplayType::kNone) {
fprintf(stdout, " blob : ");
DumpSlice(Slice(slice.data() + static_cast<size_t>(key_size), static_cast<size_t>(value_size)), show_blob);
DumpSlice(Slice(slice.data() + static_cast<size_t>(key_size),
static_cast<size_t>(value_size)),
show_blob);
}
if (show_uncompressed_blob != DisplayType::kNone) {
fprintf(stdout, " raw blob : ");

View File

@ -7,9 +7,9 @@
#include "utilities/blob_db/blob_file.h"
#include <stdio.h>
#include <cinttypes>
#include <algorithm>
#include <cinttypes>
#include <memory>
#include "db/column_family.h"
@ -210,16 +210,14 @@ Status BlobFile::ReadMetadata(const std::shared_ptr<FileSystem>& fs,
file_size_ = file_size;
} else {
ROCKS_LOG_ERROR(info_log_,
"Failed to get size of blob file %" PRIu64
", status: %s",
"Failed to get size of blob file %" PRIu64 ", status: %s",
file_number_, s.ToString().c_str());
return s;
}
if (file_size < BlobLogHeader::kSize) {
ROCKS_LOG_ERROR(info_log_,
"Incomplete blob file blob file %" PRIu64
", size: %" PRIu64,
file_number_, file_size);
ROCKS_LOG_ERROR(
info_log_, "Incomplete blob file blob file %" PRIu64 ", size: %" PRIu64,
file_number_, file_size);
return Status::Corruption("Incomplete blob file header.");
}
@ -250,10 +248,9 @@ Status BlobFile::ReadMetadata(const std::shared_ptr<FileSystem>& fs,
Env::IO_TOTAL /* rate_limiter_priority */);
}
if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Failed to read header of blob file %" PRIu64
", status: %s",
file_number_, s.ToString().c_str());
ROCKS_LOG_ERROR(
info_log_, "Failed to read header of blob file %" PRIu64 ", status: %s",
file_number_, s.ToString().c_str());
return s;
}
BlobLogHeader header;
@ -294,10 +291,9 @@ Status BlobFile::ReadMetadata(const std::shared_ptr<FileSystem>& fs,
nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
}
if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Failed to read footer of blob file %" PRIu64
", status: %s",
file_number_, s.ToString().c_str());
ROCKS_LOG_ERROR(
info_log_, "Failed to read footer of blob file %" PRIu64 ", status: %s",
file_number_, s.ToString().c_str());
return s;
}
BlobLogFooter footer;

View File

@ -7,8 +7,6 @@
#include "table/block_based/block_based_table_reader.h"
#ifndef ROCKSDB_LITE
#include "utilities/cache_dump_load_impl.h"
#include "cache/cache_entry_roles.h"
#include "file/writable_file_writer.h"
#include "port/lang.h"
@ -17,6 +15,7 @@
#include "rocksdb/utilities/ldb_cmd.h"
#include "table/format.h"
#include "util/crc32c.h"
#include "utilities/cache_dump_load_impl.h"
namespace ROCKSDB_NAMESPACE {

View File

@ -40,8 +40,8 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const {
bool value_changed = false;
RowValue row_value = RowValue::Deserialize(
existing_value.data(), existing_value.size());
RowValue row_value =
RowValue::Deserialize(existing_value.data(), existing_value.size());
RowValue compacted =
options_.purge_ttl_on_expiration
? row_value.RemoveExpiredColumns(&value_changed)
@ -51,7 +51,7 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
compacted = compacted.RemoveTombstones(options_.gc_grace_period_in_seconds);
}
if(compacted.Empty()) {
if (compacted.Empty()) {
return Decision::kRemove;
}

View File

@ -25,18 +25,18 @@ namespace cassandra {
* promoted to kValue type after serials of merging in compaction.
*/
class CassandraCompactionFilter : public CompactionFilter {
public:
explicit CassandraCompactionFilter(bool purge_ttl_on_expiration,
int32_t gc_grace_period_in_seconds);
static const char* kClassName() { return "CassandraCompactionFilter"; }
const char* Name() const override { return kClassName(); }
public:
explicit CassandraCompactionFilter(bool purge_ttl_on_expiration,
int32_t gc_grace_period_in_seconds);
static const char* kClassName() { return "CassandraCompactionFilter"; }
const char* Name() const override { return kClassName(); }
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override;
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override;
private:
CassandraOptions options_;
private:
CassandraOptions options_;
};
class CassandraCompactionFilterFactory : public CompactionFilterFactory {

View File

@ -5,12 +5,12 @@
#include <cstring>
#include <memory>
#include "test_util/testharness.h"
#include "utilities/cassandra/format.h"
#include "utilities/cassandra/serialize.h"
#include "utilities/cassandra/test_utils.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
@ -51,8 +51,8 @@ TEST(ColumnTest, Column) {
c1->Serialize(&dest);
EXPECT_EQ(dest.size(), 2 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0);
EXPECT_TRUE(std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) ==
0);
// Verify the ColumnBase::Deserialization.
saved_dest = dest;
@ -60,9 +60,8 @@ TEST(ColumnTest, Column) {
ColumnBase::Deserialize(saved_dest.c_str(), c.Size());
c2->Serialize(&dest);
EXPECT_EQ(dest.size(), 3 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size())
== 0);
EXPECT_TRUE(std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2,
c.Size()) == 0);
}
TEST(ExpiringColumnTest, ExpiringColumn) {
@ -71,8 +70,8 @@ TEST(ExpiringColumnTest, ExpiringColumn) {
int8_t index = 3;
int64_t timestamp = 1494022807044;
int32_t ttl = 3600;
ExpiringColumn c = ExpiringColumn(mask, index, timestamp,
sizeof(data), data, ttl);
ExpiringColumn c =
ExpiringColumn(mask, index, timestamp, sizeof(data), data, ttl);
EXPECT_EQ(c.Index(), index);
EXPECT_EQ(c.Timestamp(), timestamp);
@ -107,8 +106,8 @@ TEST(ExpiringColumnTest, ExpiringColumn) {
c1->Serialize(&dest);
EXPECT_EQ(dest.size(), 2 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0);
EXPECT_TRUE(std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) ==
0);
// Verify the ColumnBase::Deserialization.
saved_dest = dest;
@ -116,23 +115,24 @@ TEST(ExpiringColumnTest, ExpiringColumn) {
ColumnBase::Deserialize(saved_dest.c_str(), c.Size());
c2->Serialize(&dest);
EXPECT_EQ(dest.size(), 3 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size())
== 0);
EXPECT_TRUE(std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2,
c.Size()) == 0);
}
TEST(TombstoneTest, TombstoneCollectable) {
int32_t now = (int32_t)time(nullptr);
int32_t gc_grace_seconds = 16440;
int32_t time_delta_seconds = 10;
EXPECT_TRUE(Tombstone(ColumnTypeMask::DELETION_MASK, 0,
now - gc_grace_seconds - time_delta_seconds,
ToMicroSeconds(now - gc_grace_seconds - time_delta_seconds))
.Collectable(gc_grace_seconds));
EXPECT_FALSE(Tombstone(ColumnTypeMask::DELETION_MASK, 0,
now - gc_grace_seconds + time_delta_seconds,
ToMicroSeconds(now - gc_grace_seconds + time_delta_seconds))
.Collectable(gc_grace_seconds));
EXPECT_TRUE(
Tombstone(ColumnTypeMask::DELETION_MASK, 0,
now - gc_grace_seconds - time_delta_seconds,
ToMicroSeconds(now - gc_grace_seconds - time_delta_seconds))
.Collectable(gc_grace_seconds));
EXPECT_FALSE(
Tombstone(ColumnTypeMask::DELETION_MASK, 0,
now - gc_grace_seconds + time_delta_seconds,
ToMicroSeconds(now - gc_grace_seconds + time_delta_seconds))
.Collectable(gc_grace_seconds));
}
TEST(TombstoneTest, Tombstone) {
@ -140,8 +140,8 @@ TEST(TombstoneTest, Tombstone) {
int8_t index = 2;
int32_t local_deletion_time = 1494022807;
int64_t marked_for_delete_at = 1494022807044;
Tombstone c = Tombstone(mask, index, local_deletion_time,
marked_for_delete_at);
Tombstone c =
Tombstone(mask, index, local_deletion_time, marked_for_delete_at);
EXPECT_EQ(c.Index(), index);
EXPECT_EQ(c.Timestamp(), marked_for_delete_at);
@ -170,17 +170,16 @@ TEST(TombstoneTest, Tombstone) {
c1->Serialize(&dest);
EXPECT_EQ(dest.size(), 2 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0);
EXPECT_TRUE(std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) ==
0);
// Verify the ColumnBase::Deserialization.
std::shared_ptr<ColumnBase> c2 =
ColumnBase::Deserialize(dest.c_str(), c.Size());
ColumnBase::Deserialize(dest.c_str(), c.Size());
c2->Serialize(&dest);
EXPECT_EQ(dest.size(), 3 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size())
== 0);
EXPECT_TRUE(std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2,
c.Size()) == 0);
}
class RowValueTest : public testing::Test {};
@ -213,8 +212,8 @@ TEST(RowValueTest, RowTombstone) {
r1.Serialize(&dest);
EXPECT_EQ(dest.size(), 2 * r.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0);
EXPECT_TRUE(std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) ==
0);
}
TEST(RowValueTest, RowWithColumns) {
@ -227,23 +226,23 @@ TEST(RowValueTest, RowWithColumns) {
int64_t e_timestamp = 1494022807044;
int32_t e_ttl = 3600;
columns.push_back(std::shared_ptr<ExpiringColumn>(
new ExpiringColumn(ColumnTypeMask::EXPIRATION_MASK, e_index,
e_timestamp, sizeof(e_data), e_data, e_ttl)));
new ExpiringColumn(ColumnTypeMask::EXPIRATION_MASK, e_index, e_timestamp,
sizeof(e_data), e_data, e_ttl)));
columns_data_size += columns[0]->Size();
char c_data[4] = {'d', 'a', 't', 'a'};
int8_t c_index = 1;
int64_t c_timestamp = 1494022807048;
columns.push_back(std::shared_ptr<Column>(
new Column(0, c_index, c_timestamp, sizeof(c_data), c_data)));
new Column(0, c_index, c_timestamp, sizeof(c_data), c_data)));
columns_data_size += columns[1]->Size();
int8_t t_index = 2;
int32_t t_local_deletion_time = 1494022801;
int64_t t_marked_for_delete_at = 1494022807043;
columns.push_back(std::shared_ptr<Tombstone>(
new Tombstone(ColumnTypeMask::DELETION_MASK,
t_index, t_local_deletion_time, t_marked_for_delete_at)));
new Tombstone(ColumnTypeMask::DELETION_MASK, t_index,
t_local_deletion_time, t_marked_for_delete_at)));
columns_data_size += columns[2]->Size();
RowValue r = RowValue(std::move(columns), last_modified_time);
@ -260,15 +259,15 @@ TEST(RowValueTest, RowWithColumns) {
EXPECT_EQ(dest.size(), r.Size());
std::size_t offset = 0;
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset),
std::numeric_limits<int32_t>::max());
std::numeric_limits<int32_t>::max());
offset += sizeof(int32_t);
EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset),
std::numeric_limits<int64_t>::min());
std::numeric_limits<int64_t>::min());
offset += sizeof(int64_t);
// Column0: ExpiringColumn
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset),
ColumnTypeMask::EXPIRATION_MASK);
ColumnTypeMask::EXPIRATION_MASK);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), e_index);
offset += sizeof(int8_t);
@ -295,7 +294,7 @@ TEST(RowValueTest, RowWithColumns) {
// Column2: Tombstone
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset),
ColumnTypeMask::DELETION_MASK);
ColumnTypeMask::DELETION_MASK);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), t_index);
offset += sizeof(int8_t);
@ -311,19 +310,20 @@ TEST(RowValueTest, RowWithColumns) {
r1.Serialize(&dest);
EXPECT_EQ(dest.size(), 2 * r.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0);
EXPECT_TRUE(std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) ==
0);
}
TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) {
int64_t now = time(nullptr);
auto row_value = CreateTestRowValue({
CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)),
CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), //expired
CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired
CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
});
auto row_value = CreateTestRowValue(
{CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)),
CreateTestColumnSpec(kExpiringColumn, 1,
ToMicroSeconds(now - kTtl - 10)), // expired
CreateTestColumnSpec(kExpiringColumn, 2,
ToMicroSeconds(now)), // not expired
CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))});
bool changed = false;
auto purged = row_value.RemoveExpiredColumns(&changed);
@ -343,12 +343,13 @@ TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) {
TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) {
int64_t now = time(nullptr);
auto row_value = CreateTestRowValue({
CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)),
CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), //expired
CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired
CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
});
auto row_value = CreateTestRowValue(
{CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)),
CreateTestColumnSpec(kExpiringColumn, 1,
ToMicroSeconds(now - kTtl - 10)), // expired
CreateTestColumnSpec(kExpiringColumn, 2,
ToMicroSeconds(now)), // not expired
CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))});
bool changed = false;
auto compacted = row_value.ConvertExpiredColumnsToTombstones(&changed);
@ -366,7 +367,7 @@ TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) {
compacted.ConvertExpiredColumnsToTombstones(&changed);
EXPECT_FALSE(changed);
}
} // namespace cassandra
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -18,7 +18,6 @@
#include "utilities/cassandra/test_utils.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
@ -32,7 +31,7 @@ class CassandraStore {
assert(db);
}
bool Append(const std::string& key, const RowValue& val){
bool Append(const std::string& key, const RowValue& val) {
std::string result;
val.Serialize(&result);
Slice valSlice(result.data(), result.size());
@ -72,14 +71,13 @@ class CassandraStore {
db_->DefaultColumnFamily());
}
std::tuple<bool, RowValue> Get(const std::string& key){
std::tuple<bool, RowValue> Get(const std::string& key) {
std::string result;
auto s = db_->Get(get_option_, key, &result);
if (s.ok()) {
return std::make_tuple(true,
RowValue::Deserialize(result.data(),
result.size()));
return std::make_tuple(
true, RowValue::Deserialize(result.data(), result.size()));
}
if (!s.IsNotFound()) {
@ -98,29 +96,28 @@ class CassandraStore {
};
class TestCompactionFilterFactory : public CompactionFilterFactory {
public:
explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
int32_t gc_grace_period_in_seconds)
: purge_ttl_on_expiration_(purge_ttl_on_expiration),
gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
public:
explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
int32_t gc_grace_period_in_seconds)
: purge_ttl_on_expiration_(purge_ttl_on_expiration),
gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override {
return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override {
return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
}
const char* Name() const override { return "TestCompactionFilterFactory"; }
const char* Name() const override { return "TestCompactionFilterFactory"; }
private:
private:
bool purge_ttl_on_expiration_;
int32_t gc_grace_period_in_seconds_;
};
// The class for unit-testing
class CassandraFunctionalTest : public testing::Test {
public:
public:
CassandraFunctionalTest() {
EXPECT_OK(
DestroyDB(kDbName, Options())); // Start each test with a fresh DB
@ -130,7 +127,8 @@ public:
DB* db;
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
options.merge_operator.reset(
new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
auto* cf_factory = new TestCompactionFilterFactory(
purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
options.compaction_filter_factory.reset(cf_factory);
@ -148,23 +146,29 @@ TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
CassandraStore store(OpenDb());
int64_t now = time(nullptr);
store.Append("k1", CreateTestRowValue({
CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
}));
store.Append("k1",CreateTestRowValue({
CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
}));
store.Append("k1", CreateTestRowValue({
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
}));
store.Append(
"k1",
CreateTestRowValue({
CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
}));
store.Append(
"k1",
CreateTestRowValue({
CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
}));
store.Append(
"k1",
CreateTestRowValue({
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
}));
auto ret = store.Get("k1");
@ -188,7 +192,7 @@ constexpr int64_t kTestTimeoutSecs = 600;
TEST_F(CassandraFunctionalTest,
CompactionShouldConvertExpiredColumnsToTombstone) {
CassandraStore store(OpenDb());
int64_t now= time(nullptr);
int64_t now = time(nullptr);
store.Append(
"k1",
@ -202,10 +206,12 @@ TEST_F(CassandraFunctionalTest,
ASSERT_OK(store.Flush());
store.Append("k1",CreateTestRowValue({
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
}));
store.Append(
"k1",
CreateTestRowValue(
{CreateTestColumnSpec(kExpiringColumn, 0,
ToMicroSeconds(now - kTtl - 10)), // expired
CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))}));
ASSERT_OK(store.Flush());
ASSERT_OK(store.Compact());
@ -224,25 +230,29 @@ TEST_F(CassandraFunctionalTest,
ToMicroSeconds(now));
}
TEST_F(CassandraFunctionalTest,
CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
purge_ttl_on_expiration_ = true;
CassandraStore store(OpenDb());
int64_t now = time(nullptr);
store.Append("k1", CreateTestRowValue({
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired
CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
}));
store.Append(
"k1",
CreateTestRowValue(
{CreateTestColumnSpec(kExpiringColumn, 0,
ToMicroSeconds(now - kTtl - 20)), // expired
CreateTestColumnSpec(kExpiringColumn, 1,
ToMicroSeconds(now)), // not expired
CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))}));
ASSERT_OK(store.Flush());
store.Append("k1",CreateTestRowValue({
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
}));
store.Append(
"k1",
CreateTestRowValue(
{CreateTestColumnSpec(kExpiringColumn, 0,
ToMicroSeconds(now - kTtl - 10)), // expired
CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))}));
ASSERT_OK(store.Flush());
ASSERT_OK(store.Compact());
@ -266,15 +276,18 @@ TEST_F(CassandraFunctionalTest,
int64_t now = time(nullptr);
store.Append("k1", CreateTestRowValue({
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)),
CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)),
}));
CreateTestColumnSpec(kExpiringColumn, 0,
ToMicroSeconds(now - kTtl - 20)),
CreateTestColumnSpec(kExpiringColumn, 1,
ToMicroSeconds(now - kTtl - 20)),
}));
ASSERT_OK(store.Flush());
store.Append("k1",CreateTestRowValue({
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)),
}));
store.Append("k1", CreateTestRowValue({
CreateTestColumnSpec(kExpiringColumn, 0,
ToMicroSeconds(now - kTtl - 10)),
}));
ASSERT_OK(store.Flush());
ASSERT_OK(store.Compact());
@ -287,20 +300,21 @@ TEST_F(CassandraFunctionalTest,
CassandraStore store(OpenDb());
int64_t now = time(nullptr);
store.Append("k1", CreateTestRowValue({
CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))
}));
store.Append("k1",
CreateTestRowValue(
{CreateTestColumnSpec(
kTombstone, 0,
ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))}));
store.Append("k2", CreateTestRowValue({
CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now))
}));
store.Append("k2", CreateTestRowValue({CreateTestColumnSpec(
kColumn, 0, ToMicroSeconds(now))}));
ASSERT_OK(store.Flush());
store.Append("k1",CreateTestRowValue({
CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
}));
store.Append("k1", CreateTestRowValue({
CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
}));
ASSERT_OK(store.Flush());
ASSERT_OK(store.Compact());
@ -317,9 +331,12 @@ TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
CassandraStore store(OpenDb());
int64_t now = time(nullptr);
store.Put("k1", CreateTestRowValue({
CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
}));
store.Put("k1",
CreateTestRowValue({
CreateTestColumnSpec(
kTombstone, 0,
ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
}));
ASSERT_OK(store.Flush());
ASSERT_OK(store.Compact());
@ -419,7 +436,7 @@ TEST_F(CassandraFunctionalTest, LoadCompactionFilterFactory) {
}
#endif // ROCKSDB_LITE
} // namespace cassandra
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -4,6 +4,7 @@
// (found in the LICENSE.Apache file in the root directory).
#include <memory>
#include "test_util/testharness.h"
#include "utilities/cassandra/format.h"
#include "utilities/cassandra/test_utils.h"
@ -15,31 +16,25 @@ class RowValueMergeTest : public testing::Test {};
TEST(RowValueMergeTest, Merge) {
std::vector<RowValue> row_values;
row_values.push_back(
CreateTestRowValue({
row_values.push_back(CreateTestRowValue({
CreateTestColumnSpec(kTombstone, 0, 5),
CreateTestColumnSpec(kColumn, 1, 8),
CreateTestColumnSpec(kExpiringColumn, 2, 5),
})
);
}));
row_values.push_back(
CreateTestRowValue({
row_values.push_back(CreateTestRowValue({
CreateTestColumnSpec(kColumn, 0, 2),
CreateTestColumnSpec(kExpiringColumn, 1, 5),
CreateTestColumnSpec(kTombstone, 2, 7),
CreateTestColumnSpec(kExpiringColumn, 7, 17),
})
);
}));
row_values.push_back(
CreateTestRowValue({
row_values.push_back(CreateTestRowValue({
CreateTestColumnSpec(kExpiringColumn, 0, 6),
CreateTestColumnSpec(kTombstone, 1, 5),
CreateTestColumnSpec(kColumn, 2, 4),
CreateTestColumnSpec(kTombstone, 11, 11),
})
);
}));
RowValue merged = RowValue::Merge(std::move(row_values));
EXPECT_FALSE(merged.IsTombstone());
@ -55,33 +50,25 @@ TEST(RowValueMergeTest, MergeWithRowTombstone) {
std::vector<RowValue> row_values;
// A row tombstone.
row_values.push_back(
CreateRowTombstone(11)
);
row_values.push_back(CreateRowTombstone(11));
// This row's timestamp is smaller than tombstone.
row_values.push_back(
CreateTestRowValue({
row_values.push_back(CreateTestRowValue({
CreateTestColumnSpec(kColumn, 0, 5),
CreateTestColumnSpec(kColumn, 1, 6),
})
);
}));
// Some of the column's row is smaller, some is larger.
row_values.push_back(
CreateTestRowValue({
row_values.push_back(CreateTestRowValue({
CreateTestColumnSpec(kColumn, 2, 10),
CreateTestColumnSpec(kColumn, 3, 12),
})
);
}));
// All of the column's rows are larger than tombstone.
row_values.push_back(
CreateTestRowValue({
row_values.push_back(CreateTestRowValue({
CreateTestColumnSpec(kColumn, 4, 13),
CreateTestColumnSpec(kColumn, 5, 14),
})
);
}));
RowValue merged = RowValue::Merge(std::move(row_values));
EXPECT_FALSE(merged.IsTombstone());
@ -92,20 +79,16 @@ TEST(RowValueMergeTest, MergeWithRowTombstone) {
// If the tombstone's timestamp is the latest, then it returns a
// row tombstone.
row_values.push_back(
CreateRowTombstone(15)
);
row_values.push_back(CreateRowTombstone(15));
row_values.push_back(
CreateRowTombstone(17)
);
row_values.push_back(CreateRowTombstone(17));
merged = RowValue::Merge(std::move(row_values));
EXPECT_TRUE(merged.IsTombstone());
EXPECT_EQ(merged.LastModifiedTime(), 17);
}
} // namespace cassandra
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -6,46 +6,39 @@
#include "test_util/testharness.h"
#include "utilities/cassandra/serialize.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
TEST(SerializeTest, SerializeI64) {
std::string dest;
Serialize<int64_t>(0, &dest);
EXPECT_EQ(
std::string(
{'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00'}),
dest);
EXPECT_EQ(std::string({'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00',
'\x00'}),
dest);
dest.clear();
Serialize<int64_t>(1, &dest);
EXPECT_EQ(
std::string(
{'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x01'}),
dest);
EXPECT_EQ(std::string({'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00',
'\x01'}),
dest);
dest.clear();
Serialize<int64_t>(-1, &dest);
EXPECT_EQ(
std::string(
{'\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff'}),
dest);
EXPECT_EQ(std::string({'\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff',
'\xff'}),
dest);
dest.clear();
Serialize<int64_t>(9223372036854775807, &dest);
EXPECT_EQ(
std::string(
{'\x7f', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff'}),
dest);
EXPECT_EQ(std::string({'\x7f', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff',
'\xff'}),
dest);
dest.clear();
Serialize<int64_t>(-9223372036854775807, &dest);
EXPECT_EQ(
std::string(
{'\x80', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x01'}),
dest);
EXPECT_EQ(std::string({'\x80', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00',
'\x01'}),
dest);
}
TEST(SerializeTest, DeserializeI64) {
@ -74,39 +67,23 @@ TEST(SerializeTest, DeserializeI64) {
TEST(SerializeTest, SerializeI32) {
std::string dest;
Serialize<int32_t>(0, &dest);
EXPECT_EQ(
std::string(
{'\x00', '\x00', '\x00', '\x00'}),
dest);
EXPECT_EQ(std::string({'\x00', '\x00', '\x00', '\x00'}), dest);
dest.clear();
Serialize<int32_t>(1, &dest);
EXPECT_EQ(
std::string(
{'\x00', '\x00', '\x00', '\x01'}),
dest);
EXPECT_EQ(std::string({'\x00', '\x00', '\x00', '\x01'}), dest);
dest.clear();
Serialize<int32_t>(-1, &dest);
EXPECT_EQ(
std::string(
{'\xff', '\xff', '\xff', '\xff'}),
dest);
EXPECT_EQ(std::string({'\xff', '\xff', '\xff', '\xff'}), dest);
dest.clear();
Serialize<int32_t>(2147483647, &dest);
EXPECT_EQ(
std::string(
{'\x7f', '\xff', '\xff', '\xff'}),
dest);
EXPECT_EQ(std::string({'\x7f', '\xff', '\xff', '\xff'}), dest);
dest.clear();
Serialize<int32_t>(-2147483648LL, &dest);
EXPECT_EQ(
std::string(
{'\x80', '\x00', '\x00', '\x00'}),
dest);
EXPECT_EQ(std::string({'\x80', '\x00', '\x00', '\x00'}), dest);
}
TEST(SerializeTest, DeserializeI32) {
@ -141,7 +118,6 @@ TEST(SerializeTest, SerializeI8) {
Serialize<int8_t>(1, &dest);
EXPECT_EQ(std::string({'\x01'}), dest);
dest.clear();
Serialize<int8_t>(-1, &dest);
EXPECT_EQ(std::string({'\xff'}), dest);
@ -178,7 +154,7 @@ TEST(SerializeTest, DeserializeI8) {
EXPECT_EQ(-128, Deserialize<int8_t>(dest.c_str(), offset));
}
} // namespace cassandra
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -14,26 +14,18 @@
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
namespace {
const int32_t kDefaultLocalDeletionTime =
std::numeric_limits<int32_t>::max();
const int64_t kDefaultMarkedForDeleteAt =
std::numeric_limits<int64_t>::min();
}
const int32_t kDefaultLocalDeletionTime = std::numeric_limits<int32_t>::max();
const int64_t kDefaultMarkedForDeleteAt = std::numeric_limits<int64_t>::min();
} // namespace
ColumnBase::ColumnBase(int8_t mask, int8_t index)
: mask_(mask), index_(index) {}
: mask_(mask), index_(index) {}
std::size_t ColumnBase::Size() const {
return sizeof(mask_) + sizeof(index_);
}
std::size_t ColumnBase::Size() const { return sizeof(mask_) + sizeof(index_); }
int8_t ColumnBase::Mask() const {
return mask_;
}
int8_t ColumnBase::Mask() const { return mask_; }
int8_t ColumnBase::Index() const {
return index_;
}
int8_t ColumnBase::Index() const { return index_; }
void ColumnBase::Serialize(std::string* dest) const {
ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(mask_, dest);
@ -52,22 +44,18 @@ std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
}
}
Column::Column(
int8_t mask,
int8_t index,
int64_t timestamp,
int32_t value_size,
const char* value
) : ColumnBase(mask, index), timestamp_(timestamp),
value_size_(value_size), value_(value) {}
Column::Column(int8_t mask, int8_t index, int64_t timestamp, int32_t value_size,
const char* value)
: ColumnBase(mask, index),
timestamp_(timestamp),
value_size_(value_size),
value_(value) {}
int64_t Column::Timestamp() const {
return timestamp_;
}
int64_t Column::Timestamp() const { return timestamp_; }
std::size_t Column::Size() const {
return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_)
+ value_size_;
return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_) +
value_size_;
}
void Column::Serialize(std::string* dest) const {
@ -77,7 +65,7 @@ void Column::Serialize(std::string* dest) const {
dest->append(value_, value_size_);
}
std::shared_ptr<Column> Column::Deserialize(const char *src,
std::shared_ptr<Column> Column::Deserialize(const char* src,
std::size_t offset) {
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
@ -89,19 +77,14 @@ std::shared_ptr<Column> Column::Deserialize(const char *src,
int32_t value_size =
ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(value_size);
return std::make_shared<Column>(
mask, index, timestamp, value_size, src + offset);
return std::make_shared<Column>(mask, index, timestamp, value_size,
src + offset);
}
ExpiringColumn::ExpiringColumn(
int8_t mask,
int8_t index,
int64_t timestamp,
int32_t value_size,
const char* value,
int32_t ttl
) : Column(mask, index, timestamp, value_size, value),
ttl_(ttl) {}
ExpiringColumn::ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
int32_t value_size, const char* value,
int32_t ttl)
: Column(mask, index, timestamp, value_size, value), ttl_(ttl) {}
std::size_t ExpiringColumn::Size() const {
return Column::Size() + sizeof(ttl_);
@ -112,8 +95,10 @@ void ExpiringColumn::Serialize(std::string* dest) const {
ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(ttl_, dest);
}
std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint() const {
return std::chrono::time_point<std::chrono::system_clock>(std::chrono::microseconds(Timestamp()));
std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint()
const {
return std::chrono::time_point<std::chrono::system_clock>(
std::chrono::microseconds(Timestamp()));
}
std::chrono::seconds ExpiringColumn::Ttl() const {
@ -127,19 +112,16 @@ bool ExpiringColumn::Expired() const {
std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
int32_t local_deletion_time = static_cast<int32_t>(
std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
int64_t marked_for_delete_at =
std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
return std::make_shared<Tombstone>(
static_cast<int8_t>(ColumnTypeMask::DELETION_MASK),
Index(),
local_deletion_time,
marked_for_delete_at);
static_cast<int8_t>(ColumnTypeMask::DELETION_MASK), Index(),
local_deletion_time, marked_for_delete_at);
}
std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
const char *src,
std::size_t offset) {
const char* src, std::size_t offset) {
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
@ -153,25 +135,21 @@ std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
const char* value = src + offset;
offset += value_size;
int32_t ttl = ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
return std::make_shared<ExpiringColumn>(
mask, index, timestamp, value_size, value, ttl);
return std::make_shared<ExpiringColumn>(mask, index, timestamp, value_size,
value, ttl);
}
Tombstone::Tombstone(
int8_t mask,
int8_t index,
int32_t local_deletion_time,
int64_t marked_for_delete_at
) : ColumnBase(mask, index), local_deletion_time_(local_deletion_time),
marked_for_delete_at_(marked_for_delete_at) {}
Tombstone::Tombstone(int8_t mask, int8_t index, int32_t local_deletion_time,
int64_t marked_for_delete_at)
: ColumnBase(mask, index),
local_deletion_time_(local_deletion_time),
marked_for_delete_at_(marked_for_delete_at) {}
int64_t Tombstone::Timestamp() const {
return marked_for_delete_at_;
}
int64_t Tombstone::Timestamp() const { return marked_for_delete_at_; }
std::size_t Tombstone::Size() const {
return ColumnBase::Size() + sizeof(local_deletion_time_)
+ sizeof(marked_for_delete_at_);
return ColumnBase::Size() + sizeof(local_deletion_time_) +
sizeof(marked_for_delete_at_);
}
void Tombstone::Serialize(std::string* dest) const {
@ -187,7 +165,7 @@ bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const {
return local_deleted_at + gc_grace_period < std::chrono::system_clock::now();
}
std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src,
std::shared_ptr<Tombstone> Tombstone::Deserialize(const char* src,
std::size_t offset) {
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
@ -198,26 +176,27 @@ std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src,
offset += sizeof(int32_t);
int64_t marked_for_delete_at =
ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
return std::make_shared<Tombstone>(
mask, index, local_deletion_time, marked_for_delete_at);
return std::make_shared<Tombstone>(mask, index, local_deletion_time,
marked_for_delete_at);
}
RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
: local_deletion_time_(local_deletion_time),
marked_for_delete_at_(marked_for_delete_at), columns_(),
last_modified_time_(0) {}
: local_deletion_time_(local_deletion_time),
marked_for_delete_at_(marked_for_delete_at),
columns_(),
last_modified_time_(0) {}
RowValue::RowValue(Columns columns,
int64_t last_modified_time)
: local_deletion_time_(kDefaultLocalDeletionTime),
marked_for_delete_at_(kDefaultMarkedForDeleteAt),
columns_(std::move(columns)), last_modified_time_(last_modified_time) {}
RowValue::RowValue(Columns columns, int64_t last_modified_time)
: local_deletion_time_(kDefaultLocalDeletionTime),
marked_for_delete_at_(kDefaultMarkedForDeleteAt),
columns_(std::move(columns)),
last_modified_time_(last_modified_time) {}
std::size_t RowValue::Size() const {
std::size_t size = sizeof(local_deletion_time_)
+ sizeof(marked_for_delete_at_);
std::size_t size =
sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_);
for (const auto& column : columns_) {
size += column -> Size();
size += column->Size();
}
return size;
}
@ -238,7 +217,7 @@ void RowValue::Serialize(std::string* dest) const {
ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
for (const auto& column : columns_) {
column -> Serialize(dest);
column->Serialize(dest);
}
}
@ -246,11 +225,11 @@ RowValue RowValue::RemoveExpiredColumns(bool* changed) const {
*changed = false;
Columns new_columns;
for (auto& column : columns_) {
if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
if (column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
std::shared_ptr<ExpiringColumn> expiring_column =
std::static_pointer_cast<ExpiringColumn>(column);
std::static_pointer_cast<ExpiringColumn>(column);
if(expiring_column->Expired()){
if (expiring_column->Expired()) {
*changed = true;
continue;
}
@ -265,11 +244,11 @@ RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const {
*changed = false;
Columns new_columns;
for (auto& column : columns_) {
if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
if (column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
std::shared_ptr<ExpiringColumn> expiring_column =
std::static_pointer_cast<ExpiringColumn>(column);
std::static_pointer_cast<ExpiringColumn>(column);
if(expiring_column->Expired()) {
if (expiring_column->Expired()) {
std::shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone();
new_columns.push_back(tombstone);
*changed = true;
@ -298,11 +277,9 @@ RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const {
return RowValue(std::move(new_columns), last_modified_time_);
}
bool RowValue::Empty() const {
return columns_.empty();
}
bool RowValue::Empty() const { return columns_.empty(); }
RowValue RowValue::Deserialize(const char *src, std::size_t size) {
RowValue RowValue::Deserialize(const char* src, std::size_t size) {
std::size_t offset = 0;
assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
int32_t local_deletion_time =
@ -321,9 +298,9 @@ RowValue RowValue::Deserialize(const char *src, std::size_t size) {
int64_t last_modified_time = 0;
while (offset < size) {
auto c = ColumnBase::Deserialize(src, offset);
offset += c -> Size();
offset += c->Size();
assert(offset <= size);
last_modified_time = std::max(last_modified_time, c -> Timestamp());
last_modified_time = std::max(last_modified_time, c->Timestamp());
columns.push_back(std::move(c));
}
@ -344,9 +321,9 @@ RowValue RowValue::Merge(std::vector<RowValue>&& values) {
// Merge columns by their last modified time, and skip once we hit
// a row tombstone.
std::sort(values.begin(), values.end(),
[](const RowValue& r1, const RowValue& r2) {
return r1.LastModifiedTime() > r2.LastModifiedTime();
});
[](const RowValue& r1, const RowValue& r2) {
return r1.LastModifiedTime() > r2.LastModifiedTime();
});
std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
int64_t tombstone_timestamp = 0;
@ -373,7 +350,7 @@ RowValue RowValue::Merge(std::vector<RowValue>&& values) {
int64_t last_modified_time = 0;
Columns columns;
for (auto& pair: merged_columns) {
for (auto& pair : merged_columns) {
// For some row, its last_modified_time > row tombstone_timestamp, but
// it might have rows whose timestamp is ealier than tombstone, so we
// ned to filter these rows.
@ -386,5 +363,5 @@ RowValue RowValue::Merge(std::vector<RowValue>&& values) {
return RowValue(std::move(columns), last_modified_time);
}
} // namepsace cassandrda
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -58,6 +58,7 @@
#include <chrono>
#include <memory>
#include <vector>
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
@ -70,9 +71,8 @@ enum ColumnTypeMask {
EXPIRATION_MASK = 0x02,
};
class ColumnBase {
public:
public:
ColumnBase(int8_t mask, int8_t index);
virtual ~ColumnBase() = default;
@ -84,15 +84,15 @@ public:
static std::shared_ptr<ColumnBase> Deserialize(const char* src,
std::size_t offset);
private:
private:
int8_t mask_;
int8_t index_;
};
class Column : public ColumnBase {
public:
Column(int8_t mask, int8_t index, int64_t timestamp,
int32_t value_size, const char* value);
public:
Column(int8_t mask, int8_t index, int64_t timestamp, int32_t value_size,
const char* value);
virtual int64_t Timestamp() const override;
virtual std::size_t Size() const override;
@ -100,16 +100,16 @@ public:
static std::shared_ptr<Column> Deserialize(const char* src,
std::size_t offset);
private:
private:
int64_t timestamp_;
int32_t value_size_;
const char* value_;
};
class Tombstone : public ColumnBase {
public:
Tombstone(int8_t mask, int8_t index,
int32_t local_deletion_time, int64_t marked_for_delete_at);
public:
Tombstone(int8_t mask, int8_t index, int32_t local_deletion_time,
int64_t marked_for_delete_at);
virtual int64_t Timestamp() const override;
virtual std::size_t Size() const override;
@ -118,15 +118,15 @@ public:
static std::shared_ptr<Tombstone> Deserialize(const char* src,
std::size_t offset);
private:
private:
int32_t local_deletion_time_;
int64_t marked_for_delete_at_;
};
class ExpiringColumn : public Column {
public:
public:
ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
int32_t value_size, const char* value, int32_t ttl);
int32_t value_size, const char* value, int32_t ttl);
virtual std::size_t Size() const override;
virtual void Serialize(std::string* dest) const override;
@ -136,7 +136,7 @@ public:
static std::shared_ptr<ExpiringColumn> Deserialize(const char* src,
std::size_t offset);
private:
private:
int32_t ttl_;
std::chrono::time_point<std::chrono::system_clock> TimePoint() const;
std::chrono::seconds Ttl() const;
@ -145,12 +145,11 @@ private:
using Columns = std::vector<std::shared_ptr<ColumnBase>>;
class RowValue {
public:
public:
// Create a Row Tombstone.
RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at);
// Create a Row containing columns.
RowValue(Columns columns,
int64_t last_modified_time);
RowValue(Columns columns, int64_t last_modified_time);
RowValue(const RowValue& /*that*/) = delete;
RowValue(RowValue&& /*that*/) noexcept = default;
RowValue& operator=(const RowValue& /*that*/) = delete;
@ -180,5 +179,5 @@ public:
int64_t last_modified_time_;
};
} // namepsace cassandrda
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -44,9 +44,8 @@ bool CassandraValueMergeOperator::FullMergeV2(
merge_out->new_value.clear();
std::vector<RowValue> row_values;
if (merge_in.existing_value) {
row_values.push_back(
RowValue::Deserialize(merge_in.existing_value->data(),
merge_in.existing_value->size()));
row_values.push_back(RowValue::Deserialize(
merge_in.existing_value->data(), merge_in.existing_value->size()));
}
for (auto& operand : merge_in.operand_list) {
@ -78,6 +77,6 @@ bool CassandraValueMergeOperator::PartialMergeMulti(
return true;
}
} // namespace cassandra
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -15,30 +15,30 @@ namespace cassandra {
* A MergeOperator for rocksdb that implements Cassandra row value merge.
*/
class CassandraValueMergeOperator : public MergeOperator {
public:
explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds,
size_t operands_limit = 0);
public:
explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds,
size_t operands_limit = 0);
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const override;
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const override;
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "CassandraValueMergeOperator"; }
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "CassandraValueMergeOperator"; }
virtual bool AllowSingleOperand() const override { return true; }
virtual bool AllowSingleOperand() const override { return true; }
virtual bool ShouldMerge(const std::vector<Slice>& operands) const override {
return options_.operands_limit > 0 &&
operands.size() >= options_.operands_limit;
}
virtual bool ShouldMerge(const std::vector<Slice>& operands) const override {
return options_.operands_limit > 0 &&
operands.size() >= options_.operands_limit;
}
private:
CassandraOptions options_;
private:
CassandraOptions options_;
};
} // namespace cassandra
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -20,61 +20,62 @@ namespace cassandra {
namespace {
const int64_t kCharMask = 0xFFLL;
const int32_t kBitsPerByte = 8;
}
} // namespace
template<typename T>
template <typename T>
void Serialize(T val, std::string* dest);
template<typename T>
T Deserialize(const char* src, std::size_t offset=0);
template <typename T>
T Deserialize(const char* src, std::size_t offset = 0);
// Specializations
template<>
template <>
inline void Serialize<int8_t>(int8_t t, std::string* dest) {
dest->append(1, static_cast<char>(t & kCharMask));
}
template<>
template <>
inline void Serialize<int32_t>(int32_t t, std::string* dest) {
for (unsigned long i = 0; i < sizeof(int32_t); i++) {
dest->append(1, static_cast<char>(
(t >> (sizeof(int32_t) - 1 - i) * kBitsPerByte) & kCharMask));
dest->append(
1, static_cast<char>((t >> (sizeof(int32_t) - 1 - i) * kBitsPerByte) &
kCharMask));
}
}
template<>
template <>
inline void Serialize<int64_t>(int64_t t, std::string* dest) {
for (unsigned long i = 0; i < sizeof(int64_t); i++) {
dest->append(
1, static_cast<char>(
(t >> (sizeof(int64_t) - 1 - i) * kBitsPerByte) & kCharMask));
dest->append(
1, static_cast<char>((t >> (sizeof(int64_t) - 1 - i) * kBitsPerByte) &
kCharMask));
}
}
template<>
template <>
inline int8_t Deserialize<int8_t>(const char* src, std::size_t offset) {
return static_cast<int8_t>(src[offset]);
}
template<>
template <>
inline int32_t Deserialize<int32_t>(const char* src, std::size_t offset) {
int32_t result = 0;
for (unsigned long i = 0; i < sizeof(int32_t); i++) {
result |= static_cast<int32_t>(static_cast<unsigned char>(src[offset + i]))
<< ((sizeof(int32_t) - 1 - i) * kBitsPerByte);
<< ((sizeof(int32_t) - 1 - i) * kBitsPerByte);
}
return result;
}
template<>
template <>
inline int64_t Deserialize<int64_t>(const char* src, std::size_t offset) {
int64_t result = 0;
for (unsigned long i = 0; i < sizeof(int64_t); i++) {
result |= static_cast<int64_t>(static_cast<unsigned char>(src[offset + i]))
<< ((sizeof(int64_t) - 1 - i) * kBitsPerByte);
<< ((sizeof(int64_t) - 1 - i) * kBitsPerByte);
}
return result;
}
} // namepsace cassandrda
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -14,18 +14,17 @@ const int8_t kColumn = 0;
const int8_t kTombstone = 1;
const int8_t kExpiringColumn = 2;
std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask,
int8_t index,
std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask, int8_t index,
int64_t timestamp) {
if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
return std::shared_ptr<Tombstone>(
new Tombstone(mask, index, ToSeconds(timestamp), timestamp));
} else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
return std::shared_ptr<ExpiringColumn>(new ExpiringColumn(
mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl));
mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl));
} else {
return std::shared_ptr<Column>(
new Column(mask, index, timestamp, sizeof(kData), kData));
new Column(mask, index, timestamp, sizeof(kData), kData));
}
}
@ -39,10 +38,10 @@ RowValue CreateTestRowValue(
std::vector<std::tuple<int8_t, int8_t, int64_t>> column_specs) {
std::vector<std::shared_ptr<ColumnBase>> columns;
int64_t last_modified_time = 0;
for (auto spec: column_specs) {
for (auto spec : column_specs) {
auto c = CreateTestColumn(std::get<0>(spec), std::get<1>(spec),
std::get<2>(spec));
last_modified_time = std::max(last_modified_time, c -> Timestamp());
last_modified_time = std::max(last_modified_time, c->Timestamp());
columns.push_back(std::move(c));
}
return RowValue(std::move(columns), last_modified_time);
@ -61,12 +60,10 @@ void VerifyRowValueColumns(
EXPECT_EQ(expected_index, columns[index_of_vector]->Index());
}
int64_t ToMicroSeconds(int64_t seconds) {
return seconds * (int64_t)1000000;
}
int64_t ToMicroSeconds(int64_t seconds) { return seconds * (int64_t)1000000; }
int32_t ToSeconds(int64_t microseconds) {
return (int32_t)(microseconds / (int64_t)1000000);
}
}
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -5,6 +5,7 @@
#pragma once
#include <memory>
#include "test_util/testharness.h"
#include "utilities/cassandra/format.h"
#include "utilities/cassandra/serialize.h"
@ -18,9 +19,7 @@ extern const int8_t kColumn;
extern const int8_t kTombstone;
extern const int8_t kExpiringColumn;
std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask,
int8_t index,
std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask, int8_t index,
int64_t timestamp);
std::tuple<int8_t, int8_t, int64_t> CreateTestColumnSpec(int8_t mask,
@ -39,5 +38,5 @@ void VerifyRowValueColumns(
int64_t ToMicroSeconds(int64_t seconds);
int32_t ToSeconds(int64_t microseconds);
}
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -6,11 +6,11 @@
#pragma once
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/checkpoint.h"
#include <string>
#include "file/filename.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/checkpoint.h"
namespace ROCKSDB_NAMESPACE {

View File

@ -136,9 +136,8 @@ class CheckpointTest : public testing::Test {
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
}
Status TryReopenWithColumnFamilies(
const std::vector<std::string>& cfs,
const std::vector<Options>& options) {
Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const std::vector<Options>& options) {
Close();
EXPECT_EQ(cfs.size(), options.size());
std::vector<ColumnFamilyDescriptor> column_families;
@ -156,9 +155,7 @@ class CheckpointTest : public testing::Test {
return TryReopenWithColumnFamilies(cfs, v_opts);
}
void Reopen(const Options& options) {
ASSERT_OK(TryReopen(options));
}
void Reopen(const Options& options) { ASSERT_OK(TryReopen(options)); }
void CompactAll() {
for (auto h : handles_) {
@ -223,9 +220,7 @@ class CheckpointTest : public testing::Test {
return db_->Put(wo, handles_[cf], k, v);
}
Status Delete(const std::string& k) {
return db_->Delete(WriteOptions(), k);
}
Status Delete(const std::string& k) { return db_->Delete(WriteOptions(), k); }
Status Delete(int cf, const std::string& k) {
return db_->Delete(WriteOptions(), handles_[cf], k);
@ -512,18 +507,18 @@ TEST_F(CheckpointTest, CheckpointCF) {
std::vector<std::string> cfs;
cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"};
std::vector<ColumnFamilyDescriptor> column_families;
for (size_t i = 0; i < cfs.size(); ++i) {
column_families.push_back(ColumnFamilyDescriptor(cfs[i], options));
}
ASSERT_OK(DB::Open(options, snapshot_name_,
column_families, &cphandles, &snapshotDB));
for (size_t i = 0; i < cfs.size(); ++i) {
column_families.push_back(ColumnFamilyDescriptor(cfs[i], options));
}
ASSERT_OK(DB::Open(options, snapshot_name_, column_families, &cphandles,
&snapshotDB));
ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result));
ASSERT_EQ("Default1", result);
ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result));
ASSERT_EQ("eleven", result);
ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result));
for (auto h : cphandles) {
delete h;
delete h;
}
cphandles.clear();
delete snapshotDB;

View File

@ -5,10 +5,11 @@
#ifndef ROCKSDB_LITE
#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
#include <string>
#include "rocksdb/slice.h"
#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
namespace ROCKSDB_NAMESPACE {

View File

@ -8,6 +8,7 @@
// found in the LICENSE file.
#include "rocksdb/utilities/info_log_finder.h"
#include "file/filename.h"
#include "rocksdb/env.h"

View File

@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/env_mirror.h"
#include "env/mock_env.h"
#include "test_util/testharness.h"
@ -15,7 +16,7 @@ namespace ROCKSDB_NAMESPACE {
class EnvMirrorTest : public testing::Test {
public:
Env* default_;
MockEnv* a_, *b_;
MockEnv *a_, *b_;
EnvMirror* env_;
const EnvOptions soptions_;
@ -97,8 +98,9 @@ TEST_F(EnvMirrorTest, Basics) {
ASSERT_TRUE(
!env_->NewSequentialFile("/dir/non_existent", &seq_file, soptions_).ok());
ASSERT_TRUE(!seq_file);
ASSERT_TRUE(!env_->NewRandomAccessFile("/dir/non_existent", &rand_file,
soptions_).ok());
ASSERT_TRUE(
!env_->NewRandomAccessFile("/dir/non_existent", &rand_file, soptions_)
.ok());
ASSERT_TRUE(!rand_file);
// Check that deleting works.

View File

@ -11,8 +11,7 @@
namespace ROCKSDB_NAMESPACE {
class TimedEnvTest : public testing::Test {
};
class TimedEnvTest : public testing::Test {};
TEST_F(TimedEnvTest, BasicTest) {
SetPerfLevel(PerfLevel::kEnableTime);

View File

@ -85,8 +85,7 @@ class TestWritableFile : public WritableFile {
virtual Status Flush() override;
virtual Status Sync() override;
virtual bool IsSyncThreadSafe() const override { return true; }
virtual Status PositionedAppend(const Slice& data,
uint64_t offset) override {
virtual Status PositionedAppend(const Slice& data, uint64_t offset) override {
return target_->PositionedAppend(data, offset);
}
virtual Status PositionedAppend(
@ -227,8 +226,8 @@ class FaultInjectionTestEnv : public EnvWrapper {
MutexLock l(&mutex_);
return filesystem_active_;
}
void SetFilesystemActiveNoLock(bool active,
Status error = Status::Corruption("Not active")) {
void SetFilesystemActiveNoLock(
bool active, Status error = Status::Corruption("Not active")) {
error.PermitUncheckedError();
filesystem_active_ = active;
if (!active) {
@ -237,7 +236,7 @@ class FaultInjectionTestEnv : public EnvWrapper {
error.PermitUncheckedError();
}
void SetFilesystemActive(bool active,
Status error = Status::Corruption("Not active")) {
Status error = Status::Corruption("Not active")) {
error.PermitUncheckedError();
MutexLock l(&mutex_);
SetFilesystemActiveNoLock(active, error);

View File

@ -386,9 +386,9 @@ IOStatus TestFSRandomRWFile::Sync(const IOOptions& options,
return target_->Sync(options, dbg);
}
TestFSRandomAccessFile::TestFSRandomAccessFile(const std::string& /*fname*/,
std::unique_ptr<FSRandomAccessFile>&& f,
FaultInjectionTestFS* fs)
TestFSRandomAccessFile::TestFSRandomAccessFile(
const std::string& /*fname*/, std::unique_ptr<FSRandomAccessFile>&& f,
FaultInjectionTestFS* fs)
: target_(std::move(f)), fs_(fs) {
assert(target_ != nullptr);
}
@ -912,8 +912,7 @@ IOStatus FaultInjectionTestFS::InjectThreadSpecificReadError(
bool dummy_bool;
bool& ret_fault_injected = fault_injected ? *fault_injected : dummy_bool;
ret_fault_injected = false;
ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get());
ErrorContext* ctx = static_cast<ErrorContext*>(thread_local_error_->Get());
if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in) {
return IOStatus::OK();
}
@ -1019,8 +1018,7 @@ IOStatus FaultInjectionTestFS::InjectMetadataWriteError() {
void FaultInjectionTestFS::PrintFaultBacktrace() {
#if defined(OS_LINUX)
ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get());
ErrorContext* ctx = static_cast<ErrorContext*>(thread_local_error_->Get());
if (ctx == nullptr) {
return;
}

View File

@ -135,8 +135,8 @@ class TestFSRandomRWFile : public FSRandomRWFile {
class TestFSRandomAccessFile : public FSRandomAccessFile {
public:
explicit TestFSRandomAccessFile(const std::string& fname,
std::unique_ptr<FSRandomAccessFile>&& f,
FaultInjectionTestFS* fs);
std::unique_ptr<FSRandomAccessFile>&& f,
FaultInjectionTestFS* fs);
~TestFSRandomAccessFile() override {}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
@ -331,8 +331,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
error.PermitUncheckedError();
SetFilesystemActiveNoLock(active, error);
}
void SetFilesystemDirectWritable(
bool writable) {
void SetFilesystemDirectWritable(bool writable) {
MutexLock l(&mutex_);
filesystem_writable_ = writable;
}
@ -396,7 +395,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
// 1/one_in probability)
void SetThreadLocalReadErrorContext(uint32_t seed, int one_in) {
struct ErrorContext* ctx =
static_cast<struct ErrorContext*>(thread_local_error_->Get());
static_cast<struct ErrorContext*>(thread_local_error_->Get());
if (ctx == nullptr) {
ctx = new ErrorContext(seed);
thread_local_error_->Reset(ctx);
@ -405,7 +404,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
ctx->count = 0;
}
static void DeleteThreadLocalErrorContext(void *p) {
static void DeleteThreadLocalErrorContext(void* p) {
ErrorContext* ctx = static_cast<ErrorContext*>(p);
delete ctx;
}
@ -466,8 +465,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
// Get the count of how many times we injected since the previous call
int GetAndResetErrorCount() {
ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get());
ErrorContext* ctx = static_cast<ErrorContext*>(thread_local_error_->Get());
int count = 0;
if (ctx != nullptr) {
count = ctx->count;
@ -477,8 +475,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
}
void EnableErrorInjection() {
ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get());
ErrorContext* ctx = static_cast<ErrorContext*>(thread_local_error_->Get());
if (ctx) {
ctx->enable_error_injection = true;
}
@ -499,8 +496,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
}
void DisableErrorInjection() {
ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get());
ErrorContext* ctx = static_cast<ErrorContext*>(thread_local_error_->Get());
if (ctx) {
ctx->enable_error_injection = false;
}
@ -530,7 +526,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
// will be recovered to content accordingly.
std::unordered_map<std::string, std::map<std::string, std::string>>
dir_to_new_files_since_last_sync_;
bool filesystem_active_; // Record flushes, syncs, writes
bool filesystem_active_; // Record flushes, syncs, writes
bool filesystem_writable_; // Bypass FaultInjectionTestFS and go directly
// to underlying FS for writable files
IOStatus error_;

View File

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/utilities/leveldb_options.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"

View File

@ -4,13 +4,13 @@
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include "rocksdb/merge_operator.h"
#include <stdio.h>
#include <memory>
#include <string>
#include "rocksdb/merge_operator.h"
namespace ROCKSDB_NAMESPACE {
class MergeOperators {
@ -19,7 +19,8 @@ class MergeOperators {
static std::shared_ptr<MergeOperator> CreateDeprecatedPutOperator();
static std::shared_ptr<MergeOperator> CreateUInt64AddOperator();
static std::shared_ptr<MergeOperator> CreateStringAppendOperator();
static std::shared_ptr<MergeOperator> CreateStringAppendOperator(char delim_char);
static std::shared_ptr<MergeOperator> CreateStringAppendOperator(
char delim_char);
static std::shared_ptr<MergeOperator> CreateStringAppendOperator(
const std::string& delim);
static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator();

View File

@ -3,28 +3,26 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/merge_operators/bytesxor.h"
#include <algorithm>
#include <string>
#include "utilities/merge_operators/bytesxor.h"
namespace ROCKSDB_NAMESPACE {
std::shared_ptr<MergeOperator> MergeOperators::CreateBytesXOROperator() {
return std::make_shared<BytesXOROperator>();
}
bool BytesXOROperator::Merge(const Slice& /*key*/,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* /*logger*/) const {
bool BytesXOROperator::Merge(const Slice& /*key*/, const Slice* existing_value,
const Slice& value, std::string* new_value,
Logger* /*logger*/) const {
XOR(existing_value, value, new_value);
return true;
}
void BytesXOROperator::XOR(const Slice* existing_value,
const Slice& value, std::string* new_value) const {
void BytesXOROperator::XOR(const Slice* existing_value, const Slice& value,
std::string* new_value) const {
if (!existing_value) {
new_value->clear();
new_value->assign(value.data(), value.size());

View File

@ -8,6 +8,7 @@
#include <algorithm>
#include <memory>
#include <string>
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
@ -22,10 +23,8 @@ class BytesXOROperator : public AssociativeMergeOperator {
public:
// XORs the two array of bytes one byte at a time and stores the result
// in new_value. len is the number of xored bytes, and the length of new_value
virtual bool Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
virtual bool Merge(const Slice& key, const Slice* existing_value,
const Slice& value, std::string* new_value,
Logger* logger) const override;
static const char* kClassName() { return "BytesXOR"; }
@ -35,7 +34,7 @@ class BytesXOROperator : public AssociativeMergeOperator {
const char* Name() const override { return kClassName(); }
void XOR(const Slice* existing_value, const Slice& value,
std::string* new_value) const;
std::string* new_value) const;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -4,11 +4,12 @@
// (found in the LICENSE.Apache file in the root directory).
#include <memory>
#include "rocksdb/slice.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "utilities/merge_operators.h"
namespace { // anonymous namespace
namespace { // anonymous namespace
using ROCKSDB_NAMESPACE::Logger;
using ROCKSDB_NAMESPACE::MergeOperator;
@ -77,7 +78,7 @@ class PutOperatorV2 : public PutOperator {
const char* NickName() const override { return kNickName(); }
};
} // end of anonymous namespace
} // end of anonymous namespace
namespace ROCKSDB_NAMESPACE {

View File

@ -3,11 +3,11 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/merge_operators/sortlist.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
bool SortList::FullMergeV2(const MergeOperationInput& merge_in,

View File

@ -48,7 +48,7 @@ bool StringAppendOperator::Merge(const Slice& /*key*/,
if (!existing_value) {
// No existing_value. Set *new_value = value
new_value->assign(value.data(),value.size());
new_value->assign(value.data(), value.size());
} else {
// Generic append (existing_value != null).
// Reserve *new_value to correct size, and apply concatenation.
@ -61,12 +61,12 @@ bool StringAppendOperator::Merge(const Slice& /*key*/,
return true;
}
std::shared_ptr<MergeOperator> MergeOperators::CreateStringAppendOperator() {
return std::make_shared<StringAppendOperator>(',');
}
std::shared_ptr<MergeOperator> MergeOperators::CreateStringAppendOperator(char delim_char) {
std::shared_ptr<MergeOperator> MergeOperators::CreateStringAppendOperator(
char delim_char) {
return std::make_shared<StringAppendOperator>(delim_char);
}

View File

@ -16,10 +16,8 @@ class StringAppendOperator : public AssociativeMergeOperator {
explicit StringAppendOperator(char delim_char);
explicit StringAppendOperator(const std::string& delim);
virtual bool Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
virtual bool Merge(const Slice& key, const Slice* existing_value,
const Slice& value, std::string* new_value,
Logger* logger) const override;
static const char* kClassName() { return "StringAppendOperator"; }

View File

@ -31,8 +31,8 @@ class StringAppendTESTOperator : public MergeOperator {
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const
override;
std::string* new_value,
Logger* logger) const override;
static const char* kClassName() { return "StringAppendTESTOperator"; }
static const char* kNickName() { return "stringappendtest"; }

View File

@ -10,7 +10,7 @@
*
* @author Deon Nicholas (dnicholas@fb.com)
* Copyright 2013 Facebook, Inc.
*/
*/
#include "utilities/merge_operators/string_append/stringappend.h"
@ -27,7 +27,6 @@
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend2.h"
namespace ROCKSDB_NAMESPACE {
// Path to the database on file system
@ -73,18 +72,15 @@ std::shared_ptr<DB> OpenTtlDb(const std::string& delim) {
/// Supports Append(list, string) and Get(list)
class StringLists {
public:
//Constructor: specifies the rocksdb db
// Constructor: specifies the rocksdb db
/* implicit */
StringLists(std::shared_ptr<DB> db)
: db_(db),
merge_option_(),
get_option_() {
: db_(db), merge_option_(), get_option_() {
assert(db);
}
// Append string val onto the list defined by key; return true on success
bool Append(const std::string& key, const std::string& val){
bool Append(const std::string& key, const std::string& val) {
Slice valSlice(val.data(), val.size());
auto s = db_->Merge(merge_option_, key, valSlice);
@ -97,8 +93,8 @@ class StringLists {
}
// Returns the list of strings associated with key (or "" if does not exist)
bool Get(const std::string& key, std::string* const result){
assert(result != nullptr); // we should have a place to store the result
bool Get(const std::string& key, std::string* const result) {
assert(result != nullptr); // we should have a place to store the result
auto s = db_->Get(get_option_, key, result);
if (s.ok()) {
@ -106,10 +102,10 @@ class StringLists {
}
// Either key does not exist, or there is some error.
*result = ""; // Always return empty string (just for convention)
*result = ""; // Always return empty string (just for convention)
//NotFound is okay; just return empty (similar to std::map)
//But network or db errors, etc, should fail the test (or at least yell)
// NotFound is okay; just return empty (similar to std::map)
// But network or db errors, etc, should fail the test (or at least yell)
if (!s.IsNotFound()) {
std::cerr << "ERROR " << s.ToString() << std::endl;
}
@ -118,15 +114,12 @@ class StringLists {
return false;
}
private:
std::shared_ptr<DB> db_;
WriteOptions merge_option_;
ReadOptions get_option_;
};
// The class for unit-testing
class StringAppendOperatorTest : public testing::Test,
public ::testing::WithParamInterface<bool> {
@ -153,14 +146,13 @@ class StringAppendOperatorTest : public testing::Test,
// Allows user to open databases with different configurations.
// e.g.: Can open a DB or a TtlDB, etc.
static void SetOpenDbFunction(OpenFuncPtr func) {
OpenDb = func;
}
static void SetOpenDbFunction(OpenFuncPtr func) { OpenDb = func; }
protected:
static OpenFuncPtr OpenDb;
};
StringAppendOperatorTest::OpenFuncPtr StringAppendOperatorTest::OpenDb = nullptr;
StringAppendOperatorTest::OpenFuncPtr StringAppendOperatorTest::OpenDb =
nullptr;
// THE TEST CASES BEGIN HERE
@ -206,7 +198,6 @@ TEST_P(StringAppendOperatorTest, IteratorTest) {
}
}
// Should release the snapshot and be aware of the new stuff now
it.reset(db_->NewIterator(ReadOptions()));
first = true;
@ -236,7 +227,7 @@ TEST_P(StringAppendOperatorTest, IteratorTest) {
it.reset(db_->NewIterator(ReadOptions()));
first = true;
std::string k3("k3");
for(it->Seek(k2); it->Valid(); it->Next()) {
for (it->Seek(k2); it->Valid(); it->Next()) {
res = it->value().ToString();
if (first) {
ASSERT_EQ(res, "a1,a2,a3,a4");
@ -245,7 +236,7 @@ TEST_P(StringAppendOperatorTest, IteratorTest) {
ASSERT_EQ(res, "g1");
}
}
for(it->Seek(k3); it->Valid(); it->Next()) {
for (it->Seek(k3); it->Valid(); it->Next()) {
res = it->value().ToString();
if (first) {
// should not be hit
@ -353,7 +344,7 @@ TEST_P(StringAppendOperatorTest, VariousKeys) {
sb = slists.Get("b", &b);
sc = slists.Get("c", &c);
ASSERT_TRUE(sa && sb && sc); // All three keys should have been found
ASSERT_TRUE(sa && sb && sc); // All three keys should have been found
ASSERT_EQ(a, "x\nt\nr");
ASSERT_EQ(b, "y\n2");
@ -367,22 +358,23 @@ TEST_P(StringAppendOperatorTest, RandomMixGetAppend) {
// Generate a list of random keys and values
const int kWordCount = 15;
std::string words[] = {"sdasd", "triejf", "fnjsdfn", "dfjisdfsf", "342839",
"dsuha", "mabuais", "sadajsid", "jf9834hf", "2d9j89",
"dj9823jd", "a", "dk02ed2dh", "$(jd4h984$(*", "mabz"};
std::string words[] = {"sdasd", "triejf", "fnjsdfn", "dfjisdfsf",
"342839", "dsuha", "mabuais", "sadajsid",
"jf9834hf", "2d9j89", "dj9823jd", "a",
"dk02ed2dh", "$(jd4h984$(*", "mabz"};
const int kKeyCount = 6;
std::string keys[] = {"dhaiusdhu", "denidw", "daisda", "keykey", "muki",
"shzassdianmd"};
std::string keys[] = {"dhaiusdhu", "denidw", "daisda",
"keykey", "muki", "shzassdianmd"};
// Will store a local copy of all data in order to verify correctness
std::map<std::string, std::string> parallel_copy;
// Generate a bunch of random queries (Append and Get)!
enum query_t { APPEND_OP, GET_OP, NUM_OPS };
Random randomGen(1337); //deterministic seed; always get same results!
enum query_t { APPEND_OP, GET_OP, NUM_OPS };
Random randomGen(1337); // deterministic seed; always get same results!
const int kNumQueries = 30;
for (int q=0; q<kNumQueries; ++q) {
for (int q = 0; q < kNumQueries; ++q) {
// Generate a random query (Append or Get) and random parameters
query_t query = (query_t)randomGen.Uniform((int)NUM_OPS);
std::string key = keys[randomGen.Uniform((int)kKeyCount)];
@ -390,9 +382,8 @@ TEST_P(StringAppendOperatorTest, RandomMixGetAppend) {
// Apply the query and any checks.
if (query == APPEND_OP) {
// Apply the rocksdb test-harness Append defined above
slists.Append(key, word); //apply the rocksdb append
slists.Append(key, word); // apply the rocksdb append
// Apply the similar "Append" to the parallel copy
if (parallel_copy[key].size() > 0) {
@ -407,7 +398,6 @@ TEST_P(StringAppendOperatorTest, RandomMixGetAppend) {
slists.Get(key, &res);
ASSERT_EQ(res, parallel_copy[key]);
}
}
}
@ -417,32 +407,32 @@ TEST_P(StringAppendOperatorTest, BIGRandomMixGetAppend) {
// Generate a list of random keys and values
const int kWordCount = 15;
std::string words[] = {"sdasd", "triejf", "fnjsdfn", "dfjisdfsf", "342839",
"dsuha", "mabuais", "sadajsid", "jf9834hf", "2d9j89",
"dj9823jd", "a", "dk02ed2dh", "$(jd4h984$(*", "mabz"};
std::string words[] = {"sdasd", "triejf", "fnjsdfn", "dfjisdfsf",
"342839", "dsuha", "mabuais", "sadajsid",
"jf9834hf", "2d9j89", "dj9823jd", "a",
"dk02ed2dh", "$(jd4h984$(*", "mabz"};
const int kKeyCount = 6;
std::string keys[] = {"dhaiusdhu", "denidw", "daisda", "keykey", "muki",
"shzassdianmd"};
std::string keys[] = {"dhaiusdhu", "denidw", "daisda",
"keykey", "muki", "shzassdianmd"};
// Will store a local copy of all data in order to verify correctness
std::map<std::string, std::string> parallel_copy;
// Generate a bunch of random queries (Append and Get)!
enum query_t { APPEND_OP, GET_OP, NUM_OPS };
Random randomGen(9138204); // deterministic seed
enum query_t { APPEND_OP, GET_OP, NUM_OPS };
Random randomGen(9138204); // deterministic seed
const int kNumQueries = 1000;
for (int q=0; q<kNumQueries; ++q) {
for (int q = 0; q < kNumQueries; ++q) {
// Generate a random query (Append or Get) and random parameters
query_t query = (query_t)randomGen.Uniform((int)NUM_OPS);
std::string key = keys[randomGen.Uniform((int)kKeyCount)];
std::string word = words[randomGen.Uniform((int)kWordCount)];
//Apply the query and any checks.
// Apply the query and any checks.
if (query == APPEND_OP) {
// Apply the rocksdb test-harness Append defined above
slists.Append(key, word); //apply the rocksdb append
slists.Append(key, word); // apply the rocksdb append
// Apply the similar "Append" to the parallel copy
if (parallel_copy[key].size() > 0) {
@ -457,7 +447,6 @@ TEST_P(StringAppendOperatorTest, BIGRandomMixGetAppend) {
slists.Get(key, &res);
ASSERT_EQ(res, parallel_copy[key]);
}
}
}
@ -578,7 +567,7 @@ TEST_P(StringAppendOperatorTest, PersistentFlushAndCompaction) {
ASSERT_TRUE(slists.Get("a", &a));
ASSERT_EQ(a, "x\nt\nr");
//Append, Compact, Get
// Append, Compact, Get
slists.Append("c", "bbnagnagsx");
slists.Append("a", "sa");
slists.Append("b", "df");
@ -629,8 +618,8 @@ TEST_P(StringAppendOperatorTest, SimpleTestNullDelimiter) {
ASSERT_TRUE(slists.Get("k1", &res));
// Construct the desired string. Default constructor doesn't like '\0' chars.
std::string checker("v1,v2,v3"); // Verify that the string is right size.
checker[2] = '\0'; // Use null delimiter instead of comma.
std::string checker("v1,v2,v3"); // Verify that the string is right size.
checker[2] = '\0'; // Use null delimiter instead of comma.
checker[5] = '\0';
ASSERT_EQ(checker.size(), 8); // Verify it is still the correct size

View File

@ -12,7 +12,7 @@
#include "util/coding.h"
#include "utilities/merge_operators.h"
namespace { // anonymous namespace
namespace { // anonymous namespace
using ROCKSDB_NAMESPACE::AssociativeMergeOperator;
using ROCKSDB_NAMESPACE::InfoLogLevel;
@ -27,7 +27,7 @@ class UInt64AddOperator : public AssociativeMergeOperator {
const Slice& value, std::string* new_value,
Logger* logger) const override {
uint64_t orig_value = 0;
if (existing_value){
if (existing_value) {
orig_value = DecodeInteger(*existing_value, logger);
}
uint64_t operand = DecodeInteger(value, logger);

View File

@ -57,8 +57,8 @@ Status LoadOptionsFromFile(const ConfigOptions& config_options,
return Status::OK();
}
Status GetLatestOptionsFileName(const std::string& dbpath,
Env* env, std::string* options_file_name) {
Status GetLatestOptionsFileName(const std::string& dbpath, Env* env,
std::string* options_file_name) {
Status s;
std::string latest_file_name;
uint64_t latest_time_stamp = 0;

View File

@ -132,7 +132,7 @@ Status BlockCacheTier::Close() {
return Status::OK();
}
template<class T>
template <class T>
void Add(std::map<std::string, double>* stats, const std::string& key,
const T& t) {
stats->insert({key, static_cast<double>(t)});
@ -148,8 +148,7 @@ PersistentCache::StatsType BlockCacheTier::Stats() {
stats_.bytes_read_.Average());
Add(&stats, "persistentcache.blockcachetier.insert_dropped",
stats_.insert_dropped_);
Add(&stats, "persistentcache.blockcachetier.cache_hits",
stats_.cache_hits_);
Add(&stats, "persistentcache.blockcachetier.cache_hits", stats_.cache_hits_);
Add(&stats, "persistentcache.blockcachetier.cache_misses",
stats_.cache_misses_);
Add(&stats, "persistentcache.blockcachetier.cache_errors",
@ -326,10 +325,9 @@ Status BlockCacheTier::NewCacheFile() {
TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
(void*)(GetCachePath().c_str()));
std::unique_ptr<WriteableCacheFile> f(
new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
GetCachePath(), writer_cache_id_,
opt_.cache_file_size, opt_.log));
std::unique_ptr<WriteableCacheFile> f(new WriteableCacheFile(
opt_.env, &buffer_allocator_, &writer_, GetCachePath(), writer_cache_id_,
opt_.cache_file_size, opt_.log));
bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
if (!status) {

View File

@ -6,9 +6,9 @@
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
#ifndef OS_WIN
#include <unistd.h>
#endif // ! OS_WIN
#endif // ! OS_WIN
#include <atomic>
#include <list>
@ -45,7 +45,8 @@ class BlockCacheTier : public PersistentCacheTier {
: opt_(opt),
insert_ops_(static_cast<size_t>(opt_.max_write_pipeline_backlog_size)),
buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
writer_(this, opt_.writer_qdepth, static_cast<size_t>(opt_.writer_dispatch_size)) {
writer_(this, opt_.writer_qdepth,
static_cast<size_t>(opt_.writer_dispatch_size)) {
Info(opt_.log, "Initializing allocator. size=%d B count=%" ROCKSDB_PRIszt,
opt_.write_buffer_size, opt_.write_buffer_count());
}
@ -147,7 +148,7 @@ class BlockCacheTier : public PersistentCacheTier {
ThreadedWriter writer_; // Writer threads
BlockCacheTierMetadata metadata_; // Cache meta data manager
std::atomic<uint64_t> size_{0}; // Size of the cache
Statistics stats_; // Statistics
Statistics stats_; // Statistics
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -68,8 +68,7 @@ Status BlockCacheFile::Delete(uint64_t* size) {
// <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size -->
//
struct CacheRecordHeader {
CacheRecordHeader()
: magic_(0), crc_(0), key_size_(0), val_size_(0) {}
CacheRecordHeader() : magic_(0), crc_(0), key_size_(0), val_size_(0) {}
CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
const uint32_t val_size)
: magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}

View File

@ -12,19 +12,16 @@
#include <vector>
#include "file/random_access_file_reader.h"
#include "port/port.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"
#include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
#include "utilities/persistent_cache/lrulist.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_util.h"
#include "port/port.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"
// The io code path of persistent cache uses pipelined architecture
//
// client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel

View File

@ -8,8 +8,8 @@
#include <memory>
#include <string>
#include "rocksdb/comparator.h"
#include "memory/arena.h"
#include "rocksdb/comparator.h"
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {

View File

@ -32,8 +32,8 @@ BlockCacheFile* BlockCacheTierMetadata::Evict() {
}
void BlockCacheTierMetadata::Clear() {
cache_file_index_.Clear([](BlockCacheFile* arg){ delete arg; });
block_index_.Clear([](BlockInfo* arg){ delete arg; });
cache_file_index_.Clear([](BlockCacheFile* arg) { delete arg; });
block_index_.Clear([](BlockInfo* arg) { delete arg; });
}
BlockInfo* BlockCacheTierMetadata::Insert(const Slice& key, const LBA& lba) {

View File

@ -11,7 +11,6 @@
#include <unordered_map>
#include "rocksdb/slice.h"
#include "utilities/persistent_cache/block_cache_tier_file.h"
#include "utilities/persistent_cache/hash_table.h"
#include "utilities/persistent_cache/hash_table_evictable.h"

View File

@ -8,6 +8,7 @@
#ifndef ROCKSDB_LITE
#include <assert.h>
#include <list>
#include <vector>

View File

@ -3,7 +3,10 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "utilities/persistent_cache/hash_table.h"
#include <stdlib.h>
#include <iostream>
#include <set>
#include <string>
@ -12,7 +15,6 @@
#include "memory/arena.h"
#include "test_util/testharness.h"
#include "util/random.h"
#include "utilities/persistent_cache/hash_table.h"
#include "utilities/persistent_cache/hash_table_evictable.h"
#ifndef ROCKSDB_LITE

View File

@ -234,7 +234,7 @@ class CacheTierBenchmark {
fprintf(stderr, "%s\n", status.ToString().c_str());
}
assert(status.ok());
assert(size == (size_t) FLAGS_iosize);
assert(size == (size_t)FLAGS_iosize);
// adjust stats
const size_t elapsed_micro = timer.ElapsedNanos() / 1000;

View File

@ -84,7 +84,8 @@ std::unique_ptr<PersistentCacheTier> NewBlockCache(
Env* env, const std::string& path,
const uint64_t max_size = std::numeric_limits<uint64_t>::max(),
const bool enable_direct_writes = false) {
const uint32_t max_file_size = static_cast<uint32_t>(12 * 1024 * 1024 * kStressFactor);
const uint32_t max_file_size =
static_cast<uint32_t>(12 * 1024 * 1024 * kStressFactor);
auto log = std::make_shared<ConsoleLogger>();
PersistentCacheConfig opt(env, path, max_size, log);
opt.cache_file_size = max_file_size;
@ -101,7 +102,8 @@ std::unique_ptr<PersistentTieredCache> NewTieredCache(
Env* env, const std::string& path, const uint64_t max_volatile_cache_size,
const uint64_t max_block_cache_size =
std::numeric_limits<uint64_t>::max()) {
const uint32_t max_file_size = static_cast<uint32_t>(12 * 1024 * 1024 * kStressFactor);
const uint32_t max_file_size =
static_cast<uint32_t>(12 * 1024 * 1024 * kStressFactor);
auto log = std::make_shared<ConsoleLogger>();
auto opt = PersistentCacheConfig(env, path, max_block_cache_size, log);
opt.cache_file_size = max_file_size;
@ -126,13 +128,13 @@ PersistentCacheTierTest::PersistentCacheTierTest()
TEST_F(PersistentCacheTierTest, DISABLED_BlockCacheInsertWithFileCreateError) {
cache_ = NewBlockCache(Env::Default(), path_,
/*size=*/std::numeric_limits<uint64_t>::max(),
/*direct_writes=*/ false);
/*direct_writes=*/false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BlockCacheTier::NewCacheFile:DeleteDir", OnDeleteDir);
RunNegativeInsertTest(/*nthreads=*/ 1,
RunNegativeInsertTest(/*nthreads=*/1,
/*max_keys*/
static_cast<size_t>(10 * 1024 * kStressFactor));
static_cast<size_t>(10 * 1024 * kStressFactor));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
@ -171,7 +173,8 @@ TEST_F(PersistentCacheTierTest, DISABLED_VolatileCacheInsertWithEviction) {
for (auto nthreads : {1, 5}) {
for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
cache_ = std::make_shared<VolatileCacheTier>(
/*compressed=*/true, /*size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor));
/*compressed=*/true,
/*size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor));
RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys));
}
}
@ -197,8 +200,9 @@ TEST_F(PersistentCacheTierTest, DISABLED_BlockCacheInsert) {
TEST_F(PersistentCacheTierTest, DISABLED_BlockCacheInsertWithEviction) {
for (auto nthreads : {1, 5}) {
for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
cache_ = NewBlockCache(Env::Default(), path_,
/*max_size=*/static_cast<size_t>(200 * 1024 * 1024 * kStressFactor));
cache_ = NewBlockCache(
Env::Default(), path_,
/*max_size=*/static_cast<size_t>(200 * 1024 * 1024 * kStressFactor));
RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys));
}
}
@ -210,8 +214,9 @@ TEST_F(PersistentCacheTierTest, DISABLED_TieredCacheInsert) {
for (auto nthreads : {1, 5}) {
for (auto max_keys :
{10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
cache_ = NewTieredCache(Env::Default(), path_,
/*memory_size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor));
cache_ = NewTieredCache(
Env::Default(), path_,
/*memory_size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor));
RunInsertTest(nthreads, static_cast<size_t>(max_keys));
}
}
@ -226,7 +231,8 @@ TEST_F(PersistentCacheTierTest, DISABLED_TieredCacheInsertWithEviction) {
cache_ = NewTieredCache(
Env::Default(), path_,
/*memory_size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor),
/*block_cache_size*/ static_cast<size_t>(200 * 1024 * 1024 * kStressFactor));
/*block_cache_size*/
static_cast<size_t>(200 * 1024 * 1024 * kStressFactor));
RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys));
}
}
@ -291,14 +297,13 @@ PersistentCacheDBTest::PersistentCacheDBTest()
void PersistentCacheDBTest::RunTest(
const std::function<std::shared_ptr<PersistentCacheTier>(bool)>& new_pcache,
const size_t max_keys = 100 * 1024, const size_t max_usecase = 5) {
// number of insertion interations
int num_iter = static_cast<int>(max_keys * kStressFactor);
for (size_t iter = 0; iter < max_usecase; iter++) {
Options options;
options.write_buffer_size =
static_cast<size_t>(64 * 1024 * kStressFactor); // small write buffer
static_cast<size_t>(64 * 1024 * kStressFactor); // small write buffer
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options = CurrentOptions(options);

View File

@ -41,9 +41,7 @@ class CompactOnDeletionCollector : public TablePropertiesCollector {
}
// EXPERIMENTAL Return whether the output file should be further compacted
virtual bool NeedCompact() const override {
return need_compaction_;
}
virtual bool NeedCompact() const override { return need_compaction_; }
static const int kNumBuckets = 128;

View File

@ -80,10 +80,10 @@ TEST(CompactOnDeletionCollector, DeletionRatio) {
}
TEST(CompactOnDeletionCollector, SlidingWindow) {
const int kWindowSizes[] =
{1000, 10000, 10000, 127, 128, 129, 255, 256, 257, 2, 10000};
const int kDeletionTriggers[] =
{500, 9500, 4323, 47, 61, 128, 250, 250, 250, 2, 2};
const int kWindowSizes[] = {1000, 10000, 10000, 127, 128, 129,
255, 256, 257, 2, 10000};
const int kDeletionTriggers[] = {500, 9500, 4323, 47, 61, 128,
250, 250, 250, 2, 2};
TablePropertiesCollectorFactory::Context context;
context.column_family_id =
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
@ -134,13 +134,13 @@ TEST(CompactOnDeletionCollector, SlidingWindow) {
collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0));
}
}
if (collector->NeedCompact() !=
(deletions >= kNumDeletionTrigger) &&
if (collector->NeedCompact() != (deletions >= kNumDeletionTrigger) &&
std::abs(deletions - kNumDeletionTrigger) > kBias) {
fprintf(stderr, "[Error] collector->NeedCompact() != (%d >= %d)"
fprintf(stderr,
"[Error] collector->NeedCompact() != (%d >= %d)"
" with kWindowSize = %d and kNumDeletionTrigger = %d\n",
deletions, kNumDeletionTrigger,
kWindowSize, kNumDeletionTrigger);
deletions, kNumDeletionTrigger, kWindowSize,
kNumDeletionTrigger);
ASSERT_TRUE(false);
}
ASSERT_OK(collector->Finish(nullptr));
@ -182,11 +182,11 @@ TEST(CompactOnDeletionCollector, SlidingWindow) {
}
if (collector->NeedCompact() != (deletions >= kNumDeletionTrigger) &&
std::abs(deletions - kNumDeletionTrigger) > kBias) {
fprintf(stderr, "[Error] collector->NeedCompact() %d != (%d >= %d)"
fprintf(stderr,
"[Error] collector->NeedCompact() %d != (%d >= %d)"
" with kWindowSize = %d, kNumDeletionTrigger = %d\n",
collector->NeedCompact(),
deletions, kNumDeletionTrigger, kWindowSize,
kNumDeletionTrigger);
collector->NeedCompact(), deletions, kNumDeletionTrigger,
kWindowSize, kNumDeletionTrigger);
ASSERT_TRUE(false);
}
ASSERT_OK(collector->Finish(nullptr));
@ -218,7 +218,8 @@ TEST(CompactOnDeletionCollector, SlidingWindow) {
}
if (collector->NeedCompact() &&
std::abs(kDeletionsPerSection - kNumDeletionTrigger) > kBias) {
fprintf(stderr, "[Error] collector->NeedCompact() != false"
fprintf(stderr,
"[Error] collector->NeedCompact() != false"
" with kWindowSize = %d and kNumDeletionTrigger = %d\n",
kWindowSize, kNumDeletionTrigger);
ASSERT_TRUE(false);

View File

@ -595,14 +595,13 @@ Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
return new TtlIterator(db_->NewIterator(opts, column_family));
}
void DBWithTTLImpl::SetTtl(ColumnFamilyHandle *h, int32_t ttl) {
void DBWithTTLImpl::SetTtl(ColumnFamilyHandle* h, int32_t ttl) {
std::shared_ptr<TtlCompactionFilterFactory> filter;
Options opts;
opts = GetOptions(h);
filter = std::static_pointer_cast<TtlCompactionFilterFactory>(
opts.compaction_filter_factory);
if (!filter)
return;
opts.compaction_filter_factory);
if (!filter) return;
filter->SetTtl(ttl);
}

View File

@ -103,7 +103,7 @@ class DBWithTTLImpl : public DBWithTTL {
void SetTtl(int32_t ttl) override { SetTtl(DefaultColumnFamily(), ttl); }
void SetTtl(ColumnFamilyHandle *h, int32_t ttl) override;
void SetTtl(ColumnFamilyHandle* h, int32_t ttl) override;
private:
// remember whether the Close completes or not
@ -111,7 +111,6 @@ class DBWithTTLImpl : public DBWithTTL {
};
class TtlIterator : public Iterator {
public:
explicit TtlIterator(Iterator* iter) : iter_(iter) { assert(iter_); }
@ -189,9 +188,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override;
void SetTtl(int32_t ttl) {
ttl_ = ttl;
}
void SetTtl(int32_t ttl) { ttl_ = ttl; }
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "TtlCompactionFilterFactory"; }
@ -209,7 +206,6 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
};
class TtlMergeOperator : public MergeOperator {
public:
explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op,
SystemClock* clock);

View File

@ -28,7 +28,7 @@ namespace {
using KVMap = std::map<std::string, std::string>;
enum BatchOperation { OP_PUT = 0, OP_DELETE = 1 };
}
} // namespace
class SpecialTimeEnv : public EnvWrapper {
public:
@ -81,8 +81,8 @@ class TtlTest : public testing::Test {
// Open with TestFilter compaction filter
void OpenTtlWithTestCompaction(int32_t ttl) {
options_.compaction_filter_factory =
std::shared_ptr<CompactionFilterFactory>(
new TestFilterFactory(kSampleSize_, kNewValue_));
std::shared_ptr<CompactionFilterFactory>(
new TestFilterFactory(kSampleSize_, kNewValue_));
OpenTtl(ttl);
}
@ -121,7 +121,7 @@ class TtlTest : public testing::Test {
if (i % 10 == 0) {
digits_in_i++;
}
for(int j = digits_in_i; j < digits; j++) {
for (int j = digits_in_i; j < digits; j++) {
key.append("0");
value.append("0");
}
@ -210,16 +210,19 @@ class TtlTest : public testing::Test {
static ReadOptions ropts;
bool value_found;
std::string val;
for(auto &kv : kvmap_) {
for (auto& kv : kvmap_) {
bool ret = db_ttl_->KeyMayExist(ropts, kv.first, &val, &value_found);
if (ret == false || value_found == false) {
fprintf(stderr, "KeyMayExist could not find key=%s in the database but"
" should have\n", kv.first.c_str());
fprintf(stderr,
"KeyMayExist could not find key=%s in the database but"
" should have\n",
kv.first.c_str());
FAIL();
} else if (val.compare(kv.second) != 0) {
fprintf(stderr, " value for key=%s present in database is %s but"
" should be %s\n", kv.first.c_str(), val.c_str(),
kv.second.c_str());
fprintf(stderr,
" value for key=%s present in database is %s but"
" should be %s\n",
kv.first.c_str(), val.c_str(), kv.second.c_str());
FAIL();
}
}
@ -263,17 +266,19 @@ class TtlTest : public testing::Test {
}
FAIL();
} else if (s.ok()) {
if (test_compaction_change && v.compare(kNewValue_) != 0) {
fprintf(stderr, " value for key=%s present in database is %s but "
" should be %s\n", kv_it_->first.c_str(), v.c_str(),
kNewValue_.c_str());
FAIL();
} else if (!test_compaction_change && v.compare(kv_it_->second) !=0) {
fprintf(stderr, " value for key=%s present in database is %s but "
" should be %s\n", kv_it_->first.c_str(), v.c_str(),
kv_it_->second.c_str());
FAIL();
}
if (test_compaction_change && v.compare(kNewValue_) != 0) {
fprintf(stderr,
" value for key=%s present in database is %s but "
" should be %s\n",
kv_it_->first.c_str(), v.c_str(), kNewValue_.c_str());
FAIL();
} else if (!test_compaction_change && v.compare(kv_it_->second) != 0) {
fprintf(stderr,
" value for key=%s present in database is %s but "
" should be %s\n",
kv_it_->first.c_str(), v.c_str(), kv_it_->second.c_str());
FAIL();
}
}
}
}
@ -299,7 +304,7 @@ class TtlTest : public testing::Test {
env_->Sleep(slp);
ASSERT_OK(ManualCompact());
static ReadOptions ropts;
Iterator *dbiter = db_ttl_->NewIterator(ropts);
Iterator* dbiter = db_ttl_->NewIterator(ropts);
kv_it_ = kvmap_.begin();
advance(kv_it_, st_pos);
@ -329,9 +334,7 @@ class TtlTest : public testing::Test {
class TestFilter : public CompactionFilter {
public:
TestFilter(const int64_t kSampleSize, const std::string& kNewValue)
: kSampleSize_(kSampleSize),
kNewValue_(kNewValue) {
}
: kSampleSize_(kSampleSize), kNewValue_(kNewValue) {}
// Works on keys of the form "key<number>"
// Drops key if number at the end of key is in [0, kSampleSize_/3),
@ -355,7 +358,7 @@ class TtlTest : public testing::Test {
#endif
} else {
return false; // Keep keys not matching the format "key<NUMBER>"
return false; // Keep keys not matching the format "key<NUMBER>"
}
int64_t partition = kSampleSize_ / 3;
@ -378,26 +381,23 @@ class TtlTest : public testing::Test {
};
class TestFilterFactory : public CompactionFilterFactory {
public:
TestFilterFactory(const int64_t kSampleSize, const std::string& kNewValue)
: kSampleSize_(kSampleSize),
kNewValue_(kNewValue) {
}
public:
TestFilterFactory(const int64_t kSampleSize, const std::string& kNewValue)
: kSampleSize_(kSampleSize), kNewValue_(kNewValue) {}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override {
return std::unique_ptr<CompactionFilter>(
new TestFilter(kSampleSize_, kNewValue_));
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override {
return std::unique_ptr<CompactionFilter>(
new TestFilter(kSampleSize_, kNewValue_));
}
const char* Name() const override { return "TestFilterFactory"; }
const char* Name() const override { return "TestFilterFactory"; }
private:
const int64_t kSampleSize_;
const std::string kNewValue_;
private:
const int64_t kSampleSize_;
const std::string kNewValue_;
};
// Choose carefully so that Put, Gets & Compaction complete in 1 second buffer
static const int64_t kSampleSize_ = 100;
std::string dbname_;
@ -410,7 +410,7 @@ class TtlTest : public testing::Test {
KVMap::iterator kv_it_;
const std::string kNewValue_ = "new_value";
std::unique_ptr<CompactionFilter> test_comp_filter_;
}; // class TtlTest
}; // class TtlTest
// If TTL is non positive or not provided, the behaviour is TTL = infinity
// This test opens the db 3 times with such default behavior and inserts a
@ -422,18 +422,18 @@ TEST_F(TtlTest, NoEffect) {
int64_t boundary2 = 2 * boundary1;
OpenTtl();
PutValues(0, boundary1); //T=0: Set1 never deleted
SleepCompactCheck(1, 0, boundary1); //T=1: Set1 still there
PutValues(0, boundary1); // T=0: Set1 never deleted
SleepCompactCheck(1, 0, boundary1); // T=1: Set1 still there
CloseTtl();
OpenTtl(0);
PutValues(boundary1, boundary2 - boundary1); //T=1: Set2 never deleted
SleepCompactCheck(1, 0, boundary2); //T=2: Sets1 & 2 still there
PutValues(boundary1, boundary2 - boundary1); // T=1: Set2 never deleted
SleepCompactCheck(1, 0, boundary2); // T=2: Sets1 & 2 still there
CloseTtl();
OpenTtl(-1);
PutValues(boundary2, kSampleSize_ - boundary2); //T=3: Set3 never deleted
SleepCompactCheck(1, 0, kSampleSize_, true); //T=4: Sets 1,2,3 still there
PutValues(boundary2, kSampleSize_ - boundary2); // T=3: Set3 never deleted
SleepCompactCheck(1, 0, kSampleSize_, true); // T=4: Sets 1,2,3 still there
CloseTtl();
}
@ -464,9 +464,10 @@ TEST_F(TtlTest, DestructWithoutClose) {
TEST_F(TtlTest, PresentDuringTTL) {
MakeKVMap(kSampleSize_);
OpenTtl(2); // T=0:Open the db with ttl = 2
PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2
SleepCompactCheck(1, 0, kSampleSize_, true); // T=1:Set1 should still be there
OpenTtl(2); // T=0:Open the db with ttl = 2
PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2
SleepCompactCheck(1, 0, kSampleSize_,
true); // T=1:Set1 should still be there
CloseTtl();
}
@ -474,9 +475,9 @@ TEST_F(TtlTest, PresentDuringTTL) {
TEST_F(TtlTest, AbsentAfterTTL) {
MakeKVMap(kSampleSize_);
OpenTtl(1); // T=0:Open the db with ttl = 2
PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2
SleepCompactCheck(2, 0, kSampleSize_, false); // T=2:Set1 should not be there
OpenTtl(1); // T=0:Open the db with ttl = 2
PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2
SleepCompactCheck(2, 0, kSampleSize_, false); // T=2:Set1 should not be there
CloseTtl();
}
@ -486,10 +487,10 @@ TEST_F(TtlTest, ResetTimestamp) {
MakeKVMap(kSampleSize_);
OpenTtl(3);
PutValues(0, kSampleSize_); // T=0: Insert Set1. Delete at t=3
env_->Sleep(2); // T=2
PutValues(0, kSampleSize_); // T=2: Insert Set1. Delete at t=5
SleepCompactCheck(2, 0, kSampleSize_); // T=4: Set1 should still be there
PutValues(0, kSampleSize_); // T=0: Insert Set1. Delete at t=3
env_->Sleep(2); // T=2
PutValues(0, kSampleSize_); // T=2: Insert Set1. Delete at t=5
SleepCompactCheck(2, 0, kSampleSize_); // T=4: Set1 should still be there
CloseTtl();
}
@ -508,8 +509,8 @@ TEST_F(TtlTest, IterAbsentAfterTTL) {
MakeKVMap(kSampleSize_);
OpenTtl(1);
PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1
SleepCompactCheckIter(2, 0, kSampleSize_, false); // T=2: Should not be there
PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1
SleepCompactCheckIter(2, 0, kSampleSize_, false); // T=2: Should not be there
CloseTtl();
}
@ -519,11 +520,11 @@ TEST_F(TtlTest, MultiOpenSamePresent) {
MakeKVMap(kSampleSize_);
OpenTtl(2);
PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=2
PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=2
CloseTtl();
OpenTtl(2); // T=0. Delete at t=2
SleepCompactCheck(1, 0, kSampleSize_); // T=1: Set should be there
OpenTtl(2); // T=0. Delete at t=2
SleepCompactCheck(1, 0, kSampleSize_); // T=1: Set should be there
CloseTtl();
}
@ -533,11 +534,11 @@ TEST_F(TtlTest, MultiOpenSameAbsent) {
MakeKVMap(kSampleSize_);
OpenTtl(1);
PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1
PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1
CloseTtl();
OpenTtl(1); // T=0.Delete at t=1
SleepCompactCheck(2, 0, kSampleSize_, false); // T=2: Set should not be there
OpenTtl(1); // T=0.Delete at t=1
SleepCompactCheck(2, 0, kSampleSize_, false); // T=2: Set should not be there
CloseTtl();
}
@ -546,11 +547,11 @@ TEST_F(TtlTest, MultiOpenDifferent) {
MakeKVMap(kSampleSize_);
OpenTtl(1);
PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1
PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1
CloseTtl();
OpenTtl(3); // T=0: Set deleted at t=3
SleepCompactCheck(2, 0, kSampleSize_); // T=2: Set should be there
OpenTtl(3); // T=0: Set deleted at t=3
SleepCompactCheck(2, 0, kSampleSize_); // T=2: Set should be there
CloseTtl();
}
@ -558,8 +559,8 @@ TEST_F(TtlTest, MultiOpenDifferent) {
TEST_F(TtlTest, ReadOnlyPresentForever) {
MakeKVMap(kSampleSize_);
OpenTtl(1); // T=0:Open the db normally
PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=1
OpenTtl(1); // T=0:Open the db normally
PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=1
CloseTtl();
OpenReadOnlyTtl(1);
@ -597,17 +598,17 @@ TEST_F(TtlTest, CompactionFilter) {
MakeKVMap(kSampleSize_);
OpenTtlWithTestCompaction(1);
PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=1
PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=1
// T=2: TTL logic takes precedence over TestFilter:-Set1 should not be there
SleepCompactCheck(2, 0, kSampleSize_, false);
CloseTtl();
OpenTtlWithTestCompaction(3);
PutValues(0, kSampleSize_); // T=0:Insert Set1.
PutValues(0, kSampleSize_); // T=0:Insert Set1.
int64_t partition = kSampleSize_ / 3;
SleepCompactCheck(1, 0, partition, false); // Part dropped
SleepCompactCheck(0, partition, partition); // Part kept
SleepCompactCheck(0, 2 * partition, partition, true, true); // Part changed
SleepCompactCheck(1, 0, partition, false); // Part dropped
SleepCompactCheck(0, partition, partition); // Part kept
SleepCompactCheck(0, 2 * partition, partition, true, true); // Part changed
CloseTtl();
}
@ -696,10 +697,10 @@ TEST_F(TtlTest, ColumnFamiliesTest) {
TEST_F(TtlTest, ChangeTtlOnOpenDb) {
MakeKVMap(kSampleSize_);
OpenTtl(1); // T=0:Open the db with ttl = 2
OpenTtl(1); // T=0:Open the db with ttl = 2
SetTtl(3);
PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2
SleepCompactCheck(2, 0, kSampleSize_, true); // T=2:Set1 should be there
PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2
SleepCompactCheck(2, 0, kSampleSize_, true); // T=2:Set1 should be there
CloseTtl();
}

View File

@ -163,7 +163,7 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
auto* index_entry =
new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
key.data() - wb_data.data(), key.size());
key.data() - wb_data.data(), key.size());
skip_list.Insert(index_entry);
}
@ -207,8 +207,8 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() {
// set offset of current entry for call to AddNewEntry()
last_entry_offset = input.data() - write_batch.Data().data();
s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
&value, &blob, &xid);
s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, &value,
&blob, &xid);
if (!s.ok()) {
break;
}

View File

@ -237,7 +237,7 @@ void AssertIterEqual(WBWIIteratorImpl* wbwii,
}
ASSERT_FALSE(wbwii->Valid());
}
} // namespace anonymous
} // namespace
class WBWIBaseTest : public testing::Test {
public:
@ -512,14 +512,10 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
TEST_F(WBWIKeepTest, TestValueAsSecondaryIndex) {
Entry entries[] = {
{"aaa", "0005", kPutRecord},
{"b", "0002", kPutRecord},
{"cdd", "0002", kMergeRecord},
{"aab", "00001", kPutRecord},
{"cc", "00005", kPutRecord},
{"cdd", "0002", kPutRecord},
{"aab", "0003", kPutRecord},
{"cc", "00005", kDeleteRecord},
{"aaa", "0005", kPutRecord}, {"b", "0002", kPutRecord},
{"cdd", "0002", kMergeRecord}, {"aab", "00001", kPutRecord},
{"cc", "00005", kPutRecord}, {"cdd", "0002", kPutRecord},
{"aab", "0003", kPutRecord}, {"cc", "00005", kDeleteRecord},
};
std::vector<Entry> entries_list(entries, entries + 8);
@ -531,14 +527,10 @@ TEST_F(WBWIKeepTest, TestValueAsSecondaryIndex) {
batch_->Clear();
Entry new_entries[] = {
{"aaa", "0005", kPutRecord},
{"e", "0002", kPutRecord},
{"add", "0002", kMergeRecord},
{"aab", "00001", kPutRecord},
{"zz", "00005", kPutRecord},
{"add", "0002", kPutRecord},
{"aab", "0003", kPutRecord},
{"zz", "00005", kDeleteRecord},
{"aaa", "0005", kPutRecord}, {"e", "0002", kPutRecord},
{"add", "0002", kMergeRecord}, {"aab", "00001", kPutRecord},
{"zz", "00005", kPutRecord}, {"add", "0002", kPutRecord},
{"aab", "0003", kPutRecord}, {"zz", "00005", kDeleteRecord},
};
entries_list = std::vector<Entry>(new_entries, new_entries + 8);