New stable, fixed-length cache keys (#9126)

Summary:
This change standardizes on a new 16-byte cache key format for
block cache (incl compressed and secondary) and persistent cache (but
not table cache and row cache).

The goal is a really fast cache key with practically ideal stability and
uniqueness properties without external dependencies (e.g. from FileSystem).
A fixed key size of 16 bytes should enable future optimizations to the
concurrent hash table for block cache, which is a heavy CPU user /
bottleneck, but there appears to be measurable performance improvement
even with no changes to LRUCache.

This change replaces a lot of disjointed and ugly code handling cache
keys with calls to a simple, clean new internal API (cache_key.h).
(Preserving the old cache key logic under an option would be very ugly
and likely negate the performance gain of the new approach. Complete
replacement carries some inherent risk, but I think that's acceptable
with sufficient analysis and testing.)

The scheme for encoding new cache keys is complicated but explained
in cache_key.cc.

Also: EndianSwapValue is moved to math.h to be next to other bit
operations. (Explains some new include "math.h".) ReverseBits operation
added and unit tests added to hash_test for both.

Fixes https://github.com/facebook/rocksdb/issues/7405 (presuming a root cause)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9126

Test Plan:
### Basic correctness
Several tests needed updates to work with the new functionality, mostly
because we are no longer relying on filesystem for stable cache keys
so table builders & readers need more context info to agree on cache
keys. This functionality is so core, a huge number of existing tests
exercise the cache key functionality.

### Performance
Create db with
`TEST_TMPDIR=/dev/shm ./db_bench -bloom_bits=10 -benchmarks=fillrandom -num=3000000 -partition_index_and_filters`
And test performance with
`TEST_TMPDIR=/dev/shm ./db_bench -readonly -use_existing_db -bloom_bits=10 -benchmarks=readrandom -num=3000000 -duration=30 -cache_index_and_filter_blocks -cache_size=250000 -threads=4`
using DEBUG_LEVEL=0 and simultaneous before & after runs.
Before ops/sec, avg over 100 runs: 121924
After ops/sec, avg over 100 runs: 125385 (+2.8%)

### Collision probability
I have built a tool, ./cache_bench -stress_cache_key to broadly simulate host-wide cache activity
over many months, by making some pessimistic simplifying assumptions:
* Every generated file has a cache entry for every byte offset in the file (contiguous range of cache keys)
* All of every file is cached for its entire lifetime

We use a simple table with skewed address assignment and replacement on address collision
to simulate files coming & going, with quite a variance (super-Poisson) in ages. Some output
with `./cache_bench -stress_cache_key -sck_keep_bits=40`:

```
Total cache or DBs size: 32TiB  Writing 925.926 MiB/s or 76.2939TiB/day
Multiply by 9.22337e+18 to correct for simulation losses (but still assume whole file cached)
```

These come from default settings of 2.5M files per day of 32 MB each, and
`-sck_keep_bits=40` means that to represent a single file, we are only keeping 40 bits of
the 128-bit cache key.  With file size of 2\*\*25 contiguous keys (pessimistic), our simulation
is about 2\*\*(128-40-25) or about 9 billion billion times more prone to collision than reality.

More default assumptions, relatively pessimistic:
* 100 DBs in same process (doesn't matter much)
* Re-open DB in same process (new session ID related to old session ID) on average
every 100 files generated
* Restart process (all new session IDs unrelated to old) 24 times per day

After enough data, we get a result at the end:

```
(keep 40 bits)  17 collisions after 2 x 90 days, est 10.5882 days between (9.76592e+19 corrected)
```

If we believe the (pessimistic) simulation and the mathematical generalization, we would need to run a billion machines all for 97 billion days to expect a cache key collision. To help verify that our generalization ("corrected") is robust, we can make our simulation more precise with `-sck_keep_bits=41` and `42`, which takes more running time to get enough data:

```
(keep 41 bits)  16 collisions after 4 x 90 days, est 22.5 days between (1.03763e+20 corrected)
(keep 42 bits)  19 collisions after 10 x 90 days, est 47.3684 days between (1.09224e+20 corrected)
```

The generalized prediction still holds. With the `-sck_randomize` option, we can see that we are beating "random" cache keys (except offsets still non-randomized) by a modest amount (roughly 20x less collision prone than random), which should make us reasonably comfortable even in "degenerate" cases:

```
197 collisions after 1 x 90 days, est 0.456853 days between (4.21372e+18 corrected)
```

I've run other tests to validate other conditions behave as expected, never behaving "worse than random" unless we start chopping off structured data.

Reviewed By: zhichao-cao

Differential Revision: D33171746

Pulled By: pdillinger

fbshipit-source-id: f16a57e369ed37be5e7e33525ace848d0537c88f
This commit is contained in:
Peter Dillinger 2021-12-16 17:13:55 -08:00 committed by Facebook GitHub Bot
parent 9918e1ee5a
commit 0050a73a4f
36 changed files with 1008 additions and 432 deletions

View File

@ -636,6 +636,7 @@ find_package(Threads REQUIRED)
set(SOURCES
cache/cache.cc
cache/cache_entry_roles.cc
cache/cache_key.cc
cache/cache_reservation_manager.cc
cache/clock_cache.cc
cache/lru_cache.cc

View File

@ -9,15 +9,18 @@
* Fixed a bug affecting custom memtable factories which are not registered with the `ObjectRegistry`. The bug could result in failure to save the OPTIONS file.
* Fixed a bug causing two duplicate entries to be appended to a file opened in non-direct mode and tracked by `FaultInjectionTestFS`.
* Fixed a bug in TableOptions.prepopulate_block_cache to support block-based filters also.
* Block cache keys no longer use `FSRandomAccessFile::GetUniqueId()` (previously used when available), so a filesystem recycling unique ids can no longer lead to incorrect result or crash (#7405). For files generated by RocksDB >= 6.24, the cache keys are stable across DB::Open and DB directory move / copy / import / export / migration, etc. Although collisions are still theoretically possible, they are (a) impossible in many common cases, (b) not dependent on environmental factors, and (c) much less likely than a CPU miscalculation while executing RocksDB.
### Behavior Changes
* MemTableList::TrimHistory now use allocated bytes when max_write_buffer_size_to_maintain > 0(default in TrasactionDB, introduced in PR#5022) Fix #8371.
### Public API change
* Extend WriteBatch::AssignTimestamp and AssignTimestamps API so that both functions can accept an optional `checker` argument that performs additional checking on timestamp sizes.
* Introduce a new EventListener callback that will be called upon the end of automatic error recovery.
### Performance Improvements
* Replaced map property `TableProperties::properties_offsets` with uint64_t property `external_sst_file_global_seqno_offset` to save table properties's memory.
* Block cache accesses are faster by RocksDB using cache keys of fixed size (16 bytes).
### Java API Changes
* Removed Java API `TableProperties.getPropertiesOffsets()` as it exposed internal details to external users.

View File

@ -143,6 +143,7 @@ cpp_library(
srcs = [
"cache/cache.cc",
"cache/cache_entry_roles.cc",
"cache/cache_key.cc",
"cache/cache_reservation_manager.cc",
"cache/clock_cache.cc",
"cache/lru_cache.cc",
@ -472,6 +473,7 @@ cpp_library(
srcs = [
"cache/cache.cc",
"cache/cache_entry_roles.cc",
"cache/cache_key.cc",
"cache/cache_reservation_manager.cc",
"cache/clock_cache.cc",
"cache/lru_cache.cc",

View File

@ -5,11 +5,14 @@
#ifdef GFLAGS
#include <cinttypes>
#include <cstddef>
#include <cstdio>
#include <limits>
#include <memory>
#include <set>
#include <sstream>
#include "db/db_impl/db_impl.h"
#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/cache.h"
@ -18,6 +21,8 @@
#include "rocksdb/env.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/table_properties.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/cachable_entry.h"
#include "util/coding.h"
#include "util/gflags_compat.h"
@ -73,6 +78,36 @@ static class std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> secondary_cache;
DEFINE_bool(use_clock_cache, false, "");
// ## BEGIN stress_cache_key sub-tool options ##
DEFINE_bool(stress_cache_key, false,
"If true, run cache key stress test instead");
DEFINE_uint32(sck_files_per_day, 2500000,
"(-stress_cache_key) Simulated files generated per day");
DEFINE_uint32(sck_duration, 90,
"(-stress_cache_key) Number of days to simulate in each run");
DEFINE_uint32(
sck_min_collision, 15,
"(-stress_cache_key) Keep running until this many collisions seen");
DEFINE_uint32(
sck_file_size_mb, 32,
"(-stress_cache_key) Simulated file size in MiB, for accounting purposes");
DEFINE_uint32(sck_reopen_nfiles, 100,
"(-stress_cache_key) Re-opens DB average every n files");
DEFINE_uint32(
sck_restarts_per_day, 24,
"(-stress_cache_key) Simulated process restarts per day (across DBs)");
DEFINE_uint32(sck_db_count, 100,
"(-stress_cache_key) Parallel DBs in operation");
DEFINE_uint32(sck_table_bits, 20,
"(-stress_cache_key) Log2 number of tracked files");
DEFINE_uint32(sck_keep_bits, 50,
"(-stress_cache_key) Number of cache key bits to keep");
DEFINE_bool(sck_randomize, false,
"(-stress_cache_key) Randomize (hash) cache key");
DEFINE_bool(sck_footer_unique_id, false,
"(-stress_cache_key) Simulate using proposed footer unique id");
// ## END stress_cache_key sub-tool options ##
namespace ROCKSDB_NAMESPACE {
class CacheBench;
@ -548,9 +583,195 @@ class CacheBench {
}
};
// TODO: better description (see PR #9126 for some info)
class StressCacheKey {
public:
void Run() {
if (FLAGS_sck_footer_unique_id) {
FLAGS_sck_db_count = 1;
}
uint64_t mb_per_day =
uint64_t{FLAGS_sck_files_per_day} * FLAGS_sck_file_size_mb;
printf("Total cache or DBs size: %gTiB Writing %g MiB/s or %gTiB/day\n",
FLAGS_sck_file_size_mb / 1024.0 / 1024.0 *
std::pow(2.0, FLAGS_sck_table_bits),
mb_per_day / 86400.0, mb_per_day / 1024.0 / 1024.0);
multiplier_ = std::pow(2.0, 128 - FLAGS_sck_keep_bits) /
(FLAGS_sck_file_size_mb * 1024.0 * 1024.0);
printf(
"Multiply by %g to correct for simulation losses (but still assume "
"whole file cached)\n",
multiplier_);
restart_nfiles_ = FLAGS_sck_files_per_day / FLAGS_sck_restarts_per_day;
double without_ejection =
std::pow(1.414214, FLAGS_sck_keep_bits) / FLAGS_sck_files_per_day;
printf(
"Without ejection, expect random collision after %g days (%g "
"corrected)\n",
without_ejection, without_ejection * multiplier_);
double with_full_table =
std::pow(2.0, FLAGS_sck_keep_bits - FLAGS_sck_table_bits) /
FLAGS_sck_files_per_day;
printf(
"With ejection and full table, expect random collision after %g "
"days (%g corrected)\n",
with_full_table, with_full_table * multiplier_);
collisions_ = 0;
for (int i = 1; collisions_ < FLAGS_sck_min_collision; i++) {
RunOnce();
if (collisions_ == 0) {
printf(
"No collisions after %d x %u days "
" \n",
i, FLAGS_sck_duration);
} else {
double est = 1.0 * i * FLAGS_sck_duration / collisions_;
printf("%" PRIu64
" collisions after %d x %u days, est %g days between (%g "
"corrected) \n",
collisions_, i, FLAGS_sck_duration, est, est * multiplier_);
}
}
}
void RunOnce() {
const size_t db_count = FLAGS_sck_db_count;
dbs_.reset(new TableProperties[db_count]{});
const size_t table_mask = (size_t{1} << FLAGS_sck_table_bits) - 1;
table_.reset(new uint64_t[table_mask + 1]{});
if (FLAGS_sck_keep_bits > 64) {
FLAGS_sck_keep_bits = 64;
}
uint32_t shift_away = 64 - FLAGS_sck_keep_bits;
uint32_t shift_away_b = shift_away / 3;
uint32_t shift_away_a = shift_away - shift_away_b;
process_count_ = 0;
session_count_ = 0;
ResetProcess();
Random64 r{std::random_device{}()};
uint64_t max_file_count =
uint64_t{FLAGS_sck_files_per_day} * FLAGS_sck_duration;
uint64_t file_count = 0;
uint32_t report_count = 0;
uint32_t collisions_this_run = 0;
// Round robin through DBs
for (size_t db_i = 0;; ++db_i) {
if (db_i >= db_count) {
db_i = 0;
}
if (file_count >= max_file_count) {
break;
}
if (!FLAGS_sck_footer_unique_id && r.OneIn(FLAGS_sck_reopen_nfiles)) {
ResetSession(db_i);
} else if (r.OneIn(restart_nfiles_)) {
ResetProcess();
}
OffsetableCacheKey ock;
dbs_[db_i].orig_file_number += 1;
// skip some file numbers, unless 1 DB so that that can simulate
// better (DB-independent) unique IDs
if (db_count > 1) {
dbs_[db_i].orig_file_number += (r.Next() & 3);
}
BlockBasedTable::SetupBaseCacheKey(&dbs_[db_i], "", 42, 42, &ock);
CacheKey ck = ock.WithOffset(0);
uint64_t stripped;
if (FLAGS_sck_randomize) {
stripped = GetSliceHash64(ck.AsSlice()) >> shift_away;
} else if (FLAGS_sck_footer_unique_id) {
uint32_t a = DecodeFixed32(ck.AsSlice().data() + 4) >> shift_away_a;
uint32_t b = DecodeFixed32(ck.AsSlice().data() + 12) >> shift_away_b;
stripped = (uint64_t{a} << 32) + b;
} else {
uint32_t a = DecodeFixed32(ck.AsSlice().data()) << shift_away_a;
uint32_t b = DecodeFixed32(ck.AsSlice().data() + 12) >> shift_away_b;
stripped = (uint64_t{a} << 32) + b;
}
if (stripped == 0) {
// Unlikely, but we need to exclude tracking this value
printf("Hit Zero! \n");
continue;
}
file_count++;
uint64_t h = NPHash64(reinterpret_cast<char*>(&stripped), 8);
// Skew lifetimes
size_t pos =
std::min(Lower32of64(h) & table_mask, Upper32of64(h) & table_mask);
if (table_[pos] == stripped) {
collisions_this_run++;
// To predict probability of no collisions, we have to get rid of
// correlated collisions, which this takes care of:
ResetProcess();
} else {
// Replace
table_[pos] = stripped;
}
if (++report_count == FLAGS_sck_files_per_day) {
report_count = 0;
// Estimate fill %
size_t incr = table_mask / 1000;
size_t sampled_count = 0;
for (size_t i = 0; i <= table_mask; i += incr) {
if (table_[i] != 0) {
sampled_count++;
}
}
// Report
printf(
"%" PRIu64 " days, %" PRIu64 " proc, %" PRIu64
" sess, %u coll, occ %g%%, ejected %g%% \r",
file_count / FLAGS_sck_files_per_day, process_count_,
session_count_, collisions_this_run, 100.0 * sampled_count / 1000.0,
100.0 * (1.0 - sampled_count / 1000.0 * table_mask / file_count));
fflush(stdout);
}
}
collisions_ += collisions_this_run;
}
void ResetSession(size_t i) {
dbs_[i].db_session_id = DBImpl::GenerateDbSessionId(nullptr);
session_count_++;
}
void ResetProcess() {
process_count_++;
DBImpl::TEST_ResetDbSessionIdGen();
for (size_t i = 0; i < FLAGS_sck_db_count; ++i) {
ResetSession(i);
}
if (FLAGS_sck_footer_unique_id) {
dbs_[0].orig_file_number = 0;
}
}
private:
// Use db_session_id and orig_file_number from TableProperties
std::unique_ptr<TableProperties[]> dbs_;
std::unique_ptr<uint64_t[]> table_;
uint64_t process_count_ = 0;
uint64_t session_count_ = 0;
uint64_t collisions_ = 0;
uint32_t restart_nfiles_ = 0;
double multiplier_ = 0.0;
};
int cache_bench_tool(int argc, char** argv) {
ParseCommandLineFlags(&argc, &argv, true);
if (FLAGS_stress_cache_key) {
// Alternate tool
StressCacheKey().Run();
return 0;
}
if (FLAGS_threads <= 0) {
fprintf(stderr, "threads number <= 0\n");
exit(1);

View File

@ -11,6 +11,7 @@
#include <mutex>
#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
#include "port/lang.h"
#include "rocksdb/cache.h"
#include "rocksdb/status.h"
@ -112,13 +113,7 @@ class CacheEntryStatsCollector {
// entry in cache until all refs are destroyed.
static Status GetShared(Cache *cache, SystemClock *clock,
std::shared_ptr<CacheEntryStatsCollector> *ptr) {
std::array<uint64_t, 3> cache_key_data{
{// First 16 bytes == md5 of class name
0x7eba5a8fb5437c90U, 0x8ca68c9b11655855U,
// Last 8 bytes based on a function pointer to make unique for each
// template instantiation
reinterpret_cast<uint64_t>(&CacheEntryStatsCollector::GetShared)}};
Slice cache_key = GetSlice(&cache_key_data);
const Slice &cache_key = GetCacheKey();
Cache::Handle *h = cache->Lookup(cache_key);
if (h == nullptr) {
@ -166,6 +161,13 @@ class CacheEntryStatsCollector {
delete static_cast<CacheEntryStatsCollector *>(value);
}
static const Slice &GetCacheKey() {
// For each template instantiation
static CacheKey ckey = CacheKey::CreateUniqueForProcessLifetime();
static Slice ckey_slice = ckey.AsSlice();
return ckey_slice;
}
std::mutex saved_mutex_;
Stats saved_stats_;

271
cache/cache_key.cc vendored Normal file
View File

@ -0,0 +1,271 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "cache/cache_key.h"
#include <algorithm>
#include <atomic>
#include "rocksdb/cache.h"
#include "table/unique_id_impl.h"
#include "util/hash.h"
#include "util/math.h"
namespace ROCKSDB_NAMESPACE {
// Value space plan for CacheKey:
//
// session_etc64_ | offset_etc64_ | Only generated by
// ---------------+---------------+------------------------------------------
// 0 | 0 | Reserved for "empty" CacheKey()
// 0 | > 0, < 1<<63 | CreateUniqueForCacheLifetime
// 0 | >= 1<<63 | CreateUniqueForProcessLifetime
// > 0 | any | OffsetableCacheKey.WithOffset
CacheKey CacheKey::CreateUniqueForCacheLifetime(Cache *cache) {
// +1 so that we can reserve all zeros for "unset" cache key
uint64_t id = cache->NewId() + 1;
// Ensure we don't collide with CreateUniqueForProcessLifetime
assert((id >> 63) == 0U);
return CacheKey(0, id);
}
CacheKey CacheKey::CreateUniqueForProcessLifetime() {
// To avoid colliding with CreateUniqueForCacheLifetime, assuming
// Cache::NewId counts up from zero, here we count down from UINT64_MAX.
// If this ever becomes a point of contention, we could use CoreLocalArray.
static std::atomic<uint64_t> counter{UINT64_MAX};
uint64_t id = counter.fetch_sub(1, std::memory_order_relaxed);
// Ensure we don't collide with CreateUniqueForCacheLifetime
assert((id >> 63) == 1U);
return CacheKey(0, id);
}
// Value plan for CacheKeys from OffsetableCacheKey, assuming that
// db_session_ids are generated from a base_session_id and
// session_id_counter (by SemiStructuredUniqueIdGen+EncodeSessionId
// in DBImpl::GenerateDbSessionId):
//
// Conceptual inputs:
// db_id (unstructured, from GenerateRawUniqueId or equiv)
// * could be shared between cloned DBs but rare
// * could be constant, if session id suffices
// base_session_id (unstructured, from GenerateRawUniqueId)
// session_id_counter (structured)
// * usually much smaller than 2**24
// file_number (structured)
// * usually smaller than 2**24
// offset_in_file (structured, might skip lots of values)
// * usually smaller than 2**32
// max_offset determines placement of file_number to prevent
// overlapping with offset
//
// Outputs come from bitwise-xor of the constituent pieces, low bits on left:
//
// |------------------------- session_etc64 -------------------------|
// | +++++++++++++++ base_session_id (lower 64 bits) +++++++++++++++ |
// |-----------------------------------------------------------------|
// | session_id_counter ...| |
// |-----------------------------------------------------------------|
// | | ... file_number |
// | | overflow & meta |
// |-----------------------------------------------------------------|
//
//
// |------------------------- offset_etc64 --------------------------|
// | hash of: ++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
// | * base_session_id (upper ~39 bits) |
// | * db_id (~122 bits entropy) |
// |-----------------------------------------------------------------|
// | offset_in_file ............... | |
// |-----------------------------------------------------------------|
// | | file_number, 0-3 |
// | | lower bytes |
// |-----------------------------------------------------------------|
//
// Based on max_offset, a maximal number of bytes 0..3 is chosen for
// including from lower bits of file_number in offset_etc64. The choice
// is encoded in two bits of metadata going into session_etc64, though
// the common case of 3 bytes is encoded as 0 so that session_etc64
// is unmodified by file_number concerns in the common case.
//
// There is nothing preventing "file number overflow & meta" from meeting
// and overlapping with session_id_counter, but reaching such a case requires
// an intractable combination of large file offsets (thus at least some large
// files), large file numbers (thus large number of files generated), and
// large number of session IDs generated in a single process. A trillion each
// (2**40) of session ids, offsets, and file numbers comes to 120 bits.
// With two bits of metadata and byte granularity, this is on the verge of
// overlap, but even in the overlap case, it doesn't seem likely that
// a file from billions of files or session ids ago will still be live
// or cached.
//
// In fact, if our SST files are all < 4TB (see
// BlockBasedTable::kMaxFileSizeStandardEncoding), then SST files generated
// in a single process are guaranteed to have unique cache keys, unless/until
// number session ids * max file number = 2**86, e.g. 1 trillion DB::Open in
// a single process and 64 trillion files generated. Even at that point, to
// see a collision we would need a miraculous re-synchronization of session
// id and file number, along with a live file or stale cache entry from
// trillions of files ago.
//
// How https://github.com/pdillinger/unique_id applies here:
// Every bit of output always includes "unstructured" uniqueness bits and
// often combines with "structured" uniqueness bits. The "unstructured" bits
// change infrequently: only when we cannot guarantee our state tracking for
// "structured" uniqueness hasn't been cloned. Using a static
// SemiStructuredUniqueIdGen for db_session_ids, this means we only get an
// "all new" session id when a new process uses RocksDB. (Between processes,
// we don't know if a DB or other persistent storage has been cloned.) Within
// a process, only the session_lower of the db_session_id changes
// incrementally ("structured" uniqueness).
//
// This basically means that our offsets, counters and file numbers allow us
// to do somewhat "better than random" (birthday paradox) while in the
// degenerate case of completely new session for each tiny file, we still
// have strong uniqueness properties from the birthday paradox, with ~103
// bit session IDs or up to 128 bits entropy with different DB IDs sharing a
// cache.
//
// More collision probability analysis:
// Suppose a RocksDB host generates (generously) 2 GB/s (10TB data, 17 DWPD)
// with average process/session lifetime of (pessimistically) 4 minutes.
// In 180 days (generous allowable data lifespan), we generate 31 million GB
// of data, or 2^55 bytes, and 2^16 "all new" session IDs.
//
// First, suppose this is in a single DB (lifetime 180 days):
// 128 bits cache key size
// - 55 <- ideal size for byte offsets + file numbers
// - 2 <- bits for offsets and file numbers not exactly powers of two
// - 2 <- bits for file number encoding metadata
// + 2 <- bits saved not using byte offsets in BlockBasedTable::GetCacheKey
// ----
// 71 <- bits remaining for distinguishing session IDs
// The probability of a collision in 71 bits of session ID data is less than
// 1 in 2**(71 - (2 * 16)), or roughly 1 in a trillion. And this assumes all
// data from the last 180 days is in cache for potential collision, and that
// cache keys under each session id exhaustively cover the remaining 57 bits
// while in reality they'll only cover a small fraction of it.
//
// Although data could be transferred between hosts, each host has its own
// cache and we are already assuming a high rate of "all new" session ids.
// So this doesn't really change the collision calculation. Across a fleet
// of 1 million, each with <1 in a trillion collision possibility,
// fleetwide collision probability is <1 in a million.
//
// Now suppose we have many DBs per host, say 2**10, with same host-wide write
// rate and process/session lifetime. File numbers will be ~10 bits smaller
// and we will have 2**10 times as many session IDs because of simultaneous
// lifetimes. So now collision chance is less than 1 in 2**(81 - (2 * 26)),
// or roughly 1 in a billion.
//
// Suppose instead we generated random or hashed cache keys for each
// (compressed) block. For 1KB compressed block size, that is 2^45 cache keys
// in 180 days. Collision probability is more easily estimated at roughly
// 1 in 2**(128 - (2 * 45)) or roughly 1 in a trillion (assuming all
// data from the last 180 days is in cache, but NOT the other assumption
// for the 1 in a trillion estimate above).
//
// Conclusion: Burning through session IDs, particularly "all new" IDs that
// only arise when a new process is started, is the only way to have a
// plausible chance of cache key collision. When processes live for hours
// or days, the chance of a cache key collision seems more plausibly due
// to bad hardware than to bad luck in random session ID data.
//
OffsetableCacheKey::OffsetableCacheKey(const std::string &db_id,
const std::string &db_session_id,
uint64_t file_number,
uint64_t max_offset) {
#ifndef NDEBUG
max_offset_ = max_offset;
#endif
// Closely related to GetSstInternalUniqueId, but only need 128 bits and
// need to include an offset within the file.
// See also https://github.com/pdillinger/unique_id for background.
uint64_t session_upper = 0; // Assignment to appease clang-analyze
uint64_t session_lower = 0; // Assignment to appease clang-analyze
{
Status s = DecodeSessionId(db_session_id, &session_upper, &session_lower);
if (!s.ok()) {
// A reasonable fallback in case malformed
Hash2x64(db_session_id.data(), db_session_id.size(), &session_upper,
&session_lower);
}
}
// Hash the session upper (~39 bits entropy) and DB id (120+ bits entropy)
// for more global uniqueness entropy.
// (It is possible that many DBs descended from one common DB id are copied
// around and proliferate, in which case session id is critical, but it is
// more common for different DBs to have different DB ids.)
uint64_t db_hash = Hash64(db_id.data(), db_id.size(), session_upper);
// This establishes the db+session id part of the cache key.
//
// Exactly preserve (in common cases; see modifiers below) session lower to
// ensure that session ids generated during the same process lifetime are
// guaranteed unique.
//
// We put this first for CommonPrefixSlice(), so that a small-ish set of
// cache key prefixes to cover entries relevant to any DB.
session_etc64_ = session_lower;
// This provides extra entopy in case of different DB id or process
// generating a session id, but is also partly/variably obscured by
// file_number and offset (see below).
offset_etc64_ = db_hash;
// Into offset_etc64_ we are (eventually) going to pack & xor in an offset and
// a file_number, but we might need the file_number to overflow into
// session_etc64_. (There must only be one session_etc64_ value per
// file, and preferably shared among many files.)
//
// Figure out how many bytes of file_number we are going to be able to
// pack in with max_offset, though our encoding will only support packing
// in up to 3 bytes of file_number. (16M file numbers is enough for a new
// file number every second for half a year.)
int file_number_bytes_in_offset_etc =
(63 - FloorLog2(max_offset | 0x100000000U)) / 8;
int file_number_bits_in_offset_etc = file_number_bytes_in_offset_etc * 8;
// Assert two bits of metadata
assert(file_number_bytes_in_offset_etc >= 0 &&
file_number_bytes_in_offset_etc <= 3);
// Assert we couldn't have used a larger allowed number of bytes (shift
// would chop off bytes).
assert(file_number_bytes_in_offset_etc == 3 ||
(max_offset << (file_number_bits_in_offset_etc + 8) >>
(file_number_bits_in_offset_etc + 8)) != max_offset);
uint64_t mask = (uint64_t{1} << (file_number_bits_in_offset_etc)) - 1;
// Pack into high bits of etc so that offset can go in low bits of etc
// TODO: could be EndianSwapValue?
uint64_t offset_etc_modifier = ReverseBits(file_number & mask);
assert(offset_etc_modifier << file_number_bits_in_offset_etc == 0U);
// Overflow and 3 - byte count (likely both zero) go into session_id part
uint64_t session_etc_modifier =
(file_number >> file_number_bits_in_offset_etc << 2) |
static_cast<uint64_t>(3 - file_number_bytes_in_offset_etc);
// Packed into high bits to minimize interference with session id counter.
session_etc_modifier = ReverseBits(session_etc_modifier);
// Assert session_id part is only modified in extreme cases
assert(session_etc_modifier == 0 || file_number > /*3 bytes*/ 0xffffffU ||
max_offset > /*5 bytes*/ 0xffffffffffU);
// Xor in the modifiers
session_etc64_ ^= session_etc_modifier;
offset_etc64_ ^= offset_etc_modifier;
// Although DBImpl guarantees (in recent versions) that session_lower is not
// zero, that's not entirely sufficient to guarantee that session_etc64_ is
// not zero (so that the 0 case can be used by CacheKey::CreateUnique*)
if (session_etc64_ == 0U) {
session_etc64_ = session_upper | 1U;
}
assert(session_etc64_ != 0);
}
} // namespace ROCKSDB_NAMESPACE

132
cache/cache_key.h vendored Normal file
View File

@ -0,0 +1,132 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cstdint>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h"
namespace ROCKSDB_NAMESPACE {
class Cache;
// A standard holder for fixed-size block cache keys (and for related caches).
// They are created through one of these, each using its own range of values:
// * CacheKey::CreateUniqueForCacheLifetime
// * CacheKey::CreateUniqueForProcessLifetime
// * Default ctor ("empty" cache key)
// * OffsetableCacheKey->WithOffset
//
// The first two use atomic counters to guarantee uniqueness over the given
// lifetime and the last uses a form of universally unique identifier for
// uniqueness with very high probabilty (and guaranteed for files generated
// during a single process lifetime).
//
// CacheKeys are currently used by calling AsSlice() to pass as a key to
// Cache. For performance, the keys are endianness-dependent (though otherwise
// portable). (Persistable cache entries are not intended to cross platforms.)
class CacheKey {
public:
// For convenience, constructs an "empty" cache key that is never returned
// by other means.
inline CacheKey() : session_etc64_(), offset_etc64_() {}
inline bool IsEmpty() const {
return (session_etc64_ == 0) & (offset_etc64_ == 0);
}
// Use this cache key as a Slice (byte order is endianness-dependent)
inline Slice AsSlice() const {
static_assert(sizeof(*this) == 16, "Standardized on 16-byte cache key");
assert(!IsEmpty());
return Slice(reinterpret_cast<const char *>(this), sizeof(*this));
}
// Create a CacheKey that is unique among others associated with this Cache
// instance. Depends on Cache::NewId. This is useful for block cache
// "reservations".
static CacheKey CreateUniqueForCacheLifetime(Cache *cache);
// Create a CacheKey that is unique among others for the lifetime of this
// process. This is useful for saving in a static data member so that
// different DB instances can agree on a cache key for shared entities,
// such as for CacheEntryStatsCollector.
static CacheKey CreateUniqueForProcessLifetime();
protected:
friend class OffsetableCacheKey;
CacheKey(uint64_t session_etc64, uint64_t offset_etc64)
: session_etc64_(session_etc64), offset_etc64_(offset_etc64) {}
uint64_t session_etc64_;
uint64_t offset_etc64_;
};
// A file-specific generator of cache keys, sometimes referred to as the
// "base" cache key for a file because all the cache keys for various offsets
// within the file are computed using simple arithmetic. The basis for the
// general approach is dicussed here: https://github.com/pdillinger/unique_id
// Heavily related to GetUniqueIdFromTableProperties.
//
// If the db_id, db_session_id, and file_number come from the file's table
// properties, then the keys will be stable across DB::Open/Close, backup/
// restore, import/export, etc.
//
// This class "is a" CacheKey only privately so that it is not misused as
// a ready-to-use CacheKey.
class OffsetableCacheKey : private CacheKey {
public:
// For convenience, constructs an "empty" cache key that should not be used.
inline OffsetableCacheKey() : CacheKey() {}
// Constructs an OffsetableCacheKey with the given information about a file.
// max_offset is based on file size (see WithOffset) and is required here to
// choose an appropriate (sub-)encoding. This constructor never generates an
// "empty" base key.
OffsetableCacheKey(const std::string &db_id, const std::string &db_session_id,
uint64_t file_number, uint64_t max_offset);
inline bool IsEmpty() const {
bool result = session_etc64_ == 0;
assert(!(offset_etc64_ > 0 && result));
return result;
}
// Construct a CacheKey for an offset within a file, which must be
// <= max_offset provided in constructor. An offset is not necessarily a
// byte offset if a smaller unique identifier of keyable offsets is used.
//
// This class was designed to make this hot code extremely fast.
inline CacheKey WithOffset(uint64_t offset) const {
assert(!IsEmpty());
assert(offset <= max_offset_);
return CacheKey(session_etc64_, offset_etc64_ ^ offset);
}
// The "common prefix" is a shared prefix for all the returned CacheKeys,
// that also happens to usually be the same among many files in the same DB,
// so is efficient and highly accurate (not perfectly) for DB-specific cache
// dump selection (but not file-specific).
static constexpr size_t kCommonPrefixSize = 8;
inline Slice CommonPrefixSlice() const {
static_assert(sizeof(session_etc64_) == kCommonPrefixSize,
"8 byte common prefix expected");
assert(!IsEmpty());
assert(&this->session_etc64_ == static_cast<const void *>(this));
return Slice(reinterpret_cast<const char *>(this), kCommonPrefixSize);
}
// For any max_offset <= this value, the same encoding scheme is guaranteed.
static constexpr uint64_t kMaxOffsetStandardEncoding = 0xffffffffffU;
private:
#ifndef NDEBUG
uint64_t max_offset_ = 0;
#endif
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -28,8 +28,6 @@ CacheReservationManager::CacheReservationManager(std::shared_ptr<Cache> cache,
memory_used_(0) {
assert(cache != nullptr);
cache_ = cache;
std::memset(cache_key_, 0, kCacheKeyPrefixSize + kMaxVarint64Length);
EncodeVarint64(cache_key_, cache_->NewId());
}
CacheReservationManager::~CacheReservationManager() {
@ -152,10 +150,8 @@ Slice CacheReservationManager::GetNextCacheKey() {
// underlying cache_key_ that is shared among other keys generated from this
// fucntion. Therefore please make sure the previous keys are saved/copied
// before calling this function.
std::memset(cache_key_ + kCacheKeyPrefixSize, 0, kMaxVarint64Length);
char* end =
EncodeVarint64(cache_key_ + kCacheKeyPrefixSize, next_cache_key_id_++);
return Slice(cache_key_, static_cast<std::size_t>(end - cache_key_));
cache_key_ = CacheKey::CreateUniqueForCacheLifetime(cache_.get());
return cache_key_.AsSlice();
}
template <CacheEntryRole R>

View File

@ -158,10 +158,6 @@ class CacheReservationManager
private:
static constexpr std::size_t kSizeDummyEntry = 256 * 1024;
// The key will be longer than keys for blocks in SST files so they won't
// conflict.
static const std::size_t kCacheKeyPrefixSize =
BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length;
Slice GetNextCacheKey();
template <CacheEntryRole R>
@ -173,9 +169,7 @@ class CacheReservationManager
std::atomic<std::size_t> cache_allocated_size_;
std::size_t memory_used_;
std::vector<Cache::Handle *> dummy_handles_;
std::uint64_t next_cache_key_id_ = 0;
// The non-prefix part will be updated according to the ID to use.
char cache_key_[kCacheKeyPrefixSize + kMaxVarint64Length];
CacheKey cache_key_;
};
// CacheReservationHandle is for managing the lifetime of a cache reservation

View File

@ -26,8 +26,6 @@ class CacheReservationManagerTest : public ::testing::Test {
CacheReservationManager::GetDummyEntrySize();
static constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry;
static constexpr int kNumShardBits = 0; // 2^0 shard
static const std::size_t kCacheKeyPrefixSize =
BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length;
static constexpr std::size_t kMetaDataChargeOverhead = 10000;
std::shared_ptr<Cache> cache = NewLRUCache(kCacheCapacity, kNumShardBits);
@ -39,22 +37,6 @@ class CacheReservationManagerTest : public ::testing::Test {
};
TEST_F(CacheReservationManagerTest, GenerateCacheKey) {
// The first cache reservation manager owning the cache will have
// cache->NewId() = 1
constexpr std::size_t kCacheNewId = 1;
// The first key generated inside of cache reservation manager will have
// next_cache_key_id = 0
constexpr std::size_t kCacheKeyId = 0;
char expected_cache_key[kCacheKeyPrefixSize + kMaxVarint64Length];
std::memset(expected_cache_key, 0, kCacheKeyPrefixSize + kMaxVarint64Length);
EncodeVarint64(expected_cache_key, kCacheNewId);
char* end =
EncodeVarint64(expected_cache_key + kCacheKeyPrefixSize, kCacheKeyId);
Slice expected_cache_key_slice(
expected_cache_key, static_cast<std::size_t>(end - expected_cache_key));
std::size_t new_mem_used = 1 * kSizeDummyEntry;
Status s =
test_cache_rev_mng
@ -65,7 +47,17 @@ TEST_F(CacheReservationManagerTest, GenerateCacheKey) {
ASSERT_LT(cache->GetPinnedUsage(),
1 * kSizeDummyEntry + kMetaDataChargeOverhead);
Cache::Handle* handle = cache->Lookup(expected_cache_key_slice);
// Next unique Cache key
CacheKey ckey = CacheKey::CreateUniqueForCacheLifetime(cache.get());
// Back it up to the one used by CRM (using CacheKey implementation details)
using PairU64 = std::pair<uint64_t, uint64_t>;
auto& ckey_pair = *reinterpret_cast<PairU64*>(&ckey);
ckey_pair.second--;
// Specific key (subject to implementation details)
EXPECT_EQ(ckey_pair, PairU64(0, 2));
Cache::Handle* handle = cache->Lookup(ckey.AsSlice());
EXPECT_NE(handle, nullptr)
<< "Failed to generate the cache key for the dummy entry correctly";
// Clean up the returned handle from Lookup() to prevent memory leak

View File

@ -8,6 +8,7 @@
#include <string>
#include <vector>
#include "cache/cache_key.h"
#include "db/db_test_util.h"
#include "file/sst_file_manager_impl.h"
#include "port/port.h"
@ -233,7 +234,10 @@ class TestSecondaryCache : public SecondaryCache {
void ResetInjectFailure() { inject_failure_ = false; }
void SetDbSessionId(const std::string& db_session_id) {
db_session_id_ = db_session_id;
// NOTE: we assume the file is smaller than kMaxFileSizeStandardEncoding
// for this to work, but that's safe in a test.
auto base = OffsetableCacheKey("unknown", db_session_id, 1, 1);
ckey_prefix_ = base.CommonPrefixSlice().ToString();
}
Status Insert(const Slice& key, void* value,
@ -241,7 +245,7 @@ class TestSecondaryCache : public SecondaryCache {
if (inject_failure_) {
return Status::Corruption("Insertion Data Corrupted");
}
assert(IsDbSessionIdAsKeyPrefix(key) == true);
EXPECT_TRUE(IsDbSessionLowerAsKeyPrefix(key));
size_t size;
char* buf;
Status s;
@ -317,18 +321,8 @@ class TestSecondaryCache : public SecondaryCache {
uint32_t num_lookups() { return num_lookups_; }
bool IsDbSessionIdAsKeyPrefix(const Slice& key) {
if (db_session_id_.size() == 0) {
return true;
}
if (key.size() < 20) {
return false;
}
std::string s_key = key.ToString();
if (s_key.substr(0, 20) != db_session_id_) {
return false;
}
return true;
bool IsDbSessionLowerAsKeyPrefix(const Slice& key) {
return key.starts_with(ckey_prefix_);
}
private:
@ -373,7 +367,7 @@ class TestSecondaryCache : public SecondaryCache {
uint32_t num_inserts_;
uint32_t num_lookups_;
bool inject_failure_;
std::string db_session_id_;
std::string ckey_prefix_;
ResultMap result_map_;
};

View File

@ -1589,7 +1589,8 @@ TEST_P(DBBlockCacheKeyTest, StableCacheKeys) {
// This is a "control" side of the test that also ensures safely degraded
// behavior on old files.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"PropertyBlockBuilder::AddTableProperty:Start", [&](void* arg) {
"BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
[&](void* arg) {
TableProperties* props = reinterpret_cast<TableProperties*>(arg);
props->orig_file_number = 0;
});
@ -1649,11 +1650,7 @@ TEST_P(DBBlockCacheKeyTest, StableCacheKeys) {
}
if (exclude_file_numbers_) {
// FIXME(peterd): figure out where these extra two ADDs are coming from
options.statistics->recordTick(BLOCK_CACHE_INDEX_ADD,
uint64_t{0} - uint64_t{2});
options.statistics->recordTick(BLOCK_CACHE_FILTER_ADD,
uint64_t{0} - uint64_t{2});
// FIXME(peterd): figure out where these extra ADDs are coming from
options.statistics->recordTick(BLOCK_CACHE_COMPRESSED_ADD,
uint64_t{0} - uint64_t{2});
}
@ -1708,14 +1705,6 @@ TEST_P(DBBlockCacheKeyTest, StableCacheKeys) {
IngestExternalFileOptions ingest_opts;
ASSERT_OK(db_->IngestExternalFile(handles_[1], {external}, ingest_opts));
if (exclude_file_numbers_) {
// FIXME(peterd): figure out where these extra two ADDs are coming from
options.statistics->recordTick(BLOCK_CACHE_INDEX_ADD,
uint64_t{0} - uint64_t{2});
options.statistics->recordTick(BLOCK_CACHE_FILTER_ADD,
uint64_t{0} - uint64_t{2});
}
perform_gets();
verify_stats();
#endif // !ROCKSDB_LITE

View File

@ -3948,16 +3948,25 @@ Status DBImpl::GetDbSessionId(std::string& session_id) const {
return Status::OK();
}
namespace {
SemiStructuredUniqueIdGen* DbSessionIdGen() {
static SemiStructuredUniqueIdGen gen;
return &gen;
}
} // namespace
void DBImpl::TEST_ResetDbSessionIdGen() { DbSessionIdGen()->Reset(); }
std::string DBImpl::GenerateDbSessionId(Env*) {
// See SemiStructuredUniqueIdGen for its desirable properties.
static SemiStructuredUniqueIdGen gen;
auto gen = DbSessionIdGen();
uint64_t lo, hi;
gen.GenerateNext(&hi, &lo);
gen->GenerateNext(&hi, &lo);
if (lo == 0) {
// Avoid emitting session ID with lo==0, so that SST unique
// IDs can be more easily ensured non-zero
gen.GenerateNext(&hi, &lo);
gen->GenerateNext(&hi, &lo);
assert(lo != 0);
}
return EncodeSessionId(hi, lo);

View File

@ -1128,10 +1128,12 @@ class DBImpl : public DB {
State state_;
};
static void TEST_ResetDbSessionIdGen();
static std::string GenerateDbSessionId(Env* env);
protected:
const std::string dbname_;
// TODO(peterd): unify with VersionSet::db_id_
std::string db_id_;
// db_session_id_ is an identifier that gets reset
// every time the DB is opened

View File

@ -38,7 +38,8 @@ Status ExternalSstFileIngestionJob::Prepare(
// Read the information of files we are ingesting
for (const std::string& file_path : external_files_paths) {
IngestedFileInfo file_to_ingest;
status = GetIngestedFileInfo(file_path, &file_to_ingest, sv);
status =
GetIngestedFileInfo(file_path, next_file_number++, &file_to_ingest, sv);
if (!status.ok()) {
return status;
}
@ -102,7 +103,6 @@ Status ExternalSstFileIngestionJob::Prepare(
// Copy/Move external files into DB
std::unordered_set<size_t> ingestion_path_ids;
for (IngestedFileInfo& f : files_to_ingest_) {
f.fd = FileDescriptor(next_file_number++, 0, f.file_size);
f.copy_file = false;
const std::string path_outside_db = f.external_file_path;
const std::string path_inside_db =
@ -543,8 +543,8 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
}
Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
const std::string& external_file, IngestedFileInfo* file_to_ingest,
SuperVersion* sv) {
const std::string& external_file, uint64_t new_file_number,
IngestedFileInfo* file_to_ingest, SuperVersion* sv) {
file_to_ingest->external_file_path = external_file;
// Get external file size
@ -554,6 +554,10 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
return status;
}
// Assign FD with number
file_to_ingest->fd =
FileDescriptor(new_file_number, 0, file_to_ingest->file_size);
// Create TableReader for external file
std::unique_ptr<TableReader> table_reader;
std::unique_ptr<FSRandomAccessFile> sst_file;
@ -568,9 +572,14 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));
status = cfd_->ioptions()->table_factory->NewTableReader(
TableReaderOptions(*cfd_->ioptions(),
sv->mutable_cf_options.prefix_extractor.get(),
env_options_, cfd_->internal_comparator()),
TableReaderOptions(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor.get(),
env_options_, cfd_->internal_comparator(),
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
std::move(sst_file_reader), file_to_ingest->file_size, &table_reader);
if (!status.ok()) {
return status;

View File

@ -138,6 +138,7 @@ class ExternalSstFileIngestionJob {
// Open the external file and populate `file_to_ingest` with all the
// external information we need to ingest this file.
Status GetIngestedFileInfo(const std::string& external_file,
uint64_t new_file_number,
IngestedFileInfo* file_to_ingest,
SuperVersion* sv);

View File

@ -27,7 +27,8 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
for (const auto& file_metadata : metadata_) {
const auto file_path = file_metadata.db_path + "/" + file_metadata.name;
IngestedFileInfo file_to_import;
status = GetIngestedFileInfo(file_path, &file_to_import, sv);
status =
GetIngestedFileInfo(file_path, next_file_number++, &file_to_import, sv);
if (!status.ok()) {
return status;
}
@ -86,8 +87,6 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
// Copy/Move external files into DB
auto hardlink_files = import_options_.move_files;
for (auto& f : files_to_import_) {
f.fd = FileDescriptor(next_file_number++, 0, f.file_size);
const auto path_outside_db = f.external_file_path;
const auto path_inside_db = TableFileName(
cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId());
@ -198,8 +197,8 @@ void ImportColumnFamilyJob::Cleanup(const Status& status) {
}
Status ImportColumnFamilyJob::GetIngestedFileInfo(
const std::string& external_file, IngestedFileInfo* file_to_import,
SuperVersion* sv) {
const std::string& external_file, uint64_t new_file_number,
IngestedFileInfo* file_to_import, SuperVersion* sv) {
file_to_import->external_file_path = external_file;
// Get external file size
@ -209,6 +208,10 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
return status;
}
// Assign FD with number
file_to_import->fd =
FileDescriptor(new_file_number, 0, file_to_import->file_size);
// Create TableReader for external file
std::unique_ptr<TableReader> table_reader;
std::unique_ptr<FSRandomAccessFile> sst_file;
@ -223,9 +226,14 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));
status = cfd_->ioptions()->table_factory->NewTableReader(
TableReaderOptions(*cfd_->ioptions(),
sv->mutable_cf_options.prefix_extractor.get(),
env_options_, cfd_->internal_comparator()),
TableReaderOptions(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor.get(),
env_options_, cfd_->internal_comparator(),
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
std::move(sst_file_reader), file_to_import->file_size, &table_reader);
if (!status.ok()) {
return status;

View File

@ -56,6 +56,7 @@ class ImportColumnFamilyJob {
// Open the external file and populate `file_to_import` with all the
// external information we need to import this file.
Status GetIngestedFileInfo(const std::string& external_file,
uint64_t new_file_number,
IngestedFileInfo* file_to_import,
SuperVersion* sv);

View File

@ -1091,6 +1091,8 @@ class VersionSet {
#endif // ROCKSDB_LITE
const std::string& DbSessionId() const { return db_session_id_; }
// Return the current manifest file number
uint64_t manifest_file_number() const { return manifest_file_number_; }

View File

@ -141,9 +141,10 @@ void TEST_GenerateRawUniqueId(uint64_t* a, uint64_t* b, bool exclude_port_uuid,
}
#endif
SemiStructuredUniqueIdGen::SemiStructuredUniqueIdGen() : counter_{} {
void SemiStructuredUniqueIdGen::Reset() {
saved_process_id_ = port::GetProcessID();
GenerateRawUniqueId(&base_upper_, &base_lower_);
counter_ = 0;
}
void SemiStructuredUniqueIdGen::GenerateNext(uint64_t* upper, uint64_t* lower) {

4
env/unique_id_gen.h vendored
View File

@ -53,7 +53,9 @@ void TEST_GenerateRawUniqueId(uint64_t* a, uint64_t* b, bool exclude_port_uuid,
class SemiStructuredUniqueIdGen {
public:
// Initializes with random starting state (from GenerateRawUniqueId)
SemiStructuredUniqueIdGen();
SemiStructuredUniqueIdGen() { Reset(); }
// Re-initializes, but not thread safe
void Reset();
// Assuming no fork(), `lower` is guaranteed unique from one call
// to the next (thread safe).

1
src.mk
View File

@ -2,6 +2,7 @@
LIB_SOURCES = \
cache/cache.cc \
cache/cache_entry_roles.cc \
cache/cache_key.cc \
cache/cache_reservation_manager.cc \
cache/clock_cache.cc \
cache/lru_cache.cc \

View File

@ -22,6 +22,7 @@
#include <utility>
#include "cache/cache_entry_roles.h"
#include "cache/cache_key.h"
#include "cache/cache_reservation_manager.h"
#include "db/dbformat.h"
#include "index_builder.h"
@ -321,10 +322,7 @@ struct BlockBasedTableBuilder::Rep {
compression_dict_buffer_cache_res_mgr;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
char cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
size_t cache_key_prefix_size;
char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
size_t compressed_cache_key_prefix_size;
OffsetableCacheKey base_cache_key;
const TableFileCreationReason reason;
BlockHandle pending_handle; // Handle to add to index block
@ -436,8 +434,6 @@ struct BlockBasedTableBuilder::Rep {
: State::kUnbuffered),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align),
cache_key_prefix_size(0),
compressed_cache_key_prefix_size(0),
reason(tbo.reason),
flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
@ -887,7 +883,16 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
rep_->filter_builder->StartBlock(0);
}
SetupCacheKeyPrefix(tbo);
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
const_cast<TableProperties*>(&rep_->props));
// Extremely large files use atypical cache key encoding, and we don't
// know ahead of time how big the file will be. But assuming it's less
// than 4TB, we will correctly predict the cache keys.
BlockBasedTable::SetupBaseCacheKey(
&rep_->props, tbo.db_session_id, tbo.cur_file_num,
BlockBasedTable::kMaxFileSizeStandardEncoding, &rep_->base_cache_key);
if (rep_->IsParallelCompressionEnabled()) {
StartParallelCompression();
@ -1408,25 +1413,6 @@ void DeleteEntryCached(const Slice& /*key*/, void* value) {
}
} // namespace
// Helper function to setup the cache key's prefix for the Table.
void BlockBasedTableBuilder::SetupCacheKeyPrefix(
const TableBuilderOptions& tbo) {
// FIXME: Unify with BlockBasedTable::SetupCacheKeyPrefix
if (rep_->table_options.block_cache.get() != nullptr) {
BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
rep_->table_options.block_cache.get(), rep_->file->writable_file(),
&rep_->cache_key_prefix[0], &rep_->cache_key_prefix_size,
tbo.db_session_id, tbo.cur_file_num);
}
if (rep_->table_options.block_cache_compressed.get() != nullptr) {
BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
rep_->table_options.block_cache_compressed.get(),
rep_->file->writable_file(), &rep_->compressed_cache_key_prefix[0],
&rep_->compressed_cache_key_prefix_size, tbo.db_session_id,
tbo.cur_file_num);
}
}
//
// Make a copy of the block contents and insert into compressed block cache
//
@ -1450,15 +1436,10 @@ Status BlockBasedTableBuilder::InsertBlockInCompressedCache(
block_contents_to_cache->is_raw_block = true;
#endif // NDEBUG
// make cache key by appending the file offset to the cache prefix id
char* end = EncodeVarint64(
r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
handle->offset());
Slice key(r->compressed_cache_key_prefix,
static_cast<size_t>(end - r->compressed_cache_key_prefix));
CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
s = block_cache_compressed->Insert(
key, block_contents_to_cache,
key.AsSlice(), block_contents_to_cache,
block_contents_to_cache->ApproximateMemoryUsage(),
&DeleteEntryCached<BlockContents>);
if (s.ok()) {
@ -1511,11 +1492,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
memcpy(buf.get(), block_contents.data(), size);
BlockContents results(std::move(buf), size);
char
cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
Slice key = BlockBasedTable::GetCacheKey(rep_->cache_key_prefix,
rep_->cache_key_prefix_size,
*handle, cache_key);
CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
const size_t read_amp_bytes_per_bit =
rep_->table_options.read_amp_bytes_per_bit;
@ -1532,7 +1509,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
assert(block_holder->own_bytes());
size_t charge = block_holder->ApproximateMemoryUsage();
s = block_cache->Insert(
key, block_holder.get(),
key.AsSlice(), block_holder.get(),
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), charge,
nullptr, Cache::Priority::LOW);

View File

@ -16,6 +16,7 @@
#include <vector>
#include "cache/cache_entry_roles.h"
#include "cache/cache_key.h"
#include "cache/sharded_cache.h"
#include "db/compaction/compaction_picker.h"
#include "db/dbformat.h"
@ -79,8 +80,6 @@ BlockBasedTable::~BlockBasedTable() {
delete rep_;
}
std::atomic<uint64_t> BlockBasedTable::next_cache_key_id_(0);
namespace {
// Read the block identified by "handle" from "file".
// The only relevant option is options.verify_checksums for now.
@ -393,42 +392,6 @@ Status BlockBasedTable::InsertEntryToCache(
return s;
}
// Helper function to setup the cache key's prefix for the Table.
void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep,
const std::string& db_session_id,
uint64_t file_num) {
assert(kMaxCacheKeyPrefixSize >= 10);
rep->cache_key_prefix_size = 0;
rep->compressed_cache_key_prefix_size = 0;
if (rep->table_options.block_cache != nullptr) {
GenerateCachePrefix<Cache, FSRandomAccessFile>(
rep->table_options.block_cache.get(), rep->file->file(),
&rep->cache_key_prefix[0], &rep->cache_key_prefix_size, db_session_id,
file_num);
}
if (rep->table_options.block_cache_compressed != nullptr) {
GenerateCachePrefix<Cache, FSRandomAccessFile>(
rep->table_options.block_cache_compressed.get(), rep->file->file(),
&rep->compressed_cache_key_prefix[0],
&rep->compressed_cache_key_prefix_size, db_session_id, file_num);
}
if (rep->table_options.persistent_cache != nullptr) {
char persistent_cache_key_prefix[kMaxCacheKeyPrefixSize];
size_t persistent_cache_key_prefix_size = 0;
GenerateCachePrefix<PersistentCache, FSRandomAccessFile>(
rep->table_options.persistent_cache.get(), rep->file->file(),
&persistent_cache_key_prefix[0], &persistent_cache_key_prefix_size,
db_session_id, file_num);
rep->persistent_cache_options =
PersistentCacheOptions(rep->table_options.persistent_cache,
std::string(persistent_cache_key_prefix,
persistent_cache_key_prefix_size),
rep->ioptions.stats);
}
}
namespace {
// Return True if table_properties has `user_prop_name` has a `true` value
// or it doesn't contain this property (for backward compatible).
@ -523,16 +486,62 @@ Status GetGlobalSequenceNumber(const TableProperties& table_properties,
}
} // namespace
Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix,
size_t cache_key_prefix_size,
const BlockHandle& handle, char* cache_key) {
assert(cache_key != nullptr);
assert(cache_key_prefix_size != 0);
assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize);
memcpy(cache_key, cache_key_prefix, cache_key_prefix_size);
char* end =
EncodeVarint64(cache_key + cache_key_prefix_size, handle.offset());
return Slice(cache_key, static_cast<size_t>(end - cache_key));
void BlockBasedTable::SetupBaseCacheKey(const TableProperties* properties,
const std::string& cur_db_session_id,
uint64_t cur_file_number,
uint64_t file_size,
OffsetableCacheKey* out_base_cache_key,
bool* out_is_stable) {
// Use a stable cache key if sufficient data is in table properties
std::string db_session_id;
uint64_t file_num;
std::string db_id;
if (properties && !properties->db_session_id.empty() &&
properties->orig_file_number > 0) {
// (Newer SST file case)
// We must have both properties to get a stable unique id because
// CreateColumnFamilyWithImport or IngestExternalFiles can change the
// file numbers on a file.
db_session_id = properties->db_session_id;
file_num = properties->orig_file_number;
// Less critical, populated in earlier release than above
db_id = properties->db_id;
if (out_is_stable) {
*out_is_stable = true;
}
} else {
// (Old SST file case)
// We use (unique) cache keys based on current identifiers. These are at
// least stable across table file close and re-open, but not across
// different DBs nor DB close and re-open.
db_session_id = cur_db_session_id;
file_num = cur_file_number;
// Plumbing through the DB ID to here would be annoying, and of limited
// value because of the case of VersionSet::Recover opening some table
// files and later setting the DB ID. So we just rely on uniqueness
// level provided by session ID.
db_id = "unknown";
if (out_is_stable) {
*out_is_stable = false;
}
}
// Too many tests to update to get these working
// assert(file_num > 0);
// assert(!db_session_id.empty());
// assert(!db_id.empty());
// Minimum block size is 5 bytes; therefore we can trim off two lower bits
// from offets. See GetCacheKey.
*out_base_cache_key = OffsetableCacheKey(db_id, db_session_id, file_num,
/*max_offset*/ file_size >> 2);
}
CacheKey BlockBasedTable::GetCacheKey(const OffsetableCacheKey& base_cache_key,
const BlockHandle& handle) {
// Minimum block size is 5 bytes; therefore we can trim off two lower bits
// from offet.
return base_cache_key.WithOffset(handle.offset() >> 2);
}
Status BlockBasedTable::Open(
@ -653,24 +662,13 @@ Status BlockBasedTable::Open(
return s;
}
// With properties loaded, we can set up portable/stable cache keys if
// necessary info is available
std::string db_session_id;
uint64_t file_num;
if (rep->table_properties && !rep->table_properties->db_session_id.empty() &&
rep->table_properties->orig_file_number > 0) {
// We must have both properties to get a stable unique id because
// CreateColumnFamilyWithImport or IngestExternalFiles can change the
// file numbers on a file.
db_session_id = rep->table_properties->db_session_id;
file_num = rep->table_properties->orig_file_number;
} else {
// We have to use transient (but unique) cache keys based on current
// identifiers.
db_session_id = cur_db_session_id;
file_num = cur_file_num;
}
SetupCacheKeyPrefix(rep, db_session_id, file_num);
// With properties loaded, we can set up portable/stable cache keys
SetupBaseCacheKey(rep->table_properties.get(), cur_db_session_id,
cur_file_num, file_size, &rep->base_cache_key);
rep->persistent_cache_options =
PersistentCacheOptions(rep->table_options.persistent_cache,
rep->base_cache_key, rep->ioptions.stats);
s = new_table->ReadRangeDelBlock(ro, prefetch_buffer.get(),
metaindex_iter.get(), internal_comparator,
@ -1121,8 +1119,7 @@ Status BlockBasedTable::ReadMetaIndexBlock(
template <typename TBlocklike>
Status BlockBasedTable::GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const Slice& cache_key, Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, CachableEntry<TBlocklike>* block,
const UncompressionDict& uncompression_dict, BlockType block_type,
const bool wait, GetContext* get_context) const {
@ -1151,9 +1148,10 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Lookup uncompressed cache first
if (block_cache != nullptr) {
assert(!cache_key.empty());
Cache::Handle* cache_handle = nullptr;
cache_handle = GetEntryFromCache(
rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
rep_->ioptions.lowest_used_cache_tier, block_cache, cache_key,
block_type, wait, get_context,
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), create_cb,
priority);
@ -1172,19 +1170,19 @@ Status BlockBasedTable::GetDataBlockFromCache(
return s;
}
assert(!compressed_block_cache_key.empty());
assert(!cache_key.empty());
BlockContents contents;
if (rep_->ioptions.lowest_used_cache_tier ==
CacheTier::kNonVolatileBlockTier) {
Cache::CreateCallback create_cb_special = GetCreateCallback<BlockContents>(
read_amp_bytes_per_bit, statistics, using_zstd, filter_policy);
block_cache_compressed_handle = block_cache_compressed->Lookup(
compressed_block_cache_key,
cache_key,
BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
create_cb_special, priority, true);
} else {
block_cache_compressed_handle =
block_cache_compressed->Lookup(compressed_block_cache_key, statistics);
block_cache_compressed->Lookup(cache_key, statistics);
}
// if we found in the compressed cache, then uncompress and insert into
@ -1223,7 +1221,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
size_t charge = block_holder->ApproximateMemoryUsage();
Cache::Handle* cache_handle = nullptr;
s = InsertEntryToCache(
rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
rep_->ioptions.lowest_used_cache_tier, block_cache, cache_key,
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type),
block_holder, charge, &cache_handle, priority);
if (s.ok()) {
@ -1248,8 +1246,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
template <typename TBlocklike>
Status BlockBasedTable::PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const Slice& cache_key, Cache* block_cache, Cache* block_cache_compressed,
CachableEntry<TBlocklike>* cached_block, BlockContents* raw_block_contents,
CompressionType raw_block_comp_type,
const UncompressionDict& uncompression_dict,
@ -1304,9 +1301,8 @@ Status BlockBasedTable::PutDataBlockToCache(
if (block_cache_compressed != nullptr &&
raw_block_comp_type != kNoCompression && raw_block_contents != nullptr &&
raw_block_contents->own_bytes()) {
#ifndef NDEBUG
assert(raw_block_contents->is_raw_block);
#endif // NDEBUG
assert(!cache_key.empty());
// We cannot directly put raw_block_contents because this could point to
// an object in the stack.
@ -1314,7 +1310,7 @@ Status BlockBasedTable::PutDataBlockToCache(
new BlockContents(std::move(*raw_block_contents)));
s = InsertEntryToCache(
rep_->ioptions.lowest_used_cache_tier, block_cache_compressed,
compressed_block_cache_key,
cache_key,
BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
block_cont_for_comp_cache,
block_cont_for_comp_cache->ApproximateMemoryUsage(), nullptr,
@ -1335,7 +1331,7 @@ Status BlockBasedTable::PutDataBlockToCache(
size_t charge = block_holder->ApproximateMemoryUsage();
Cache::Handle* cache_handle = nullptr;
s = InsertEntryToCache(
rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
rep_->ioptions.lowest_used_cache_tier, block_cache, cache_key,
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type),
block_holder, charge, &cache_handle, priority);
if (s.ok()) {
@ -1446,27 +1442,17 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
//
// If either block cache is enabled, we'll try to read from it.
Status s;
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
Slice key /* key to the block cache */;
Slice ckey /* key to the compressed block cache */;
CacheKey key_data;
Slice key;
bool is_cache_hit = false;
if (block_cache != nullptr || block_cache_compressed != nullptr) {
// create key for block cache
if (block_cache != nullptr) {
key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
handle, cache_key);
}
if (block_cache_compressed != nullptr) {
ckey = GetCacheKey(rep_->compressed_cache_key_prefix,
rep_->compressed_cache_key_prefix_size, handle,
compressed_cache_key);
}
key_data = GetCacheKey(rep_->base_cache_key, handle);
key = key_data.AsSlice();
if (!contents) {
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
ro, block_entry, uncompression_dict, block_type,
s = GetDataBlockFromCache(key, block_cache, block_cache_compressed, ro,
block_entry, uncompression_dict, block_type,
wait, get_context);
// Value could still be null at this point, so check the cache handle
// and update the read pattern for prefetching
@ -1532,8 +1518,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// If filling cache is allowed and a cache is configured, try to put the
// block to the cache.
s = PutDataBlockToCache(
key, ckey, block_cache, block_cache_compressed, block_entry,
contents, raw_block_comp_type, uncompression_dict,
key, block_cache, block_cache_compressed, block_entry, contents,
raw_block_comp_type, uncompression_dict,
GetMemoryAllocator(rep_->table_options), block_type, get_context);
}
}
@ -3077,12 +3063,9 @@ bool BlockBasedTable::TEST_BlockInCache(const BlockHandle& handle) const {
return false;
}
char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
Slice cache_key =
GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, handle,
cache_key_storage);
CacheKey key = GetCacheKey(rep_->base_cache_key, handle);
Cache::Handle* const cache_handle = cache->Lookup(cache_key);
Cache::Handle* const cache_handle = cache->Lookup(key.AsSlice());
if (cache_handle == nullptr) {
return false;
}
@ -3257,7 +3240,8 @@ uint64_t BlockBasedTable::ApproximateSize(const Slice& start, const Slice& end,
bool BlockBasedTable::TEST_FilterBlockInCache() const {
assert(rep_ != nullptr);
return TEST_BlockInCache(rep_->filter_handle);
return rep_->filter_type != Rep::FilterType::kNoFilter &&
TEST_BlockInCache(rep_->filter_handle);
}
bool BlockBasedTable::TEST_IndexBlockInCache() const {

View File

@ -11,8 +11,10 @@
#include <cstdint>
#include "cache/cache_key.h"
#include "db/range_tombstone_fragmenter.h"
#include "file/filename.h"
#include "rocksdb/table_properties.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_type.h"
@ -62,9 +64,6 @@ class BlockBasedTable : public TableReader {
static const std::string kFilterBlockPrefix;
static const std::string kFullFilterBlockPrefix;
static const std::string kPartitionedFilterBlockPrefix;
// The longest prefix of the cache key used to identify blocks.
// For Posix files the unique ID is three varints.
static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length * 3 + 1;
// All the below fields control iterator readahead
static const size_t kInitAutoReadaheadSize = 8 * 1024;
@ -220,9 +219,20 @@ class BlockBasedTable : public TableReader {
class IndexReaderCommon;
static Slice GetCacheKey(const char* cache_key_prefix,
size_t cache_key_prefix_size,
const BlockHandle& handle, char* cache_key);
// Maximum SST file size that uses standard CacheKey encoding scheme.
// See GetCacheKey to explain << 2. + 3 is permitted because it is trimmed
// off by >> 2 in GetCacheKey.
static constexpr uint64_t kMaxFileSizeStandardEncoding =
(OffsetableCacheKey::kMaxOffsetStandardEncoding << 2) + 3;
static void SetupBaseCacheKey(const TableProperties* properties,
const std::string& cur_db_session_id,
uint64_t cur_file_number, uint64_t file_size,
OffsetableCacheKey* out_base_cache_key,
bool* out_is_stable = nullptr);
static CacheKey GetCacheKey(const OffsetableCacheKey& base_cache_key,
const BlockHandle& handle);
static void UpdateCacheInsertionMetrics(BlockType block_type,
GetContext* get_context, size_t usage,
@ -291,7 +301,6 @@ class BlockBasedTable : public TableReader {
private:
friend class MockedBlockBasedTable;
friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
static std::atomic<uint64_t> next_cache_key_id_;
BlockCacheTracer* const block_cache_tracer_;
void UpdateCacheHitMetrics(BlockType block_type, GetContext* get_context,
@ -385,12 +394,13 @@ class BlockBasedTable : public TableReader {
// @param uncompression_dict Data for presetting the compression library's
// dictionary.
template <typename TBlocklike>
Status GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, CachableEntry<TBlocklike>* block,
const UncompressionDict& uncompression_dict, BlockType block_type,
const bool wait, GetContext* get_context) const;
Status GetDataBlockFromCache(const Slice& cache_key, Cache* block_cache,
Cache* block_cache_compressed,
const ReadOptions& read_options,
CachableEntry<TBlocklike>* block,
const UncompressionDict& uncompression_dict,
BlockType block_type, const bool wait,
GetContext* get_context) const;
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then
@ -403,9 +413,8 @@ class BlockBasedTable : public TableReader {
// @param uncompression_dict Data for presetting the compression library's
// dictionary.
template <typename TBlocklike>
Status PutDataBlockToCache(const Slice& block_cache_key,
const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
Status PutDataBlockToCache(const Slice& cache_key, Cache* block_cache,
Cache* block_cache_compressed,
CachableEntry<TBlocklike>* cached_block,
BlockContents* raw_block_contents,
CompressionType raw_block_comp_type,
@ -483,40 +492,6 @@ class BlockBasedTable : public TableReader {
bool use_cache, bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context);
static void SetupCacheKeyPrefix(Rep* rep, const std::string& db_session_id,
uint64_t cur_file_num);
// Generate a cache key prefix from the file
template <typename TCache, typename TFile>
static void GenerateCachePrefix(TCache* cc, TFile* file, char* buffer,
size_t* size,
const std::string& db_session_id,
uint64_t cur_file_num) {
// generate an id from the file
*size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
// If the prefix wasn't generated or was too long,
// create one based on the DbSessionId and curent file number if they
// are set. Otherwise, created from NewId()
if (cc != nullptr && *size == 0) {
if (db_session_id.size() == 20) {
// db_session_id is 20 bytes as defined.
memcpy(buffer, db_session_id.c_str(), 20);
char* end;
if (cur_file_num != 0) {
end = EncodeVarint64(buffer + 20, cur_file_num);
} else {
end = EncodeVarint64(buffer + 20, cc->NewId());
}
// kMaxVarint64Length is 10 therefore, the prefix is at most 30 bytes.
*size = static_cast<size_t>(end - buffer);
} else {
char* end = EncodeVarint64(buffer, cc->NewId());
*size = static_cast<size_t>(end - buffer);
}
}
}
// Size of all data blocks, maybe approximate
uint64_t GetApproximateDataSize();
@ -585,12 +560,7 @@ struct BlockBasedTable::Rep {
const InternalKeyComparator& internal_comparator;
Status status;
std::unique_ptr<RandomAccessFileReader> file;
char cache_key_prefix[kMaxCacheKeyPrefixSize];
// SIZE_MAX -> assert not used without re-assignment
size_t cache_key_prefix_size = SIZE_MAX;
char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize];
// SIZE_MAX -> assert not used without re-assignment
size_t compressed_cache_key_prefix_size = SIZE_MAX;
OffsetableCacheKey base_cache_key;
PersistentCacheOptions persistent_cache_options;
// Footer contains the fixed table information

View File

@ -78,28 +78,13 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
block_contents_pinned);
if (!block.IsCached()) {
if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
// insert a dummy record to block cache to track the memory usage
if (!ro.fill_cache) {
Cache* const block_cache = rep_->table_options.block_cache.get();
if (block_cache) {
// insert a dummy record to block cache to track the memory usage
Cache::Handle* cache_handle = nullptr;
// There are two other types of cache keys: 1) SST cache key added in
// `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
// `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
// from SST cache key(31 bytes), and use non-zero prefix to
// differentiate from `write_buffer_manager`
const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
// Prefix: use rep_->cache_key_prefix padded by 0s
memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
assert(rep_->cache_key_prefix_size != 0);
assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
next_cache_key_id_++);
assert(end - cache_key <=
static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
s = block_cache->Insert(unique_key, nullptr,
CacheKey key = CacheKey::CreateUniqueForCacheLifetime(block_cache);
s = block_cache->Insert(key.AsSlice(), nullptr,
block.GetValue()->ApproximateMemoryUsage(),
nullptr, &cache_handle);
@ -109,6 +94,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
cache_handle);
}
}
}
} else {
iter->SetCacheHandle(block.GetCacheHandle());
}
@ -150,36 +136,23 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro,
iter, block_contents_pinned);
if (!block.IsCached()) {
if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
// insert a dummy record to block cache to track the memory usage
if (!ro.fill_cache) {
Cache* const block_cache = rep_->table_options.block_cache.get();
if (block_cache) {
// insert a dummy record to block cache to track the memory usage
Cache::Handle* cache_handle = nullptr;
// There are two other types of cache keys: 1) SST cache key added in
// `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
// `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
// from SST cache key(31 bytes), and use non-zero prefix to
// differentiate from `write_buffer_manager`
const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
// Prefix: use rep_->cache_key_prefix padded by 0s
memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
assert(rep_->cache_key_prefix_size != 0);
assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
next_cache_key_id_++);
assert(end - cache_key <=
static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
s = block_cache->Insert(unique_key, nullptr,
CacheKey key = CacheKey::CreateUniqueForCacheLifetime(block_cache);
s = block_cache->Insert(key.AsSlice(), nullptr,
block.GetValue()->ApproximateMemoryUsage(),
nullptr, &cache_handle);
if (s.ok()) {
assert(cache_handle != nullptr);
iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
cache_handle);
}
}
}
} else {
iter->SetCacheHandle(block.GetCacheHandle());
}

View File

@ -17,13 +17,10 @@ void PersistentCacheHelper::InsertRawPage(
assert(cache_options.persistent_cache);
assert(cache_options.persistent_cache->IsCompressed());
// construct the page key
char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(),
cache_options.key_prefix.size(),
handle, cache_key);
// insert content to cache
cache_options.persistent_cache->Insert(key, data, size)
CacheKey key =
BlockBasedTable::GetCacheKey(cache_options.base_cache_key, handle);
cache_options.persistent_cache->Insert(key.AsSlice(), data, size)
.PermitUncheckedError();
}
@ -36,14 +33,11 @@ void PersistentCacheHelper::InsertUncompressedPage(
// (1) content is cacheable
// (2) content is not compressed
// construct the page key
char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(),
cache_options.key_prefix.size(),
handle, cache_key);
// insert block contents to page cache
CacheKey key =
BlockBasedTable::GetCacheKey(cache_options.base_cache_key, handle);
cache_options.persistent_cache
->Insert(key, contents.data.data(), contents.data.size())
->Insert(key.AsSlice(), contents.data.data(), contents.data.size())
.PermitUncheckedError();
;
}
@ -57,14 +51,12 @@ Status PersistentCacheHelper::LookupRawPage(
assert(cache_options.persistent_cache);
assert(cache_options.persistent_cache->IsCompressed());
// construct the page key
char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(),
cache_options.key_prefix.size(),
handle, cache_key);
// Lookup page
CacheKey key =
BlockBasedTable::GetCacheKey(cache_options.base_cache_key, handle);
size_t size;
Status s = cache_options.persistent_cache->Lookup(key, raw_data, &size);
Status s =
cache_options.persistent_cache->Lookup(key.AsSlice(), raw_data, &size);
if (!s.ok()) {
// cache miss
RecordTick(cache_options.statistics, PERSISTENT_CACHE_MISS);
@ -90,15 +82,13 @@ Status PersistentCacheHelper::LookupUncompressedPage(
return Status::NotFound();
}
// construct the page key
char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(),
cache_options.key_prefix.size(),
handle, cache_key);
// Lookup page
CacheKey key =
BlockBasedTable::GetCacheKey(cache_options.base_cache_key, handle);
std::unique_ptr<char[]> data;
size_t size;
Status s = cache_options.persistent_cache->Lookup(key, &data, &size);
Status s =
cache_options.persistent_cache->Lookup(key.AsSlice(), &data, &size);
if (!s.ok()) {
// cache miss
RecordTick(cache_options.statistics, PERSISTENT_CACHE_MISS);

View File

@ -6,6 +6,7 @@
#include <string>
#include "cache/cache_key.h"
#include "monitoring/statistics.h"
#include "rocksdb/persistent_cache.h"
@ -19,15 +20,15 @@ struct PersistentCacheOptions {
PersistentCacheOptions() {}
explicit PersistentCacheOptions(
const std::shared_ptr<PersistentCache>& _persistent_cache,
const std::string _key_prefix, Statistics* const _statistics)
const OffsetableCacheKey& _base_cache_key, Statistics* const _statistics)
: persistent_cache(_persistent_cache),
key_prefix(_key_prefix),
base_cache_key(_base_cache_key),
statistics(_statistics) {}
virtual ~PersistentCacheOptions() {}
std::shared_ptr<PersistentCache> persistent_cache;
std::string key_prefix;
OffsetableCacheKey base_cache_key;
Statistics* statistics = nullptr;
static const PersistentCacheOptions kEmpty;

View File

@ -195,7 +195,7 @@ class Constructor {
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_comparator,
std::vector<std::string>* keys, stl_wrappers::KVMap* kvmap) {
last_internal_key_ = &internal_comparator;
last_internal_comparator_ = &internal_comparator;
*kvmap = data_;
keys->clear();
for (const auto& kv : data_) {
@ -227,7 +227,7 @@ class Constructor {
virtual bool AnywayDeleteIterator() const { return false; }
protected:
const InternalKeyComparator* last_internal_key_;
const InternalKeyComparator* last_internal_comparator_;
private:
stl_wrappers::KVMap data_;
@ -403,19 +403,8 @@ class TableConstructor : public Constructor {
// Open the table
uniq_id_ = cur_uniq_id_++;
std::unique_ptr<FSRandomAccessFile> source(new test::StringSource(
TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads));
file_reader_.reset(new RandomAccessFileReader(std::move(source), "test"));
const bool kSkipFilters = true;
const bool kImmortal = true;
return ioptions.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions,
internal_comparator, !kSkipFilters, !kImmortal,
false, level_, largest_seqno_, &block_cache_tracer_,
moptions.write_buffer_size, "", uniq_id_),
std::move(file_reader_), TEST_GetSink()->contents().size(),
&table_reader_);
return Reopen(ioptions, moptions);
}
InternalIterator* NewIterator(
@ -449,7 +438,10 @@ class TableConstructor : public Constructor {
file_reader_.reset(new RandomAccessFileReader(std::move(source), "test"));
return ioptions.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions,
*last_internal_key_),
*last_internal_comparator_, /*skip_filters*/ false,
/*immortal*/ false, false, level_, largest_seqno_,
&block_cache_tracer_, moptions.write_buffer_size, "",
uniq_id_),
std::move(file_reader_), TEST_GetSink()->contents().size(),
&table_reader_);
}

View File

@ -98,36 +98,4 @@ inline uint64_t DecodeFixed64(const char* ptr) {
}
}
// Swaps between big and little endian. Can be used to in combination
// with the little-endian encoding/decoding functions to encode/decode
// big endian.
template <typename T>
inline T EndianSwapValue(T v) {
static_assert(std::is_integral<T>::value, "non-integral type");
#ifdef _MSC_VER
if (sizeof(T) == 2) {
return static_cast<T>(_byteswap_ushort(static_cast<uint16_t>(v)));
} else if (sizeof(T) == 4) {
return static_cast<T>(_byteswap_ulong(static_cast<uint32_t>(v)));
} else if (sizeof(T) == 8) {
return static_cast<T>(_byteswap_uint64(static_cast<uint64_t>(v)));
}
#else
if (sizeof(T) == 2) {
return static_cast<T>(__builtin_bswap16(static_cast<uint16_t>(v)));
} else if (sizeof(T) == 4) {
return static_cast<T>(__builtin_bswap32(static_cast<uint32_t>(v)));
} else if (sizeof(T) == 8) {
return static_cast<T>(__builtin_bswap64(static_cast<uint64_t>(v)));
}
#endif
// Recognized by clang as bswap, but not by gcc :(
T ret_val = 0;
for (size_t i = 0; i < sizeof(T); ++i) {
ret_val |= ((v >> (8 * i)) & 0xff) << (8 * (sizeof(T) - 1 - i));
}
return ret_val;
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -12,6 +12,7 @@
#include "rocksdb/status.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/math.h"
namespace ROCKSDB_NAMESPACE {

View File

@ -10,18 +10,21 @@
#include "util/hash.h"
#include <cstring>
#include <type_traits>
#include <vector>
#include "test_util/testharness.h"
#include "util/coding.h"
#include "util/coding_lean.h"
#include "util/hash128.h"
#include "util/math.h"
#include "util/math128.h"
using ROCKSDB_NAMESPACE::BijectiveHash2x64;
using ROCKSDB_NAMESPACE::BijectiveUnhash2x64;
using ROCKSDB_NAMESPACE::DecodeFixed64;
using ROCKSDB_NAMESPACE::EncodeFixed32;
using ROCKSDB_NAMESPACE::EndianSwapValue;
using ROCKSDB_NAMESPACE::GetSliceHash64;
using ROCKSDB_NAMESPACE::Hash;
using ROCKSDB_NAMESPACE::Hash128;
@ -29,6 +32,7 @@ using ROCKSDB_NAMESPACE::Hash2x64;
using ROCKSDB_NAMESPACE::Hash64;
using ROCKSDB_NAMESPACE::Lower32of64;
using ROCKSDB_NAMESPACE::Lower64of128;
using ROCKSDB_NAMESPACE::ReverseBits;
using ROCKSDB_NAMESPACE::Slice;
using ROCKSDB_NAMESPACE::Unsigned128;
using ROCKSDB_NAMESPACE::Upper32of64;
@ -620,6 +624,18 @@ static void test_BitOps() {
EXPECT_EQ(BitParity(vm1), i & 1);
EXPECT_EQ(BitParity(vm1 & everyOtherBit), ((i + 1) / 2) & 1);
// EndianSwapValue
T ev = T{1} << (((sizeof(T) - 1 - (i / 8)) * 8) + i % 8);
EXPECT_EQ(EndianSwapValue(v), ev);
// ReverseBits
EXPECT_EQ(ReverseBits(v), static_cast<T>(T{1} << (8 * sizeof(T) - 1 - i)));
#ifdef HAVE_UINT128_EXTENSION // Uses multiplication
if (std::is_unsigned<T>::value) { // Technical UB on signed type
T rv = T{1} << (8 * sizeof(T) - 1 - i);
EXPECT_EQ(ReverseBits(vm1), static_cast<T>(rv * ~T{1}));
}
#endif
vm1 = (vm1 << 1) | 1;
}
}

View File

@ -185,4 +185,50 @@ inline int BitParity(T v) {
#endif
}
// Swaps between big and little endian. Can be used in combination with the
// little-endian encoding/decoding functions in coding_lean.h and coding.h to
// encode/decode big endian.
template <typename T>
inline T EndianSwapValue(T v) {
static_assert(std::is_integral<T>::value, "non-integral type");
#ifdef _MSC_VER
if (sizeof(T) == 2) {
return static_cast<T>(_byteswap_ushort(static_cast<uint16_t>(v)));
} else if (sizeof(T) == 4) {
return static_cast<T>(_byteswap_ulong(static_cast<uint32_t>(v)));
} else if (sizeof(T) == 8) {
return static_cast<T>(_byteswap_uint64(static_cast<uint64_t>(v)));
}
#else
if (sizeof(T) == 2) {
return static_cast<T>(__builtin_bswap16(static_cast<uint16_t>(v)));
} else if (sizeof(T) == 4) {
return static_cast<T>(__builtin_bswap32(static_cast<uint32_t>(v)));
} else if (sizeof(T) == 8) {
return static_cast<T>(__builtin_bswap64(static_cast<uint64_t>(v)));
}
#endif
// Recognized by clang as bswap, but not by gcc :(
T ret_val = 0;
for (std::size_t i = 0; i < sizeof(T); ++i) {
ret_val |= ((v >> (8 * i)) & 0xff) << (8 * (sizeof(T) - 1 - i));
}
return ret_val;
}
// Reverses the order of bits in an integral value
template <typename T>
inline T ReverseBits(T v) {
T r = EndianSwapValue(v);
const T kHighestByte = T{1} << ((sizeof(T) - 1) * 8);
const T kEveryByte = kHighestByte | (kHighestByte / 255);
r = ((r & (kEveryByte * 0x0f)) << 4) | ((r >> 4) & (kEveryByte * 0x0f));
r = ((r & (kEveryByte * 0x33)) << 2) | ((r >> 2) & (kEveryByte * 0x33));
r = ((r & (kEveryByte * 0x55)) << 1) | ((r >> 1) & (kEveryByte * 0x55));
return r;
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -218,6 +218,18 @@ inline int BitParity(Unsigned128 v) {
return BitParity(Lower64of128(v) ^ Upper64of128(v));
}
template <>
inline Unsigned128 EndianSwapValue(Unsigned128 v) {
return (Unsigned128{EndianSwapValue(Lower64of128(v))} << 64) |
EndianSwapValue(Upper64of128(v));
}
template <>
inline Unsigned128 ReverseBits(Unsigned128 v) {
return (Unsigned128{ReverseBits(Lower64of128(v))} << 64) |
ReverseBits(Upper64of128(v));
}
template <typename T>
struct IsUnsignedUpTo128
: std::integral_constant<bool, std::is_unsigned<T>::value ||

View File

@ -45,6 +45,7 @@
#include "util/channel.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/math.h"
#include "util/string_util.h"
#include "utilities/backupable/backupable_db_impl.h"
#include "utilities/checkpoint/checkpoint_impl.h"

View File

@ -3,6 +3,8 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "cache/cache_key.h"
#include "table/block_based/block_based_table_reader.h"
#ifndef ROCKSDB_LITE
#include "utilities/cache_dump_load_impl.h"
@ -33,8 +35,21 @@ Status CacheDumperImpl::SetDumpFilter(std::vector<DB*> db_list) {
return s;
}
for (auto id = ptc.begin(); id != ptc.end(); id++) {
assert(id->second->db_session_id.size() == 20);
prefix_filter_.insert(id->second->db_session_id);
OffsetableCacheKey base;
// We only want to save cache entries that are portable to another
// DB::Open, so only save entries with stable keys.
bool is_stable;
// WART: if the file is extremely large (> kMaxFileSizeStandardEncoding)
// then the prefix will be different. But this should not be a concern
// in practice because that limit is currently 4TB on a single file.
BlockBasedTable::SetupBaseCacheKey(
id->second.get(), /*cur_db_session_id*/ "", /*cur_file_num*/ 0,
/*file_size*/ 42, &base, &is_stable);
if (is_stable) {
Slice prefix_slice = base.CommonPrefixSlice();
assert(prefix_slice.size() == OffsetableCacheKey::kCommonPrefixSize);
prefix_filter_.insert(prefix_slice.ToString());
}
}
}
return s;
@ -82,16 +97,13 @@ IOStatus CacheDumperImpl::DumpCacheEntriesToWriter() {
// Check if we need to filter out the block based on its key
bool CacheDumperImpl::ShouldFilterOut(const Slice& key) {
// Since now we use db_session_id as the prefix, the prefix size is 20. If
// Anything changes in the future, we need to update it here.
bool filter_out = true;
size_t prefix_size = 20;
Slice key_prefix(key.data(), prefix_size);
std::string prefix = key_prefix.ToString();
if (prefix_filter_.find(prefix) != prefix_filter_.end()) {
filter_out = false;
if (key.size() < OffsetableCacheKey::kCommonPrefixSize) {
return /*filter out*/ true;
}
return filter_out;
Slice key_prefix(key.data(), OffsetableCacheKey::kCommonPrefixSize);
std::string prefix = key_prefix.ToString();
// Filter out if not found
return prefix_filter_.find(prefix) == prefix_filter_.end();
}
// This is the callback function which will be applied to