Merge remote-tracking branch 'origin' into ms_win_port

This commit is contained in:
Dmitri Smirnov 2015-07-02 11:34:22 -07:00
commit 9dbde7277c
27 changed files with 890 additions and 193 deletions

View File

@ -21,6 +21,7 @@
* CompactRange() will now skip bottommost level compaction for level based compaction if there is no compaction filter, bottommost_level_compaction is introduced in CompactRangeOptions to control when it's possbile to skip bottommost level compaction. This mean that if you want the compaction to produce a single file you need to set bottommost_level_compaction to BottommostLevelCompaction::kForce.
* Add Cache.GetPinnedUsage() to get the size of memory occupied by entries that are in use by the system.
* DB:Open() will fail if the compression specified in Options is not linked with the binary. If you see this failure, recompile RocksDB with compression libraries present on your system. Also, previously our default compression was snappy. This behavior is now changed. Now, the default compression is snappy only if it's available on the system. If it isn't we change the default to kNoCompression.
* We changed how we account for memory used in block cache. Previously, we only counted the sum of block sizes currently present in block cache. Now, we count the actual memory usage of the blocks. For example, a block of size 4.5KB will use 8KB memory with jemalloc. This might decrease your memory usage and possibly decrease performance. Increase block cache size if you see this happening after an upgrade.
## 3.11.0 (5/19/2015)
### New Features

View File

@ -294,6 +294,18 @@ EOF
JAVA_LDFLAGS="$JAVA_LDFLAGS -ltcmalloc"
fi
fi
# Test whether malloc_usable_size is available
$CXX $CFLAGS -x c++ - -o /dev/null 2>/dev/null <<EOF
#include <malloc.h>
int main() {
size_t res = malloc_usable_size(0);
return 0;
}
EOF
if [ "$?" = 0 ]; then
COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_MALLOC_USABLE_SIZE"
fi
fi
# TODO(tec): Fix -Wshorten-64-to-32 errors on FreeBSD and enable the warning.

55
build_tools/fb_compile_mongo.sh Executable file
View File

@ -0,0 +1,55 @@
#!/bin/sh
# fail early
set -e
if test -z $ROCKSDB_PATH; then
ROCKSDB_PATH=~/rocksdb
fi
source $ROCKSDB_PATH/build_tools/fbcode_config4.8.1.sh
EXTRA_LDFLAGS=""
if test -z $ALLOC; then
# default
ALLOC=tcmalloc
elif [[ $ALLOC == "jemalloc" ]]; then
ALLOC=system
EXTRA_LDFLAGS+=" -Wl,--whole-archive $JEMALLOC_LIB -Wl,--no-whole-archive"
fi
# we need to force mongo to use static library, not shared
STATIC_LIB_DEP_DIR='build/static_library_dependencies'
test -d $STATIC_LIB_DEP_DIR || mkdir $STATIC_LIB_DEP_DIR
test -h $STATIC_LIB_DEP_DIR/`basename $SNAPPY_LIBS` || ln -s $SNAPPY_LIBS $STATIC_LIB_DEP_DIR
test -h $STATIC_LIB_DEP_DIR/`basename $LZ4_LIBS` || ln -s $LZ4_LIBS $STATIC_LIB_DEP_DIR
EXTRA_LDFLAGS+=" -L $STATIC_LIB_DEP_DIR"
set -x
EXTRA_CMD=""
if ! test -e version.json; then
# this is Mongo 3.0
EXTRA_CMD="--rocksdb \
--variant-dir=linux2/norm
--cxx=${CXX} \
--cc=${CC} \
--use-system-zlib" # add this line back to normal code path
# when https://jira.mongodb.org/browse/SERVER-19123 is resolved
fi
scons \
LINKFLAGS="$EXTRA_LDFLAGS $EXEC_LDFLAGS $PLATFORM_LDFLAGS" \
CCFLAGS="$CXXFLAGS -L $STATIC_LIB_DEP_DIR" \
LIBS="lz4 gcc stdc++" \
LIBPATH="$ROCKSDB_PATH" \
CPPPATH="$ROCKSDB_PATH/include" \
-j32 \
--allocator=$ALLOC \
--nostrip \
--opt=on \
--disable-minimum-compiler-version-enforcement \
--use-system-snappy \
--disable-warnings-as-errors \
$EXTRA_CMD $*

View File

@ -18,19 +18,6 @@ GLIBC_REV=7397bed99280af5d9543439cdb7d018af7542720
GLIBC_INCLUDE="/mnt/gvfs/third-party2/glibc/$GLIBC_REV/2.20/gcc-4.9-glibc-2.20/99df8fc/include"
GLIBC_LIBS=" -L /mnt/gvfs/third-party2/glibc/$GLIBC_REV/2.20/gcc-4.9-glibc-2.20/99df8fc/lib"
# snappy and zlib depend are bundled with MongoDB so we wan't to pick up the bundled headers when
# building for it and disable block compressors supported by RocksDB but not used by MongoDB.
if [[ -n $ROCKSDB_FOR_MONGO ]]; then
MONGO_SRC="$ROCKSDB_FOR_MONGO/src/third_party"
SNAPPY_INCLUDE=" -I $MONGO_SRC/snappy-1.1.2"
CFLAGS+=" -DSNAPPY"
ZLIB_INCLUDE=" -I $MONGO_SRC/zlib-1.2.8"
CFLAGS+=" -DZLIB"
else
SNAPPY_INCLUDE=" -I /mnt/gvfs/third-party2/snappy/b0f269b3ca47770121aa159b99e1d8d2ab260e1f/1.0.3/gcc-4.9-glibc-2.20/c32916f/include/"
if test -z $PIC_BUILD; then
@ -56,7 +43,6 @@ if test -z $PIC_BUILD; then
LZ4_LIBS=" /mnt/gvfs/third-party2/lz4/79d2943e2dd7208a3e0b06cf95e9f85f05fe9e1b/r124/gcc-4.9-glibc-2.20/4230243/lib/liblz4.a"
CFLAGS+=" -DLZ4"
fi
fi
# location of gflags headers and libraries
GFLAGS_INCLUDE=" -I /mnt/gvfs/third-party2/gflags/0fa60e2b88de3e469db6c482d6e6dac72f5d65f9/1.6/gcc-4.9-glibc-2.20/4230243/include/"
@ -125,7 +111,7 @@ else
fi
CFLAGS+=" $DEPS_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
CXXFLAGS+=" $CFLAGS"
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $GFLAGS_LIBS $NUMA_LIB"

View File

@ -87,13 +87,14 @@ else
fi
CFLAGS+=" $DEPS_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
CFLAGS+=" -DSNAPPY -DGFLAGS=google -DZLIB -DBZIP2 -DLZ4 -DNUMA"
CXXFLAGS+=" $CFLAGS"
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $GFLAGS_LIBS $NUMA_LIB"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.8.1-glibc-2.17/lib/ld.so"
EXEC_LDFLAGS+=" $LIBUNWIND"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.8.1-glibc-2.17/lib"
PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"

View File

@ -98,10 +98,12 @@ class ColumnFamilyTest : public testing::Test {
&db_);
}
#ifndef ROCKSDB_LITE // ReadOnlyDB is not supported
void AssertOpenReadOnly(std::vector<std::string> cf,
std::vector<ColumnFamilyOptions> options = {}) {
ASSERT_OK(OpenReadOnly(cf, options));
}
#endif // !ROCKSDB_LITE
void Open(std::vector<std::string> cf,
@ -186,10 +188,28 @@ class ColumnFamilyTest : public testing::Test {
}
void WaitForFlush(int cf) {
#ifndef ROCKSDB_LITE // TEST functions are not supported in lite
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
#endif // !ROCKSDB_LITE
}
void WaitForCompaction() { ASSERT_OK(dbfull()->TEST_WaitForCompact()); }
void WaitForCompaction() {
#ifndef ROCKSDB_LITE // TEST functions are not supported in lite
ASSERT_OK(dbfull()->TEST_WaitForCompact());
#endif // !ROCKSDB_LITE
}
uint64_t MaxTotalInMemoryState() {
#ifndef ROCKSDB_LITE
return dbfull()->TEST_MaxTotalInMemoryState();
#else
return 0;
#endif // !ROCKSDB_LITE
}
void AssertMaxTotalInMemoryState(uint64_t value) {
ASSERT_EQ(value, MaxTotalInMemoryState());
}
Status Put(int cf, const std::string& key, const std::string& value) {
return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value));
@ -229,6 +249,7 @@ class ColumnFamilyTest : public testing::Test {
"rocksdb.num-files-at-level" + ToString(level));
}
#ifndef ROCKSDB_LITE
// Return spread of files per level
std::string FilesPerLevel(int cf) {
std::string result;
@ -245,12 +266,27 @@ class ColumnFamilyTest : public testing::Test {
result.resize(last_non_zero_offset);
return result;
}
#endif
void AssertFilesPerLevel(const std::string& value, int cf) {
#ifndef ROCKSDB_LITE
ASSERT_EQ(value, FilesPerLevel(cf));
#endif
}
#ifndef ROCKSDB_LITE // GetLiveFilesMetaData is not supported
int CountLiveFiles() {
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
return static_cast<int>(metadata.size());
}
#endif // !ROCKSDB_LITE
void AssertCountLiveFiles(int expected_value) {
#ifndef ROCKSDB_LITE
ASSERT_EQ(expected_value, CountLiveFiles());
#endif
}
// Do n memtable flushes, each of which produces an sstable
// covering the range [small,large].
@ -263,6 +299,7 @@ class ColumnFamilyTest : public testing::Test {
}
}
#ifndef ROCKSDB_LITE // GetSortedWalFiles is not supported
int CountLiveLogFiles() {
int micros_wait_for_log_deletion = 20000;
env_->SleepForMicroseconds(micros_wait_for_log_deletion);
@ -289,15 +326,25 @@ class ColumnFamilyTest : public testing::Test {
}
}
return ret;
return 0;
}
#endif // !ROCKSDB_LITE
void AssertCountLiveLogFiles(int value) {
#ifndef ROCKSDB_LITE // GetSortedWalFiles is not supported
ASSERT_EQ(value, CountLiveLogFiles());
#endif // !ROCKSDB_LITE
}
void AssertNumberOfImmutableMemtables(std::vector<int> num_per_cf) {
assert(num_per_cf.size() == handles_.size());
#ifndef ROCKSDB_LITE // GetProperty is not supported in lite
for (size_t i = 0; i < num_per_cf.size(); ++i) {
ASSERT_EQ(num_per_cf[i], GetProperty(static_cast<int>(i),
"rocksdb.num-immutable-mem-table"));
}
#endif // !ROCKSDB_LITE
}
void CopyFile(const std::string& source, const std::string& destination,
@ -410,10 +457,10 @@ TEST_F(ColumnFamilyTest, DropTest) {
}
ASSERT_EQ("bar1", Get(1, "1"));
ASSERT_EQ(CountLiveFiles(), 1);
AssertCountLiveFiles(1);
DropColumnFamilies({1});
// make sure that all files are deleted when we drop the column family
ASSERT_EQ(CountLiveFiles(), 0);
AssertCountLiveFiles(0);
Destroy();
}
}
@ -554,10 +601,9 @@ TEST_F(ColumnFamilyTest, FlushTest) {
for (int i = 0; i < 3; ++i) {
uint64_t max_total_in_memory_state =
dbfull()->TEST_MaxTotalInMemoryState();
MaxTotalInMemoryState();
Flush(i);
ASSERT_EQ(dbfull()->TEST_MaxTotalInMemoryState(),
max_total_in_memory_state);
AssertMaxTotalInMemoryState(max_total_in_memory_state);
}
ASSERT_OK(Put(1, "foofoo", "bar"));
ASSERT_OK(Put(0, "foofoo", "bar"));
@ -592,7 +638,7 @@ TEST_F(ColumnFamilyTest, LogDeletionTest) {
// Each bracket is one log file. if number is in (), it means
// we don't need it anymore (it's been flushed)
// []
ASSERT_EQ(CountLiveLogFiles(), 0);
AssertCountLiveLogFiles(0);
PutRandomData(0, 1, 100);
// [0]
PutRandomData(1, 1, 100);
@ -600,53 +646,53 @@ TEST_F(ColumnFamilyTest, LogDeletionTest) {
PutRandomData(1, 1000, 100);
WaitForFlush(1);
// [0, (1)] [1]
ASSERT_EQ(CountLiveLogFiles(), 2);
AssertCountLiveLogFiles(2);
PutRandomData(0, 1, 100);
// [0, (1)] [0, 1]
ASSERT_EQ(CountLiveLogFiles(), 2);
AssertCountLiveLogFiles(2);
PutRandomData(2, 1, 100);
// [0, (1)] [0, 1, 2]
PutRandomData(2, 1000, 100);
WaitForFlush(2);
// [0, (1)] [0, 1, (2)] [2]
ASSERT_EQ(CountLiveLogFiles(), 3);
AssertCountLiveLogFiles(3);
PutRandomData(2, 1000, 100);
WaitForFlush(2);
// [0, (1)] [0, 1, (2)] [(2)] [2]
ASSERT_EQ(CountLiveLogFiles(), 4);
AssertCountLiveLogFiles(4);
PutRandomData(3, 1, 100);
// [0, (1)] [0, 1, (2)] [(2)] [2, 3]
PutRandomData(1, 1, 100);
// [0, (1)] [0, 1, (2)] [(2)] [1, 2, 3]
ASSERT_EQ(CountLiveLogFiles(), 4);
AssertCountLiveLogFiles(4);
PutRandomData(1, 1000, 100);
WaitForFlush(1);
// [0, (1)] [0, (1), (2)] [(2)] [(1), 2, 3] [1]
ASSERT_EQ(CountLiveLogFiles(), 5);
AssertCountLiveLogFiles(5);
PutRandomData(0, 1000, 100);
WaitForFlush(0);
// [(0), (1)] [(0), (1), (2)] [(2)] [(1), 2, 3] [1, (0)] [0]
// delete obsolete logs -->
// [(1), 2, 3] [1, (0)] [0]
ASSERT_EQ(CountLiveLogFiles(), 3);
AssertCountLiveLogFiles(3);
PutRandomData(0, 1000, 100);
WaitForFlush(0);
// [(1), 2, 3] [1, (0)], [(0)] [0]
ASSERT_EQ(CountLiveLogFiles(), 4);
AssertCountLiveLogFiles(4);
PutRandomData(1, 1000, 100);
WaitForFlush(1);
// [(1), 2, 3] [(1), (0)] [(0)] [0, (1)] [1]
ASSERT_EQ(CountLiveLogFiles(), 5);
AssertCountLiveLogFiles(5);
PutRandomData(2, 1000, 100);
WaitForFlush(2);
// [(1), (2), 3] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2]
ASSERT_EQ(CountLiveLogFiles(), 6);
AssertCountLiveLogFiles(6);
PutRandomData(3, 1000, 100);
WaitForFlush(3);
// [(1), (2), (3)] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2, (3)] [3]
// delete obsolete logs -->
// [0, (1)] [1, (2)], [2, (3)] [3]
ASSERT_EQ(CountLiveLogFiles(), 4);
AssertCountLiveLogFiles(4);
Close();
}
@ -681,72 +727,73 @@ TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) {
PutRandomData(0, 100, 1000);
WaitForFlush(0);
AssertNumberOfImmutableMemtables({0, 0, 0, 0});
ASSERT_EQ(CountLiveLogFiles(), 1);
AssertCountLiveLogFiles(1);
PutRandomData(1, 200, 1000);
env_->SleepForMicroseconds(micros_wait_for_flush);
AssertNumberOfImmutableMemtables({0, 1, 0, 0});
ASSERT_EQ(CountLiveLogFiles(), 2);
AssertCountLiveLogFiles(2);
PutRandomData(2, 1000, 1000);
env_->SleepForMicroseconds(micros_wait_for_flush);
AssertNumberOfImmutableMemtables({0, 1, 1, 0});
ASSERT_EQ(CountLiveLogFiles(), 3);
AssertCountLiveLogFiles(3);
PutRandomData(2, 1000, 1000);
env_->SleepForMicroseconds(micros_wait_for_flush);
AssertNumberOfImmutableMemtables({0, 1, 2, 0});
ASSERT_EQ(CountLiveLogFiles(), 4);
AssertCountLiveLogFiles(4);
PutRandomData(3, 90, 1000);
env_->SleepForMicroseconds(micros_wait_for_flush);
AssertNumberOfImmutableMemtables({0, 1, 2, 1});
ASSERT_EQ(CountLiveLogFiles(), 5);
AssertCountLiveLogFiles(5);
PutRandomData(3, 90, 1000);
env_->SleepForMicroseconds(micros_wait_for_flush);
AssertNumberOfImmutableMemtables({0, 1, 2, 2});
ASSERT_EQ(CountLiveLogFiles(), 6);
AssertCountLiveLogFiles(6);
PutRandomData(3, 90, 1000);
env_->SleepForMicroseconds(micros_wait_for_flush);
AssertNumberOfImmutableMemtables({0, 1, 2, 3});
ASSERT_EQ(CountLiveLogFiles(), 7);
AssertCountLiveLogFiles(7);
PutRandomData(0, 100, 1000);
WaitForFlush(0);
AssertNumberOfImmutableMemtables({0, 1, 2, 3});
ASSERT_EQ(CountLiveLogFiles(), 8);
AssertCountLiveLogFiles(8);
PutRandomData(2, 100, 10000);
WaitForFlush(2);
AssertNumberOfImmutableMemtables({0, 1, 0, 3});
ASSERT_EQ(CountLiveLogFiles(), 9);
AssertCountLiveLogFiles(9);
PutRandomData(3, 90, 1000);
WaitForFlush(3);
AssertNumberOfImmutableMemtables({0, 1, 0, 0});
ASSERT_EQ(CountLiveLogFiles(), 10);
AssertCountLiveLogFiles(10);
PutRandomData(3, 90, 1000);
env_->SleepForMicroseconds(micros_wait_for_flush);
AssertNumberOfImmutableMemtables({0, 1, 0, 1});
ASSERT_EQ(CountLiveLogFiles(), 11);
AssertCountLiveLogFiles(11);
PutRandomData(1, 200, 1000);
WaitForFlush(1);
AssertNumberOfImmutableMemtables({0, 0, 0, 1});
ASSERT_EQ(CountLiveLogFiles(), 5);
AssertCountLiveLogFiles(5);
PutRandomData(3, 240, 1000);
WaitForFlush(3);
PutRandomData(3, 300, 1000);
WaitForFlush(3);
AssertNumberOfImmutableMemtables({0, 0, 0, 0});
ASSERT_EQ(CountLiveLogFiles(), 12);
AssertCountLiveLogFiles(12);
PutRandomData(0, 100, 1000);
WaitForFlush(0);
AssertNumberOfImmutableMemtables({0, 0, 0, 0});
ASSERT_EQ(CountLiveLogFiles(), 12);
AssertCountLiveLogFiles(12);
PutRandomData(2, 3*100, 10000);
WaitForFlush(2);
AssertNumberOfImmutableMemtables({0, 0, 0, 0});
ASSERT_EQ(CountLiveLogFiles(), 12);
AssertCountLiveLogFiles(12);
PutRandomData(1, 2*200, 1000);
WaitForFlush(1);
AssertNumberOfImmutableMemtables({0, 0, 0, 0});
ASSERT_EQ(CountLiveLogFiles(), 7);
AssertCountLiveLogFiles(7);
Close();
}
#ifndef ROCKSDB_LITE // Cuckoo is not supported in lite
TEST_F(ColumnFamilyTest, MemtableNotSupportSnapshot) {
Open();
auto* s1 = dbfull()->GetSnapshot();
@ -767,6 +814,7 @@ TEST_F(ColumnFamilyTest, MemtableNotSupportSnapshot) {
ASSERT_TRUE(s3 == nullptr);
Close();
}
#endif // !ROCKSDB_LITE
TEST_F(ColumnFamilyTest, DifferentMergeOperators) {
Open();
@ -815,6 +863,7 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) {
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleUniversal;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
@ -832,14 +881,14 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) {
for (int i = 0; i < one.level0_file_num_compaction_trigger - 1; ++i) {
PutRandomData(1, 11, 10000);
WaitForFlush(1);
ASSERT_EQ(ToString(i + 1), FilesPerLevel(1));
AssertFilesPerLevel(ToString(i + 1), 1);
}
// SETUP column family "two" -- level style with 4 levels
for (int i = 0; i < two.level0_file_num_compaction_trigger - 1; ++i) {
PutRandomData(2, 15, 10000);
WaitForFlush(2);
ASSERT_EQ(ToString(i + 1), FilesPerLevel(2));
AssertFilesPerLevel(ToString(i + 1), 2);
}
// TRIGGER compaction "one"
@ -852,16 +901,17 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) {
WaitForCompaction();
// VERIFY compaction "one"
ASSERT_EQ("1", FilesPerLevel(1));
AssertFilesPerLevel("1", 1);
// VERIFY compaction "two"
ASSERT_EQ("0,1", FilesPerLevel(2));
AssertFilesPerLevel("0,1", 2);
CompactAll(2);
ASSERT_EQ("0,1", FilesPerLevel(2));
AssertFilesPerLevel("0,1", 2);
Close();
}
#ifndef ROCKSDB_LITE // Tailing interator not supported
namespace {
std::string IterStatus(Iterator* iter) {
std::string result;
@ -918,7 +968,9 @@ TEST_F(ColumnFamilyTest, NewIteratorsTest) {
Destroy();
}
}
#endif // !ROCKSDB_LITE
#ifndef ROCKSDB_LITE // ReadOnlyDB is not supported
TEST_F(ColumnFamilyTest, ReadOnlyDBTest) {
Open();
CreateColumnFamiliesAndReopen({"one", "two", "three", "four"});
@ -968,6 +1020,7 @@ TEST_F(ColumnFamilyTest, ReadOnlyDBTest) {
s = OpenReadOnly({"one", "four"});
ASSERT_TRUE(!s.ok());
}
#endif // !ROCKSDB_LITE
TEST_F(ColumnFamilyTest, DontRollEmptyLogs) {
Open();
@ -983,7 +1036,7 @@ TEST_F(ColumnFamilyTest, DontRollEmptyLogs) {
}
for (int i = 0; i < 4; ++i) {
dbfull()->TEST_WaitForFlushMemTable(handles_[i]);
WaitForFlush(i);
}
int total_new_writable_files =
env_->GetNumberOfNewWritableFileCalls() - num_writable_file_start;
@ -1007,7 +1060,8 @@ TEST_F(ColumnFamilyTest, FlushStaleColumnFamilies) {
for (int i = 0; i < 2; ++i) {
PutRandomData(0, 100, 1000); // flush
WaitForFlush(0);
ASSERT_EQ(i + 1, CountLiveFiles());
AssertCountLiveFiles(i + 1);
}
// third flush. now, CF [two] should be detected as stale and flushed
// column family 1 should not be flushed since it's empty
@ -1016,7 +1070,7 @@ TEST_F(ColumnFamilyTest, FlushStaleColumnFamilies) {
WaitForFlush(2);
// 3 files for default column families, 1 file for column family [two], zero
// files for column family [one], because it's empty
ASSERT_EQ(4, CountLiveFiles());
AssertCountLiveFiles(4);
Close();
}

View File

@ -270,18 +270,18 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
LogFlush(db_options_.info_log);
}
// Will only lock the mutex_ and wait for completion if wait is true
// Will lock the mutex_, will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) {
InstrumentedMutexLock l(&mutex_);
shutting_down_.store(true, std::memory_order_release);
bg_cv_.SignalAll();
if (!wait) {
return;
}
// Wait for background work to finish
mutex_.Lock();
while (bg_compaction_scheduled_ || bg_flush_scheduled_) {
bg_cv_.Wait();
}
mutex_.Unlock();
}
DBImpl::~DBImpl() {
@ -299,12 +299,11 @@ DBImpl::~DBImpl() {
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
}
// CancelAllBackgroundWork called with false means we just set the
// shutdown marker, while holding the mutex_ here. After which we
// do a variant of the waiting after we release the lock and unschedule work
mutex_.Unlock();
// CancelAllBackgroundWork called with false means we just set the shutdown
// marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
CancelAllBackgroundWork(false);
mutex_.Unlock();
int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
mutex_.Lock();
@ -2036,6 +2035,9 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
// Wait until the compaction completes
InstrumentedMutexLock l(&mutex_);
while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
bg_cv_.Wait();
}
if (!bg_error_.ok()) {

View File

@ -350,6 +350,9 @@ void DBIter::MergeValuesNewToOld() {
void DBIter::Prev() {
assert(valid_);
if (direction_ == kForward) {
if (!iter_->Valid()) {
iter_->SeekToLast();
}
FindPrevUserKey();
direction_ = kReverse;
}
@ -553,7 +556,7 @@ void DBIter::FindNextUserKey() {
ParsedInternalKey ikey;
FindParseableKey(&ikey, kForward);
while (iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) {
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
iter_->Next();
FindParseableKey(&ikey, kForward);
}
@ -568,7 +571,7 @@ void DBIter::FindPrevUserKey() {
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);
while (iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) {
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) >= 0) {
if (num_skipped >= max_skip_) {
num_skipped = 0;
IterKey last_key;
@ -664,7 +667,28 @@ void DBIter::SeekToLast() {
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->SeekToLast();
}
// When the iterate_upper_bound is set to a value,
// it will seek to the last key before the
// ReadOptions.iterate_upper_bound
if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
saved_key_.SetKey(*iterate_upper_bound_);
std::string last_key;
AppendInternalKey(&last_key,
ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber,
kValueTypeForSeek));
iter_->Seek(last_key);
if (!iter_->Valid()) {
iter_->SeekToLast();
} else {
iter_->Prev();
if (!iter_->Valid()) {
valid_ = false;
return;
}
}
}
PrevInternal();
}

View File

@ -11,6 +11,7 @@
#include "db/dbformat.h"
#include "rocksdb/comparator.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "db/db_iter.h"
@ -184,6 +185,272 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
db_iter->Next();
ASSERT_TRUE(!db_iter->Valid());
}
// Test to check the SeekToLast() with iterate_upper_bound not set
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "val_a");
internal_iter->AddPut("b", "val_b");
internal_iter->AddPut("b", "val_b");
internal_iter->AddPut("c", "val_c");
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
10, options.max_sequential_skip_in_iterations));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
}
// Test to check the SeekToLast() with iterate_upper_bound set
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "val_a");
internal_iter->AddPut("b", "val_b");
internal_iter->AddPut("c", "val_c");
internal_iter->AddPut("d", "val_d");
internal_iter->AddPut("e", "val_e");
internal_iter->AddPut("f", "val_f");
internal_iter->Finish();
Slice prefix("d");
ReadOptions ro;
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
10, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
db_iter->Next();
ASSERT_TRUE(!db_iter->Valid());
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
}
// Test to check the SeekToLast() iterate_upper_bound set to a key that
// is not Put yet
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "val_a");
internal_iter->AddPut("a", "val_a");
internal_iter->AddPut("b", "val_b");
internal_iter->AddPut("c", "val_c");
internal_iter->AddPut("d", "val_d");
internal_iter->Finish();
Slice prefix("z");
ReadOptions ro;
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
10, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "d");
db_iter->Next();
ASSERT_TRUE(!db_iter->Valid());
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "d");
db_iter->Prev();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
}
// Test to check the SeekToLast() with iterate_upper_bound set to the
// first key
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "val_a");
internal_iter->AddPut("a", "val_a");
internal_iter->AddPut("a", "val_a");
internal_iter->AddPut("b", "val_b");
internal_iter->AddPut("b", "val_b");
internal_iter->Finish();
Slice prefix("a");
ReadOptions ro;
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
10, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound));
db_iter->SeekToLast();
ASSERT_TRUE(!db_iter->Valid());
}
// Test case to check SeekToLast with iterate_upper_bound set
// (same key put may times - SeekToLast should start with the
// maximum sequence id of the upper bound)
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "val_a");
internal_iter->AddPut("b", "val_b");
internal_iter->AddPut("c", "val_c");
internal_iter->AddPut("c", "val_c");
internal_iter->AddPut("c", "val_c");
internal_iter->AddPut("c", "val_c");
internal_iter->AddPut("c", "val_c");
internal_iter->AddPut("c", "val_c");
internal_iter->AddPut("c", "val_c");
internal_iter->Finish();
Slice prefix("c");
ReadOptions ro;
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
7, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound));
SetPerfLevel(kEnableCount);
ASSERT_TRUE(GetPerfLevel() == kEnableCount);
perf_context.Reset();
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(static_cast<int>(perf_context.internal_key_skipped_count), 1);
ASSERT_EQ(db_iter->key().ToString(), "b");
SetPerfLevel(kDisable);
}
// Test to check the SeekToLast() with the iterate_upper_bound set
// (Checking the value of the key which has sequence ids greater than
// and less that the iterator's sequence id)
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "val_a1");
internal_iter->AddPut("a", "val_a2");
internal_iter->AddPut("b", "val_b1");
internal_iter->AddPut("c", "val_c1");
internal_iter->AddPut("c", "val_c2");
internal_iter->AddPut("c", "val_c3");
internal_iter->AddPut("b", "val_b2");
internal_iter->AddPut("d", "val_d1");
internal_iter->Finish();
Slice prefix("c");
ReadOptions ro;
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
4, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
ASSERT_EQ(db_iter->value().ToString(), "val_b1");
}
// Test to check the SeekToLast() with the iterate_upper_bound set to the
// key that is deleted
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "val_a");
internal_iter->AddDeletion("a");
internal_iter->AddPut("b", "val_b");
internal_iter->AddPut("c", "val_c");
internal_iter->Finish();
Slice prefix("a");
ReadOptions ro;
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
10, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound));
db_iter->SeekToLast();
ASSERT_TRUE(!db_iter->Valid());
}
// Test to check the SeekToLast() with the iterate_upper_bound set
// (Deletion cases)
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "val_a");
internal_iter->AddPut("b", "val_b");
internal_iter->AddDeletion("b");
internal_iter->AddPut("c", "val_c");
internal_iter->Finish();
Slice prefix("c");
ReadOptions ro;
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
10, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
db_iter->Next();
ASSERT_TRUE(!db_iter->Valid());
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
}
// Test to check the SeekToLast() with iterate_upper_bound set
// (Deletion cases - Lot of internal keys after the upper_bound
// is deleted)
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "val_a");
internal_iter->AddPut("b", "val_b");
internal_iter->AddDeletion("c");
internal_iter->AddDeletion("d");
internal_iter->AddDeletion("e");
internal_iter->AddDeletion("f");
internal_iter->AddDeletion("g");
internal_iter->AddDeletion("h");
internal_iter->Finish();
Slice prefix("c");
ReadOptions ro;
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
7, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound));
SetPerfLevel(kEnableCount);
ASSERT_TRUE(GetPerfLevel() == kEnableCount);
perf_context.Reset();
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(static_cast<int>(perf_context.internal_delete_skipped_count), 0);
ASSERT_EQ(db_iter->key().ToString(), "b");
SetPerfLevel(kDisable);
}
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
@ -1401,6 +1668,81 @@ TEST_F(DBIteratorTest, DBIterator8) {
ASSERT_EQ(db_iter->value().ToString(), "0");
}
TEST_F(DBIteratorTest, DBIterator9) {
Options options;
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddMerge("a", "merge_1");
internal_iter->AddMerge("a", "merge_2");
internal_iter->AddMerge("b", "merge_3");
internal_iter->AddMerge("b", "merge_4");
internal_iter->AddMerge("d", "merge_5");
internal_iter->AddMerge("d", "merge_6");
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
10, options.max_sequential_skip_in_iterations));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
db_iter->Prev();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
ASSERT_EQ(db_iter->value().ToString(), "merge_3,merge_4");
db_iter->Next();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "d");
ASSERT_EQ(db_iter->value().ToString(), "merge_5,merge_6");
db_iter->Seek("b");
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
ASSERT_EQ(db_iter->value().ToString(), "merge_3,merge_4");
db_iter->Prev();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
ASSERT_EQ(db_iter->value().ToString(), "merge_1,merge_2");
db_iter->Seek("c");
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "d");
ASSERT_EQ(db_iter->value().ToString(), "merge_5,merge_6");
db_iter->Prev();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
ASSERT_EQ(db_iter->value().ToString(), "merge_3,merge_4");
}
}
TEST_F(DBIteratorTest, DBIterator10) {
Options options;
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "1");
internal_iter->AddPut("b", "2");
internal_iter->AddPut("c", "3");
internal_iter->AddPut("d", "4");
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
10, options.max_sequential_skip_in_iterations));
db_iter->Seek("c");
ASSERT_TRUE(db_iter->Valid());
db_iter->Prev();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
ASSERT_EQ(db_iter->value().ToString(), "2");
db_iter->Next();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
ASSERT_EQ(db_iter->value().ToString(), "3");
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -78,21 +78,42 @@ static std::string RandomString(Random* rnd, int len) {
namespace anon {
class AtomicCounter {
private:
Env* env_;
port::Mutex mu_;
port::CondVar cond_count_;
int count_;
public:
AtomicCounter() : count_(0) { }
AtomicCounter(Env* env = NULL) : env_(env), cond_count_(&mu_), count_(0) {}
void Increment() {
MutexLock l(&mu_);
count_++;
cond_count_.SignalAll();
}
int Read() {
MutexLock l(&mu_);
return count_;
}
bool WaitFor(int count) {
MutexLock l(&mu_);
uint64_t start = env_->NowMicros();
while (count_ < count) {
uint64_t now = env_->NowMicros();
cond_count_.TimedWait(now + /*1s*/ 1 * 000 * 000);
if (env_->NowMicros() - start > /*10s*/ 10 * 000 * 000) {
return false;
}
if (count_ < count) {
GTEST_LOG_(WARNING) << "WaitFor is taking more time than usual";
}
}
return true;
}
void Reset() {
MutexLock l(&mu_);
count_ = 0;
cond_count_.SignalAll();
}
};
@ -165,7 +186,11 @@ class SpecialEnv : public EnvWrapper {
bool no_sleep_;
explicit SpecialEnv(Env* base)
: EnvWrapper(base), rnd_(301), addon_time_(0), no_sleep_(false) {
: EnvWrapper(base),
rnd_(301),
sleep_counter_(this),
addon_time_(0),
no_sleep_(false) {
delay_sstable_sync_.store(false, std::memory_order_release);
drop_writes_.store(false, std::memory_order_release);
no_space_.store(false, std::memory_order_release);
@ -7537,6 +7562,7 @@ TEST_F(DBTest, DropWrites) {
// Force out-of-space errors
env_->drop_writes_.store(true, std::memory_order_release);
env_->sleep_counter_.Reset();
env_->no_sleep_ = true;
for (int i = 0; i < 5; i++) {
if (option_config_ != kUniversalCompactionMultiLevel) {
for (int level = 0; level < dbfull()->NumberLevels(); level++) {
@ -7559,7 +7585,7 @@ TEST_F(DBTest, DropWrites) {
ASSERT_LT(CountFiles(), num_files + 3);
// Check that compaction attempts slept after errors
ASSERT_GE(env_->sleep_counter_.Read(), 5);
ASSERT_TRUE(env_->sleep_counter_.WaitFor(5));
} while (ChangeCompactOptions());
}
@ -10817,6 +10843,19 @@ TEST_F(DBTest, DBIteratorBoundTest) {
// should stop here...
ASSERT_TRUE(!iter->Valid());
}
// Testing SeekToLast with iterate_upper_bound set
{
ReadOptions ro;
Slice prefix("foo");
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("a")), 0);
}
// prefix is the first letter of the key
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
@ -11316,6 +11355,17 @@ TEST_F(DBTest, PreShutdownManualCompaction) {
}
}
TEST_F(DBTest, PreShutdownFlush) {
Options options = CurrentOptions();
options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "key", "value"));
CancelAllBackgroundWork(db_);
Status s =
db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
ASSERT_TRUE(s.IsShutdownInProgress());
}
TEST_F(DBTest, PreShutdownMultipleCompaction) {
const int kTestKeySize = 16;
const int kTestValueSize = 984;
@ -14078,6 +14128,29 @@ TEST_F(DBTest, RowCache) {
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
}
TEST_F(DBTest, PrevAfterMerge) {
Options options;
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreatePutOperator();
DestroyAndReopen(options);
// write three entries with different keys using Merge()
WriteOptions wopts;
db_->Merge(wopts, "1", "data1");
db_->Merge(wopts, "2", "data2");
db_->Merge(wopts, "3", "data3");
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Seek("2");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("2", it->key().ToString());
it->Prev();
ASSERT_TRUE(it->Valid());
ASSERT_EQ("1", it->key().ToString());
}
} // namespace rocksdb
#endif

View File

@ -81,7 +81,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
ParseInternalKey(keys_.back(), &orig_ikey);
bool hit_the_next_user_key = false;
std::string merge_result; // Temporary value for merge results
if (steps) {
++(*steps);
}
@ -118,6 +117,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// => change the entry type to kTypeValue for keys_.back()
// We are done! Return a success if the merge passes.
std::string merge_result;
Status s = TimedFullMerge(ikey.user_key, nullptr, operands_,
user_merge_operator_, stats, env_, logger_,
&merge_result);
@ -130,7 +130,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
orig_ikey.type = kTypeValue;
UpdateInternalKey(&original_key[0], original_key.size(),
orig_ikey.sequence, orig_ikey.type);
swap(operands_.back(), merge_result);
operands_.back() = std::move(merge_result);
}
// move iter to the next entry (before doing anything else)
@ -148,6 +148,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// => change the entry type to kTypeValue for keys_.back()
// We are done! Success!
const Slice val = iter->value();
std::string merge_result;
Status s =
TimedFullMerge(ikey.user_key, &val, operands_, user_merge_operator_,
stats, env_, logger_, &merge_result);
@ -160,7 +161,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
orig_ikey.type = kTypeValue;
UpdateInternalKey(&original_key[0], original_key.size(),
orig_ikey.sequence, orig_ikey.type);
swap(operands_.back(), merge_result);
operands_.back() = std::move(merge_result);
}
// move iter to the next entry
@ -210,6 +211,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
assert(kTypeMerge == orig_ikey.type);
assert(operands_.size() >= 1);
assert(operands_.size() == keys_.size());
std::string merge_result;
{
StopWatchNano timer(env_, stats != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
@ -224,7 +226,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
UpdateInternalKey(&original_key[0], original_key.size(),
orig_ikey.sequence, orig_ikey.type);
swap(operands_.back(),merge_result);
operands_.back() = std::move(merge_result);
} else {
RecordTick(stats, NUMBER_MERGE_FAILURES);
// Do nothing if not success_. Leave keys() and operands() as they are.
@ -237,6 +239,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
if (operands_.size() >= 2 &&
operands_.size() >= min_partial_merge_operands_) {
bool merge_success = false;
std::string merge_result;
{
StopWatchNano timer(env_, stats != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
@ -251,7 +254,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// Merging of operands (associative merge) was successful.
// Replace operands with the merge result
operands_.clear();
operands_.push_front(std::move(merge_result));
operands_.emplace_front(std::move(merge_result));
keys_.erase(keys_.begin(), keys_.end() - 1);
}
}

View File

@ -20,11 +20,11 @@ bool MergeOperator::PartialMergeMulti(const Slice& key,
Logger* logger) const {
assert(operand_list.size() >= 2);
// Simply loop through the operands
std::string temp_value;
Slice temp_slice(operand_list[0]);
for (size_t i = 1; i < operand_list.size(); ++i) {
auto& operand = operand_list[i];
std::string temp_value;
if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) {
return false;
}
@ -48,9 +48,9 @@ bool AssociativeMergeOperator::FullMerge(
// Simply loop through the operands
Slice temp_existing;
std::string temp_value;
for (const auto& operand : operand_list) {
Slice value(operand);
std::string temp_value;
if (!Merge(key, existing_value, value, &temp_value, logger)) {
return false;
}

View File

@ -7,6 +7,7 @@
#include <memory>
#include <iostream>
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
@ -41,6 +42,7 @@ class CountMergeOperator : public AssociativeMergeOperator {
const Slice& value,
std::string* new_value,
Logger* logger) const override {
assert(new_value->empty());
++num_merge_operator_calls;
if (existing_value == nullptr) {
new_value->assign(value.data(), value.size());
@ -59,6 +61,7 @@ class CountMergeOperator : public AssociativeMergeOperator {
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const override {
assert(new_value->empty());
++num_partial_merge_calls;
return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
logger);
@ -498,6 +501,7 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) {
int main(int argc, char *argv[]) {
//TODO: Make this test like a general rocksdb unit-test
rocksdb::port::InstallStackTraceHandler();
runTest(argc, test::TmpDir() + "/merge_testdb");
runTest(argc, test::TmpDir() + "/merge_testdbttl", true); // Run test on TTL database
printf("Passed all tests!\n");

View File

@ -695,12 +695,14 @@ void Version::AddIterators(const ReadOptions& read_options,
return;
}
auto* arena = merge_iter_builder->GetArena();
// Merge all level zero files together since they may overlap
for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr,
false, merge_iter_builder->GetArena()));
false, arena));
}
// For levels > 0, we can use a concatenating iterator that sequentially
@ -708,14 +710,16 @@ void Version::AddIterators(const ReadOptions& read_options,
// lazily.
for (int level = 1; level < storage_info_.num_non_empty_levels(); level++) {
if (storage_info_.LevelFilesBrief(level).num_files != 0) {
merge_iter_builder->AddIterator(NewTwoLevelIterator(
new LevelFileIteratorState(
auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState));
auto* state = new (mem) LevelFileIteratorState(
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(), false /* for_compaction */,
cfd_->ioptions()->prefix_extractor != nullptr),
new LevelFileNumIterator(cfd_->internal_comparator(),
&storage_info_.LevelFilesBrief(level)),
merge_iter_builder->GetArena()));
cfd_->ioptions()->prefix_extractor != nullptr);
mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
auto* first_level_iter = new (mem) LevelFileNumIterator(
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level));
merge_iter_builder->AddIterator(
NewTwoLevelIterator(state, first_level_iter, arena, false));
}
}
}

View File

@ -54,7 +54,8 @@ class MergeOperator {
// merge operation semantics
// existing: (IN) null indicates that the key does not exist before this op
// operand_list:(IN) the sequence of merge operations to apply, front() first.
// new_value:(OUT) Client is responsible for filling the merge result here
// new_value:(OUT) Client is responsible for filling the merge result here.
// The string that new_value is pointing to will be empty.
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success.
@ -80,6 +81,8 @@ class MergeOperator {
// DB::Merge(key, *new_value) would yield the same result as a call
// to DB::Merge(key, left_op) followed by DB::Merge(key, right_op).
//
// The string that new_value is pointing to will be empty.
//
// The default implementation of PartialMergeMulti will use this function
// as a helper, for backward compatibility. Any successor class of
// MergeOperator should either implement PartialMerge or PartialMergeMulti,
@ -116,6 +119,8 @@ class MergeOperator {
// the same result as subquential individual calls to DB::Merge(key, operand)
// for each operand in operand_list from front() to back().
//
// The string that new_value is pointing to will be empty.
//
// The PartialMergeMulti function will be called only when the list of
// operands are long enough. The minimum amount of operands that will be
// passed to the function are specified by the "min_partial_merge_operands"
@ -147,7 +152,8 @@ class AssociativeMergeOperator : public MergeOperator {
// key: (IN) The key that's associated with this merge operation.
// existing_value:(IN) null indicates the key does not exist before this op
// value: (IN) the value to update/merge the existing_value with
// new_value: (OUT) Client is responsible for filling the merge result here
// new_value: (OUT) Client is responsible for filling the merge result
// here. The string that new_value is pointing to will be empty.
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success.

View File

@ -74,8 +74,13 @@ enum Tickers : uint32_t {
NUMBER_KEYS_READ,
// Number keys updated, if inplace update is enabled
NUMBER_KEYS_UPDATED,
// Bytes written / read
// The number of uncompressed bytes issued by DB::Put(), DB::Delete(),
// DB::Merge(), and DB::Write().
BYTES_WRITTEN,
// The number of uncompressed bytes read from DB::Get(). It could be
// either from memtables, cache, or table files.
// For the number of logical bytes read from DB::MultiGet(),
// please use NUMBER_MULTIGET_BYTES_READ.
BYTES_READ,
NO_FILE_CLOSES,
NO_FILE_OPENS,

View File

@ -368,11 +368,19 @@ void Java_org_rocksdb_WBWIRocksIterator_entry1(
const rocksdb::WriteEntry& we = it->Entry();
jobject jwe = rocksdb::WBWIRocksIteratorJni::getWriteEntry(env, jobj);
rocksdb::WriteEntryJni::setWriteType(env, jwe, we.type);
rocksdb::WriteEntryJni::setKey(env, jwe, &we.key);
char* buf = new char[we.key.size()];
memcpy(buf, we.key.data(), we.key.size());
auto* key_slice = new rocksdb::Slice(buf, we.key.size());
rocksdb::WriteEntryJni::setKey(env, jwe, key_slice);
if (we.type == rocksdb::kDeleteRecord || we.type == rocksdb::kLogDataRecord) {
// set native handle of value slice to null if no value available
rocksdb::WriteEntryJni::setValue(env, jwe, NULL);
rocksdb::WriteEntryJni::setValue(env, jwe, nullptr);
} else {
rocksdb::WriteEntryJni::setValue(env, jwe, &we.value);
char* value_buf = new char[we.value.size()];
memcpy(value_buf, we.value.data(), we.value.size());
auto* value_slice = new rocksdb::Slice(value_buf, we.value.size());
rocksdb::WriteEntryJni::setValue(env, jwe, value_slice);
}
}

View File

@ -601,48 +601,73 @@ public class RocksDBTest {
RocksDB db = null;
Options opt = null;
try {
final int NUM_KEYS_PER_L0_FILE = 100;
final int KEY_SIZE = 20;
final int VALUE_SIZE = 300;
final int L0_FILE_SIZE =
NUM_KEYS_PER_L0_FILE * (KEY_SIZE + VALUE_SIZE);
final int NUM_L0_FILES = 10;
final int TEST_SCALE = 5;
final int KEY_INTERVAL = 100;
opt = new Options().
setCreateIfMissing(true).
setCompactionStyle(CompactionStyle.LEVEL).
setNumLevels(4).
setWriteBufferSize(100<<10).
setLevelZeroFileNumCompactionTrigger(3).
setTargetFileSizeBase(200 << 10).
setNumLevels(5).
// a slightly bigger write buffer than L0 file
// so that we can ensure manual flush always
// go before background flush happens.
setWriteBufferSize(L0_FILE_SIZE * 2).
// Disable auto L0 -> L1 compaction
setLevelZeroFileNumCompactionTrigger(20).
setTargetFileSizeBase(L0_FILE_SIZE * 100).
setTargetFileSizeMultiplier(1).
setMaxBytesForLevelBase(500 << 10).
setMaxBytesForLevelMultiplier(1).
setDisableAutoCompactions(false);
// open database
// To disable auto compaction
setMaxBytesForLevelBase(NUM_L0_FILES * L0_FILE_SIZE * 100).
setMaxBytesForLevelMultiplier(2).
setDisableAutoCompactions(true);
db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath());
// fill database with key/value pairs
byte[] b = new byte[10000];
for (int i = 0; i < 200; i++) {
rand.nextBytes(b);
db.put((String.valueOf(i)).getBytes(), b);
byte[] value = new byte[VALUE_SIZE];
int int_key = 0;
for (int round = 0; round < 5; ++round) {
int initial_key = int_key;
for (int f = 1; f <= NUM_L0_FILES; ++f) {
for (int i = 0; i < NUM_KEYS_PER_L0_FILE; ++i) {
int_key += KEY_INTERVAL;
rand.nextBytes(value);
db.put(String.format("%020d", int_key).getBytes(),
value);
}
db.flush(new FlushOptions().setWaitForFlush(true));
db.close();
opt.setTargetFileSizeBase(Long.MAX_VALUE).
setTargetFileSizeMultiplier(1).
setMaxBytesForLevelBase(Long.MAX_VALUE).
setMaxBytesForLevelMultiplier(1).
setDisableAutoCompactions(true);
db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath());
db.compactRange(true, 0, 0);
for (int i = 0; i < 4; i++) {
if (i == 0) {
// Make sure we do create one more L0 files.
assertThat(
db.getProperty("rocksdb.num-files-at-level" + i)).
isEqualTo("1");
} else {
assertThat(
db.getProperty("rocksdb.num-files-at-level" + i)).
isEqualTo("0");
db.getProperty("rocksdb.num-files-at-level0")).
isEqualTo("" + f);
}
// Compact all L0 files we just created
db.compactRange(
String.format("%020d", initial_key).getBytes(),
String.format("%020d", int_key - 1).getBytes());
// Making sure there isn't any L0 files.
assertThat(
db.getProperty("rocksdb.num-files-at-level0")).
isEqualTo("0");
// Making sure there are some L1 files.
// Here we only use != 0 instead of a specific number
// as we don't want the test make any assumption on
// how compaction works.
assertThat(
db.getProperty("rocksdb.num-files-at-level1")).
isNotEqualTo("0");
// Because we only compacted those keys we issued
// in this round, there shouldn't be any L1 -> L2
// compaction. So we expect zero L2 files here.
assertThat(
db.getProperty("rocksdb.num-files-at-level2")).
isEqualTo("0");
}
} finally {
if (db != null) {
@ -662,6 +687,14 @@ public class RocksDBTest {
List<ColumnFamilyHandle> columnFamilyHandles =
new ArrayList<>();
try {
final int NUM_KEYS_PER_L0_FILE = 100;
final int KEY_SIZE = 20;
final int VALUE_SIZE = 300;
final int L0_FILE_SIZE =
NUM_KEYS_PER_L0_FILE * (KEY_SIZE + VALUE_SIZE);
final int NUM_L0_FILES = 10;
final int TEST_SCALE = 5;
final int KEY_INTERVAL = 100;
opt = new DBOptions().
setCreateIfMissing(true).
setCreateMissingColumnFamilies(true);
@ -672,62 +705,73 @@ public class RocksDBTest {
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(
"new_cf".getBytes(),
new ColumnFamilyOptions().
setDisableAutoCompactions(true).
setCompactionStyle(CompactionStyle.LEVEL).
setNumLevels(4).
setWriteBufferSize(100 << 10).
setLevelZeroFileNumCompactionTrigger(3).
setTargetFileSizeBase(200 << 10).
setNumLevels(5).
// a slightly bigger write buffer than L0 file
// so that we can ensure manual flush always
// go before background flush happens.
setWriteBufferSize(L0_FILE_SIZE * 2).
// Disable auto L0 -> L1 compaction
setLevelZeroFileNumCompactionTrigger(20).
setTargetFileSizeBase(L0_FILE_SIZE * 100).
setTargetFileSizeMultiplier(1).
setMaxBytesForLevelBase(500 << 10).
setMaxBytesForLevelMultiplier(1).
setDisableAutoCompactions(false)));
// To disable auto compaction
setMaxBytesForLevelBase(NUM_L0_FILES * L0_FILE_SIZE * 100).
setMaxBytesForLevelMultiplier(2).
setDisableAutoCompactions(true)));
// open database
db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath(),
columnFamilyDescriptors,
columnFamilyHandles);
// fill database with key/value pairs
byte[] b = new byte[10000];
for (int i = 0; i < 200; i++) {
rand.nextBytes(b);
byte[] value = new byte[VALUE_SIZE];
int int_key = 0;
for (int round = 0; round < 5; ++round) {
int initial_key = int_key;
for (int f = 1; f <= NUM_L0_FILES; ++f) {
for (int i = 0; i < NUM_KEYS_PER_L0_FILE; ++i) {
int_key += KEY_INTERVAL;
rand.nextBytes(value);
db.put(columnFamilyHandles.get(1),
String.valueOf(i).getBytes(), b);
String.format("%020d", int_key).getBytes(),
value);
}
db.flush(new FlushOptions().setWaitForFlush(true),
columnFamilyHandles.get(1));
// free column families
for (ColumnFamilyHandle handle : columnFamilyHandles) {
handle.dispose();
// Make sure we do create one more L0 files.
assertThat(
db.getProperty(columnFamilyHandles.get(1),
"rocksdb.num-files-at-level0")).
isEqualTo("" + f);
}
// clear column family handles for reopen
columnFamilyHandles.clear();
db.close();
columnFamilyDescriptors.get(1).
columnFamilyOptions().
setTargetFileSizeBase(Long.MAX_VALUE).
setTargetFileSizeMultiplier(1).
setMaxBytesForLevelBase(Long.MAX_VALUE).
setMaxBytesForLevelMultiplier(1).
setDisableAutoCompactions(true);
// reopen database
db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath(),
columnFamilyDescriptors,
columnFamilyHandles);
// compact new column family
db.compactRange(columnFamilyHandles.get(1), true, 0, 0);
// check if new column family is compacted to level zero
for (int i = 0; i < 4; i++) {
if (i == 0) {
assertThat(db.getProperty(columnFamilyHandles.get(1),
"rocksdb.num-files-at-level" + i)).
isEqualTo("1");
} else {
assertThat(db.getProperty(columnFamilyHandles.get(1),
"rocksdb.num-files-at-level" + i)).
// Compact all L0 files we just created
db.compactRange(
columnFamilyHandles.get(1),
String.format("%020d", initial_key).getBytes(),
String.format("%020d", int_key - 1).getBytes());
// Making sure there isn't any L0 files.
assertThat(
db.getProperty(columnFamilyHandles.get(1),
"rocksdb.num-files-at-level0")).
isEqualTo("0");
// Making sure there are some L1 files.
// Here we only use != 0 instead of a specific number
// as we don't want the test make any assumption on
// how compaction works.
assertThat(
db.getProperty(columnFamilyHandles.get(1),
"rocksdb.num-files-at-level1")).
isNotEqualTo("0");
// Because we only compacted those keys we issued
// in this round, there shouldn't be any L1 -> L2
// compaction. So we expect zero L2 files here.
assertThat(
db.getProperty(columnFamilyHandles.get(1),
"rocksdb.num-files-at-level2")).
isEqualTo("0");
}
}
} finally {
for (ColumnFamilyHandle handle : columnFamilyHandles) {

View File

@ -231,6 +231,34 @@ public class WriteBatchWithIndexTest {
}
}
@Test
public void zeroByteTests() {
final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
byte[] zeroByteValue = new byte[] { 0, 0 };
//add zero byte value
wbwi.put(zeroByteValue, zeroByteValue);
ByteBuffer buffer = ByteBuffer.allocateDirect(zeroByteValue.length);
buffer.put(zeroByteValue);
WBWIRocksIterator.WriteEntry[] expected = {
new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.PUT,
new DirectSlice(buffer, zeroByteValue.length),
new DirectSlice(buffer, zeroByteValue.length))
};
WBWIRocksIterator it = null;
try {
it = wbwi.newIterator();
it.seekToFirst();
assertThat(it.entry().equals(expected[0])).isTrue();
} finally {
if(it != null) {
it.dispose();
}
}
}
private byte[] toArray(final ByteBuffer buf) {
final byte[] ary = new byte[buf.remaining()];
buf.get(ary);

View File

@ -359,7 +359,7 @@ void Block::SetBlockPrefixIndex(BlockPrefixIndex* prefix_index) {
}
size_t Block::ApproximateMemoryUsage() const {
size_t usage = size();
size_t usage = usable_size();
if (hash_index_) {
usage += hash_index_->ApproximateMemoryUsage();
}

View File

@ -10,6 +10,9 @@
#pragma once
#include <stddef.h>
#include <stdint.h>
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
#include <malloc.h>
#endif
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
@ -37,6 +40,14 @@ class Block {
size_t size() const { return size_; }
const char* data() const { return data_; }
bool cachable() const { return contents_.cachable; }
size_t usable_size() const {
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
if (contents_.allocation.get() != nullptr) {
return malloc_usable_size(contents_.allocation.get());
}
#endif // ROCKSDB_MALLOC_USABLE_SIZE
return size_;
}
uint32_t NumRestarts() const;
CompressionType compression_type() const {
return contents_.compression_type;

View File

@ -704,8 +704,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
(end - r->compressed_cache_key_prefix));
// Insert into compressed block cache.
cache_handle = block_cache_compressed->Insert(key, block, block->size(),
&DeleteCachedBlock);
cache_handle = block_cache_compressed->Insert(
key, block, block->usable_size(), &DeleteCachedBlock);
block_cache_compressed->Release(cache_handle);
// Invalidate OS cache.

View File

@ -147,6 +147,8 @@ class BlockBasedTable::IndexReader {
// The size of the index.
virtual size_t size() const = 0;
// Memory usage of the index block
virtual size_t usable_size() const = 0;
// Report an approximation of how much memory has been used other than memory
// that was allocated in block cache.
@ -187,6 +189,9 @@ class BinarySearchIndexReader : public IndexReader {
}
virtual size_t size() const override { return index_block_->size(); }
virtual size_t usable_size() const override {
return index_block_->usable_size();
}
virtual size_t ApproximateMemoryUsage() const override {
assert(index_block_);
@ -295,6 +300,9 @@ class HashIndexReader : public IndexReader {
}
virtual size_t size() const override { return index_block_->size(); }
virtual size_t usable_size() const override {
return index_block_->usable_size();
}
virtual size_t ApproximateMemoryUsage() const override {
assert(index_block_);
@ -702,9 +710,9 @@ Status BlockBasedTable::GetDataBlockFromCache(
assert(block->value->compression_type() == kNoCompression);
if (block_cache != nullptr && block->value->cachable() &&
read_options.fill_cache) {
block->cache_handle =
block_cache->Insert(block_cache_key, block->value,
block->value->size(), &DeleteCachedEntry<Block>);
block->cache_handle = block_cache->Insert(block_cache_key, block->value,
block->value->usable_size(),
&DeleteCachedEntry<Block>);
assert(reinterpret_cast<Block*>(
block_cache->Value(block->cache_handle)) == block->value);
}
@ -747,7 +755,7 @@ Status BlockBasedTable::PutDataBlockToCache(
if (block_cache_compressed != nullptr && raw_block != nullptr &&
raw_block->cachable()) {
auto cache_handle = block_cache_compressed->Insert(
compressed_block_cache_key, raw_block, raw_block->size(),
compressed_block_cache_key, raw_block, raw_block->usable_size(),
&DeleteCachedEntry<Block>);
block_cache_compressed->Release(cache_handle);
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS);
@ -759,8 +767,8 @@ Status BlockBasedTable::PutDataBlockToCache(
// insert into uncompressed block cache
assert((block->value->compression_type() == kNoCompression));
if (block_cache != nullptr && block->value->cachable()) {
block->cache_handle =
block_cache->Insert(block_cache_key, block->value, block->value->size(),
block->cache_handle = block_cache->Insert(block_cache_key, block->value,
block->value->usable_size(),
&DeleteCachedEntry<Block>);
RecordTick(statistics, BLOCK_CACHE_ADD);
assert(reinterpret_cast<Block*>(block_cache->Value(block->cache_handle)) ==
@ -913,7 +921,8 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options,
}
}
cache_handle = block_cache->Insert(key, index_reader, index_reader->size(),
cache_handle =
block_cache->Insert(key, index_reader, index_reader->usable_size(),
&DeleteCachedEntry<IndexReader>);
RecordTick(statistics, BLOCK_CACHE_ADD);
}

View File

@ -22,11 +22,17 @@ namespace {
class TwoLevelIterator: public Iterator {
public:
explicit TwoLevelIterator(TwoLevelIteratorState* state,
Iterator* first_level_iter);
Iterator* first_level_iter,
bool need_free_iter_and_state);
virtual ~TwoLevelIterator() {
first_level_iter_.DeleteIter(false);
first_level_iter_.DeleteIter(!need_free_iter_and_state_);
second_level_iter_.DeleteIter(false);
if (need_free_iter_and_state_) {
delete state_;
} else {
state_->~TwoLevelIteratorState();
}
}
virtual void Seek(const Slice& target) override;
@ -65,9 +71,10 @@ class TwoLevelIterator: public Iterator {
void SetSecondLevelIterator(Iterator* iter);
void InitDataBlock();
std::unique_ptr<TwoLevelIteratorState> state_;
TwoLevelIteratorState* state_;
IteratorWrapper first_level_iter_;
IteratorWrapper second_level_iter_; // May be nullptr
bool need_free_iter_and_state_;
Status status_;
// If second_level_iter is non-nullptr, then "data_block_handle_" holds the
// "index_value" passed to block_function_ to create the second_level_iter.
@ -75,8 +82,11 @@ class TwoLevelIterator: public Iterator {
};
TwoLevelIterator::TwoLevelIterator(TwoLevelIteratorState* state,
Iterator* first_level_iter)
: state_(state), first_level_iter_(first_level_iter) {}
Iterator* first_level_iter,
bool need_free_iter_and_state)
: state_(state),
first_level_iter_(first_level_iter),
need_free_iter_and_state_(need_free_iter_and_state) {}
void TwoLevelIterator::Seek(const Slice& target) {
if (state_->check_prefix_may_match &&
@ -186,12 +196,15 @@ void TwoLevelIterator::InitDataBlock() {
} // namespace
Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state,
Iterator* first_level_iter, Arena* arena) {
Iterator* first_level_iter, Arena* arena,
bool need_free_iter_and_state) {
if (arena == nullptr) {
return new TwoLevelIterator(state, first_level_iter);
return new TwoLevelIterator(state, first_level_iter,
need_free_iter_and_state);
} else {
auto mem = arena->AllocateAligned(sizeof(TwoLevelIterator));
return new (mem) TwoLevelIterator(state, first_level_iter);
return new (mem)
TwoLevelIterator(state, first_level_iter, need_free_iter_and_state);
}
}

View File

@ -43,8 +43,11 @@ struct TwoLevelIteratorState {
// arena: If not null, the arena is used to allocate the Iterator.
// When destroying the iterator, the destructor will destroy
// all the states but those allocated in arena.
// need_free_iter_and_state: free `state` and `first_level_iter` if
// true. Otherwise, just call destructor.
extern Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state,
Iterator* first_level_iter,
Arena* arena = nullptr);
Arena* arena = nullptr,
bool need_free_iter_and_state = true);
} // namespace rocksdb

View File

@ -11,6 +11,11 @@ int main() {
}
#else
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <gflags/gflags.h>
#include <iostream>
@ -87,7 +92,7 @@ int main(int argc, char** argv) {
status = env->GetAbsolutePath(argv[1], &abspath);
snprintf(json, sizeof(json),
"{ \"database-path\": \"%s\", \"hostname\": \"%s\", "
"\"creation-time\": %ld }",
"\"creation-time\": %" PRIi64 " }",
abspath.c_str(), hostname, timesec);
}

View File

@ -36,6 +36,7 @@ std::shared_ptr<DB> OpenNormalDb(char delim_char) {
return std::shared_ptr<DB>(db);
}
#ifndef ROCKSDB_LITE // TtlDb is not supported in Lite
// Open a TtlDB with a non-associative StringAppendTESTOperator
std::shared_ptr<DB> OpenTtlDb(char delim_char) {
DBWithTTL* db;
@ -45,6 +46,7 @@ std::shared_ptr<DB> OpenTtlDb(char delim_char) {
EXPECT_OK(DBWithTTL::Open(options, kDbName, &db, 123456));
return std::shared_ptr<DB>(db);
}
#endif // !ROCKSDB_LITE
} // namespace
/// StringLists represents a set of string-lists, each with a key-index.
@ -585,12 +587,14 @@ int main(int argc, char** argv) {
result = RUN_ALL_TESTS();
}
#ifndef ROCKSDB_LITE // TtlDb is not supported in Lite
// Run with TTL
{
fprintf(stderr, "Running tests with ttl db and generic operator.\n");
StringAppendOperatorTest::SetOpenDbFunction(&OpenTtlDb);
result |= RUN_ALL_TESTS();
}
#endif // !ROCKSDB_LITE
return result;
}