2016-02-09 23:12:00 +00:00
|
|
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
2017-07-15 23:03:42 +00:00
|
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
|
|
// (found in the LICENSE.Apache file in the root directory).
|
Add experimental API MarkForCompaction()
Summary:
Some Mongo+Rocks datasets in Parse's environment are not doing compactions very frequently. During the quiet period (with no IO), we'd like to schedule compactions so that our reads become faster. Also, aggressively compacting during quiet periods helps when write bursts happen. In addition, we also want to compact files that are containing deleted key ranges (like old oplog keys).
All of this is currently not possible with CompactRange() because it's single-threaded and blocks all other compactions from happening. Running CompactRange() risks an issue of blocking writes because we generate too much Level 0 files before the compaction is over. Stopping writes is very dangerous because they hold transaction locks. We tried running manual compaction once on Mongo+Rocks and everything fell apart.
MarkForCompaction() solves all of those problems. This is very light-weight manual compaction. It is lower priority than automatic compactions, which means it shouldn't interfere with background process keeping the LSM tree clean. However, if no automatic compactions need to be run (or we have extra background threads available), we will start compacting files that are marked for compaction.
Test Plan: added a new unit test
Reviewers: yhchiang, rven, MarkCallaghan, sdong
Reviewed By: sdong
Subscribers: yoshinorim, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D37083
2015-04-17 23:44:45 +00:00
|
|
|
|
|
|
|
#include "rocksdb/experimental.h"
|
|
|
|
|
KeySegmentsExtractor and prototype higher-dimensional filtering (#12075)
Summary:
This change contains a prototype new API for "higher dimensional" filtering of read queries. Existing filters treat keys as one-dimensional, either as distinct points (whole key) or as contiguous ranges in comparator order (prefix filters). The proposed KeySegmentsExtractor allows treating keys as multi-dimensional for filtering purposes even though they still have a single total order across dimensions. For example, consider these keys in different LSM levels:
L0:
abc_0123
abc_0150
def_0114
ghi_0134
L1:
abc_0045
bcd_0091
def_0077
xyz_0080
If we get a range query for [def_0100, def_0200), a prefix filter (up to the underscore) will tell us that both levels are potentially relevant. However, if each SST file stores a simple range of the values for the second segment of the key, we would see that L1 only has [0045, 0091] which (under certain required assumptions) we are sure does not overlap with the given range query. Thus, we can filter out processing or reading any index or data blocks from L1 for the query.
This kind of case shows up with time-ordered data but is more general than filtering based on user timestamp. See https://github.com/facebook/rocksdb/issues/11332 . Here the "time" segments of the keys are meaningfully ordered with respect to each other even when the previous segment is different, so summarizing data along an alternate dimension of the key like this can work well for filtering.
This prototype implementation simply leverages existing APIs for user table properties and table filtering, which is not very CPU efficient. Eventually, we expect to create a native implementation. However, I have put some significant
thought and engineering into the new APIs overall, which I expect to be close to refined enough for production.
For details, see new public APIs in experimental.h. For a detailed example, see the new unit test in db_bloom_filter_test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12075
Test Plan: Unit test included
Reviewed By: jowlyzhang
Differential Revision: D53619406
Pulled By: pdillinger
fbshipit-source-id: 9e6e7b82b4db8d815db76a6ab340e90db2c191f2
2024-02-15 23:39:55 +00:00
|
|
|
#include <cstddef>
|
|
|
|
#include <cstdint>
|
|
|
|
#include <functional>
|
|
|
|
#include <memory>
|
|
|
|
#include <string>
|
|
|
|
#include <vector>
|
|
|
|
|
2019-05-31 18:52:59 +00:00
|
|
|
#include "db/db_impl/db_impl.h"
|
2022-03-18 23:35:51 +00:00
|
|
|
#include "db/version_util.h"
|
|
|
|
#include "logging/logging.h"
|
KeySegmentsExtractor and prototype higher-dimensional filtering (#12075)
Summary:
This change contains a prototype new API for "higher dimensional" filtering of read queries. Existing filters treat keys as one-dimensional, either as distinct points (whole key) or as contiguous ranges in comparator order (prefix filters). The proposed KeySegmentsExtractor allows treating keys as multi-dimensional for filtering purposes even though they still have a single total order across dimensions. For example, consider these keys in different LSM levels:
L0:
abc_0123
abc_0150
def_0114
ghi_0134
L1:
abc_0045
bcd_0091
def_0077
xyz_0080
If we get a range query for [def_0100, def_0200), a prefix filter (up to the underscore) will tell us that both levels are potentially relevant. However, if each SST file stores a simple range of the values for the second segment of the key, we would see that L1 only has [0045, 0091] which (under certain required assumptions) we are sure does not overlap with the given range query. Thus, we can filter out processing or reading any index or data blocks from L1 for the query.
This kind of case shows up with time-ordered data but is more general than filtering based on user timestamp. See https://github.com/facebook/rocksdb/issues/11332 . Here the "time" segments of the keys are meaningfully ordered with respect to each other even when the previous segment is different, so summarizing data along an alternate dimension of the key like this can work well for filtering.
This prototype implementation simply leverages existing APIs for user table properties and table filtering, which is not very CPU efficient. Eventually, we expect to create a native implementation. However, I have put some significant
thought and engineering into the new APIs overall, which I expect to be close to refined enough for production.
For details, see new public APIs in experimental.h. For a detailed example, see the new unit test in db_bloom_filter_test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12075
Test Plan: Unit test included
Reviewed By: jowlyzhang
Differential Revision: D53619406
Pulled By: pdillinger
fbshipit-source-id: 9e6e7b82b4db8d815db76a6ab340e90db2c191f2
2024-02-15 23:39:55 +00:00
|
|
|
#include "util/atomic.h"
|
Add experimental API MarkForCompaction()
Summary:
Some Mongo+Rocks datasets in Parse's environment are not doing compactions very frequently. During the quiet period (with no IO), we'd like to schedule compactions so that our reads become faster. Also, aggressively compacting during quiet periods helps when write bursts happen. In addition, we also want to compact files that are containing deleted key ranges (like old oplog keys).
All of this is currently not possible with CompactRange() because it's single-threaded and blocks all other compactions from happening. Running CompactRange() risks an issue of blocking writes because we generate too much Level 0 files before the compaction is over. Stopping writes is very dangerous because they hold transaction locks. We tried running manual compaction once on Mongo+Rocks and everything fell apart.
MarkForCompaction() solves all of those problems. This is very light-weight manual compaction. It is lower priority than automatic compactions, which means it shouldn't interfere with background process keeping the LSM tree clean. However, if no automatic compactions need to be run (or we have extra background threads available), we will start compacting files that are marked for compaction.
Test Plan: added a new unit test
Reviewers: yhchiang, rven, MarkCallaghan, sdong
Reviewed By: sdong
Subscribers: yoshinorim, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D37083
2015-04-17 23:44:45 +00:00
|
|
|
|
2024-03-04 18:08:32 +00:00
|
|
|
namespace ROCKSDB_NAMESPACE::experimental {
|
Add experimental API MarkForCompaction()
Summary:
Some Mongo+Rocks datasets in Parse's environment are not doing compactions very frequently. During the quiet period (with no IO), we'd like to schedule compactions so that our reads become faster. Also, aggressively compacting during quiet periods helps when write bursts happen. In addition, we also want to compact files that are containing deleted key ranges (like old oplog keys).
All of this is currently not possible with CompactRange() because it's single-threaded and blocks all other compactions from happening. Running CompactRange() risks an issue of blocking writes because we generate too much Level 0 files before the compaction is over. Stopping writes is very dangerous because they hold transaction locks. We tried running manual compaction once on Mongo+Rocks and everything fell apart.
MarkForCompaction() solves all of those problems. This is very light-weight manual compaction. It is lower priority than automatic compactions, which means it shouldn't interfere with background process keeping the LSM tree clean. However, if no automatic compactions need to be run (or we have extra background threads available), we will start compacting files that are marked for compaction.
Test Plan: added a new unit test
Reviewers: yhchiang, rven, MarkCallaghan, sdong
Reviewed By: sdong
Subscribers: yoshinorim, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D37083
2015-04-17 23:44:45 +00:00
|
|
|
|
|
|
|
Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
|
|
|
|
const Slice* begin, const Slice* end) {
|
2017-07-28 23:23:50 +00:00
|
|
|
if (db == nullptr) {
|
|
|
|
return Status::InvalidArgument("DB is empty");
|
Add experimental API MarkForCompaction()
Summary:
Some Mongo+Rocks datasets in Parse's environment are not doing compactions very frequently. During the quiet period (with no IO), we'd like to schedule compactions so that our reads become faster. Also, aggressively compacting during quiet periods helps when write bursts happen. In addition, we also want to compact files that are containing deleted key ranges (like old oplog keys).
All of this is currently not possible with CompactRange() because it's single-threaded and blocks all other compactions from happening. Running CompactRange() risks an issue of blocking writes because we generate too much Level 0 files before the compaction is over. Stopping writes is very dangerous because they hold transaction locks. We tried running manual compaction once on Mongo+Rocks and everything fell apart.
MarkForCompaction() solves all of those problems. This is very light-weight manual compaction. It is lower priority than automatic compactions, which means it shouldn't interfere with background process keeping the LSM tree clean. However, if no automatic compactions need to be run (or we have extra background threads available), we will start compacting files that are marked for compaction.
Test Plan: added a new unit test
Reviewers: yhchiang, rven, MarkCallaghan, sdong
Reviewed By: sdong
Subscribers: yoshinorim, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D37083
2015-04-17 23:44:45 +00:00
|
|
|
}
|
Implement DB::PromoteL0 method
Summary:
This diff implements a new `DB` method `PromoteL0` which moves all files in L0
to a given level skipping compaction, provided that the files have disjoint
ranges and all levels up to the target level are empty.
This method provides finer-grain control for trivial compactions, and it is
useful for bulk-loading pre-sorted keys. Compared to D34797, it does not change
the semantics of an existing operation, which can impact existing code.
PromoteL0 is designed to work well in combination with the proposed
`GetSstFileWriter`/`AddFile` interface, enabling to "design" the level structure
by populating one level at a time. Such fine-grained control can be very useful
for static or mostly-static databases.
Test Plan: `make check`
Reviewers: IslamAbdelRahman, philipp, MarkCallaghan, yhchiang, igor, sdong
Reviewed By: sdong
Subscribers: dhruba
Differential Revision: https://reviews.facebook.net/D37107
2015-04-23 19:10:36 +00:00
|
|
|
|
2017-07-28 23:23:50 +00:00
|
|
|
return db->SuggestCompactRange(column_family, begin, end);
|
Add experimental API MarkForCompaction()
Summary:
Some Mongo+Rocks datasets in Parse's environment are not doing compactions very frequently. During the quiet period (with no IO), we'd like to schedule compactions so that our reads become faster. Also, aggressively compacting during quiet periods helps when write bursts happen. In addition, we also want to compact files that are containing deleted key ranges (like old oplog keys).
All of this is currently not possible with CompactRange() because it's single-threaded and blocks all other compactions from happening. Running CompactRange() risks an issue of blocking writes because we generate too much Level 0 files before the compaction is over. Stopping writes is very dangerous because they hold transaction locks. We tried running manual compaction once on Mongo+Rocks and everything fell apart.
MarkForCompaction() solves all of those problems. This is very light-weight manual compaction. It is lower priority than automatic compactions, which means it shouldn't interfere with background process keeping the LSM tree clean. However, if no automatic compactions need to be run (or we have extra background threads available), we will start compacting files that are marked for compaction.
Test Plan: added a new unit test
Reviewers: yhchiang, rven, MarkCallaghan, sdong
Reviewed By: sdong
Subscribers: yoshinorim, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D37083
2015-04-17 23:44:45 +00:00
|
|
|
}
|
|
|
|
|
Implement DB::PromoteL0 method
Summary:
This diff implements a new `DB` method `PromoteL0` which moves all files in L0
to a given level skipping compaction, provided that the files have disjoint
ranges and all levels up to the target level are empty.
This method provides finer-grain control for trivial compactions, and it is
useful for bulk-loading pre-sorted keys. Compared to D34797, it does not change
the semantics of an existing operation, which can impact existing code.
PromoteL0 is designed to work well in combination with the proposed
`GetSstFileWriter`/`AddFile` interface, enabling to "design" the level structure
by populating one level at a time. Such fine-grained control can be very useful
for static or mostly-static databases.
Test Plan: `make check`
Reviewers: IslamAbdelRahman, philipp, MarkCallaghan, yhchiang, igor, sdong
Reviewed By: sdong
Subscribers: dhruba
Differential Revision: https://reviews.facebook.net/D37107
2015-04-23 19:10:36 +00:00
|
|
|
// Free-function entry point for promoting L0 files of `column_family` to
// `target_level`.
//
// Delegates to db->PromoteL0() and returns its Status unchanged. A null `db`
// is rejected with Status::InvalidArgument before any call is made.
Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) {
  return db == nullptr
             ? Status::InvalidArgument("Didn't recognize DB object")
             : db->PromoteL0(column_family, target_level);
}
|
|
|
|
|
Add experimental API MarkForCompaction()
Summary:
Some Mongo+Rocks datasets in Parse's environment are not doing compactions very frequently. During the quiet period (with no IO), we'd like to schedule compactions so that our reads become faster. Also, aggressively compacting during quiet periods helps when write bursts happen. In addition, we also want to compact files that are containing deleted key ranges (like old oplog keys).
All of this is currently not possible with CompactRange() because it's single-threaded and blocks all other compactions from happening. Running CompactRange() risks an issue of blocking writes because we generate too much Level 0 files before the compaction is over. Stopping writes is very dangerous because they hold transaction locks. We tried running manual compaction once on Mongo+Rocks and everything fell apart.
MarkForCompaction() solves all of those problems. This is very light-weight manual compaction. It is lower priority than automatic compactions, which means it shouldn't interfere with background process keeping the LSM tree clean. However, if no automatic compactions need to be run (or we have extra background threads available), we will start compacting files that are marked for compaction.
Test Plan: added a new unit test
Reviewers: yhchiang, rven, MarkCallaghan, sdong
Reviewed By: sdong
Subscribers: yoshinorim, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D37083
2015-04-17 23:44:45 +00:00
|
|
|
|
|
|
|
// Convenience overload of SuggestCompactRange() that targets the DB's default
// column family (as reported by db->DefaultColumnFamily()).
//
// Returns Status::InvalidArgument if `db` is null; otherwise returns whatever
// the four-argument SuggestCompactRange() overload returns.
Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end) {
  // The null check must happen here: looking up the default column family
  // dereferences `db`, so deferring to the four-argument overload's own check
  // would be too late.
  if (db == nullptr) {
    return Status::InvalidArgument("DB is empty");
  }
  return SuggestCompactRange(db, db->DefaultColumnFamily(), begin, end);
}
|
|
|
|
|
2022-03-18 23:35:51 +00:00
|
|
|
Status UpdateManifestForFilesState(
|
|
|
|
const DBOptions& db_opts, const std::string& db_name,
|
|
|
|
const std::vector<ColumnFamilyDescriptor>& column_families,
|
|
|
|
const UpdateManifestForFilesStateOptions& opts) {
|
Group SST write in flush, compaction and db open with new stats (#11910)
Summary:
## Context/Summary
Similar to https://github.com/facebook/rocksdb/pull/11288, https://github.com/facebook/rocksdb/pull/11444, categorizing SST/blob file write according to different io activities allows more insight into the activity.
For that, this PR does the following:
- Tag different write IOs by passing down and converting WriteOptions to IOOptions
- Add new SST_WRITE_MICROS histogram in WritableFileWriter::Append() and breakdown FILE_WRITE_{FLUSH|COMPACTION|DB_OPEN}_MICROS
Some related code refactory to make implementation cleaner:
- Blob stats
- Replace high-level write measurement with low-level WritableFileWriter::Append() measurement for BLOB_DB_BLOB_FILE_WRITE_MICROS. This is to make FILE_WRITE_{FLUSH|COMPACTION|DB_OPEN}_MICROS include blob file. As a consequence, this introduces some behavioral changes on it, see HISTORY and db bench test plan below for more info.
- Fix bugs where BLOB_DB_BLOB_FILE_SYNCED/BLOB_DB_BLOB_FILE_BYTES_WRITTEN include file failed to sync and bytes failed to write.
- Refactor WriteOptions constructor for easier construction with io_activity and rate_limiter_priority
- Refactor DBImpl::~DBImpl()/BlobDBImpl::Close() to bypass thread op verification
- Build table
- TableBuilderOptions now includes Read/WriteOpitons so BuildTable() do not need to take these two variables
- Replace the io_priority passed into BuildTable() with TableBuilderOptions::WriteOpitons::rate_limiter_priority. Similar for BlobFileBuilder.
This parameter is used for dynamically changing file io priority for flush, see https://github.com/facebook/rocksdb/pull/9988?fbclid=IwAR1DtKel6c-bRJAdesGo0jsbztRtciByNlvokbxkV6h_L-AE9MACzqRTT5s for more
- Update ThreadStatus::FLUSH_BYTES_WRITTEN to use io_activity to track flush IO in flush job and db open instead of io_priority
## Test
### db bench
Flush
```
./db_bench --statistics=1 --benchmarks=fillseq --num=100000 --write_buffer_size=100
rocksdb.sst.write.micros P50 : 1.830863 P95 : 4.094720 P99 : 6.578947 P100 : 26.000000 COUNT : 7875 SUM : 20377
rocksdb.file.write.flush.micros P50 : 1.830863 P95 : 4.094720 P99 : 6.578947 P100 : 26.000000 COUNT : 7875 SUM : 20377
rocksdb.file.write.compaction.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
rocksdb.file.write.db.open.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
```
compaction, db oopen
```
Setup: ./db_bench --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench
Run:./db_bench --statistics=1 --benchmarks=compact --db=../db_bench --use_existing_db=1
rocksdb.sst.write.micros P50 : 2.675325 P95 : 9.578788 P99 : 18.780000 P100 : 314.000000 COUNT : 638 SUM : 3279
rocksdb.file.write.flush.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
rocksdb.file.write.compaction.micros P50 : 2.757353 P95 : 9.610687 P99 : 19.316667 P100 : 314.000000 COUNT : 615 SUM : 3213
rocksdb.file.write.db.open.micros P50 : 2.055556 P95 : 3.925000 P99 : 9.000000 P100 : 9.000000 COUNT : 23 SUM : 66
```
blob stats - just to make sure they aren't broken by this PR
```
Integrated Blob DB
Setup: ./db_bench --enable_blob_files=1 --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench
Run:./db_bench --enable_blob_files=1 --statistics=1 --benchmarks=compact --db=../db_bench --use_existing_db=1
pre-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 7.298246 P95 : 9.771930 P99 : 9.991813 P100 : 16.000000 COUNT : 235 SUM : 1600
rocksdb.blobdb.blob.file.synced COUNT : 1
rocksdb.blobdb.blob.file.bytes.written COUNT : 34842
post-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 2.000000 P95 : 2.829360 P99 : 2.993779 P100 : 9.000000 COUNT : 707 SUM : 1614
- COUNT is higher and values are smaller as it includes header and footer write
- COUNT is 3X higher due to each Append() count as one post-PR, while in pre-PR, 3 Append()s counts as one. See https://github.com/facebook/rocksdb/pull/11910/files#diff-32b811c0a1c000768cfb2532052b44dc0b3bf82253f3eab078e15ff201a0dabfL157-L164
rocksdb.blobdb.blob.file.synced COUNT : 1 (stay the same)
rocksdb.blobdb.blob.file.bytes.written COUNT : 34842 (stay the same)
```
```
Stacked Blob DB
Run: ./db_bench --use_blob_db=1 --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench
pre-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 12.808042 P95 : 19.674497 P99 : 28.539683 P100 : 51.000000 COUNT : 10000 SUM : 140876
rocksdb.blobdb.blob.file.synced COUNT : 8
rocksdb.blobdb.blob.file.bytes.written COUNT : 1043445
post-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 1.657370 P95 : 2.952175 P99 : 3.877519 P100 : 24.000000 COUNT : 30001 SUM : 67924
- COUNT is higher and values are smaller as it includes header and footer write
- COUNT is 3X higher due to each Append() count as one post-PR, while in pre-PR, 3 Append()s counts as one. See https://github.com/facebook/rocksdb/pull/11910/files#diff-32b811c0a1c000768cfb2532052b44dc0b3bf82253f3eab078e15ff201a0dabfL157-L164
rocksdb.blobdb.blob.file.synced COUNT : 8 (stay the same)
rocksdb.blobdb.blob.file.bytes.written COUNT : 1043445 (stay the same)
```
### Rehearsal CI stress test
Trigger 3 full runs of all our CI stress tests
### Performance
Flush
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_pre_pr --benchmark_filter=ManualFlush/key_num:524288/per_key_size:256 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark; enable_statistics = true
Pre-pr: avg 507515519.3 ns
497686074,499444327,500862543,501389862,502994471,503744435,504142123,504224056,505724198,506610393,506837742,506955122,507695561,507929036,508307733,508312691,508999120,509963561,510142147,510698091,510743096,510769317,510957074,511053311,511371367,511409911,511432960,511642385,511691964,511730908,
Post-pr: avg 511971266.5 ns, regressed 0.88%
502744835,506502498,507735420,507929724,508313335,509548582,509994942,510107257,510715603,511046955,511352639,511458478,512117521,512317380,512766303,512972652,513059586,513804934,513808980,514059409,514187369,514389494,514447762,514616464,514622882,514641763,514666265,514716377,514990179,515502408,
```
Compaction
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_{pre|post}_pr --benchmark_filter=ManualCompaction/comp_style:0/max_data:134217728/per_key_size:256/enable_statistics:1 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark
Pre-pr: avg 495346098.30 ns
492118301,493203526,494201411,494336607,495269217,495404950,496402598,497012157,497358370,498153846
Post-pr: avg 504528077.20, regressed 1.85%. "ManualCompaction" include flush so the isolated regression for compaction should be around 1.85-0.88 = 0.97%
502465338,502485945,502541789,502909283,503438601,504143885,506113087,506629423,507160414,507393007
```
Put with WAL (in case passing WriteOptions slows down this path even without collecting SST write stats)
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_pre_pr --benchmark_filter=DBPut/comp_style:0/max_data:107374182400/per_key_size:256/enable_statistics:1/wal:1 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark
Pre-pr: avg 3848.10 ns
3814,3838,3839,3848,3854,3854,3854,3860,3860,3860
Post-pr: avg 3874.20 ns, regressed 0.68%
3863,3867,3871,3874,3875,3877,3877,3877,3880,3881
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/11910
Reviewed By: ajkr
Differential Revision: D49788060
Pulled By: hx235
fbshipit-source-id: 79e73699cda5be3b66461687e5147c2484fc5eff
2023-12-29 23:29:23 +00:00
|
|
|
// TODO: plumb Env::IOActivity, Env::IOPriority
|
2023-04-21 16:07:18 +00:00
|
|
|
const ReadOptions read_options;
|
Group SST write in flush, compaction and db open with new stats (#11910)
Summary:
## Context/Summary
Similar to https://github.com/facebook/rocksdb/pull/11288, https://github.com/facebook/rocksdb/pull/11444, categorizing SST/blob file write according to different io activities allows more insight into the activity.
For that, this PR does the following:
- Tag different write IOs by passing down and converting WriteOptions to IOOptions
- Add new SST_WRITE_MICROS histogram in WritableFileWriter::Append() and breakdown FILE_WRITE_{FLUSH|COMPACTION|DB_OPEN}_MICROS
Some related code refactory to make implementation cleaner:
- Blob stats
- Replace high-level write measurement with low-level WritableFileWriter::Append() measurement for BLOB_DB_BLOB_FILE_WRITE_MICROS. This is to make FILE_WRITE_{FLUSH|COMPACTION|DB_OPEN}_MICROS include blob file. As a consequence, this introduces some behavioral changes on it, see HISTORY and db bench test plan below for more info.
- Fix bugs where BLOB_DB_BLOB_FILE_SYNCED/BLOB_DB_BLOB_FILE_BYTES_WRITTEN include file failed to sync and bytes failed to write.
- Refactor WriteOptions constructor for easier construction with io_activity and rate_limiter_priority
- Refactor DBImpl::~DBImpl()/BlobDBImpl::Close() to bypass thread op verification
- Build table
- TableBuilderOptions now includes Read/WriteOpitons so BuildTable() do not need to take these two variables
- Replace the io_priority passed into BuildTable() with TableBuilderOptions::WriteOpitons::rate_limiter_priority. Similar for BlobFileBuilder.
This parameter is used for dynamically changing file io priority for flush, see https://github.com/facebook/rocksdb/pull/9988?fbclid=IwAR1DtKel6c-bRJAdesGo0jsbztRtciByNlvokbxkV6h_L-AE9MACzqRTT5s for more
- Update ThreadStatus::FLUSH_BYTES_WRITTEN to use io_activity to track flush IO in flush job and db open instead of io_priority
## Test
### db bench
Flush
```
./db_bench --statistics=1 --benchmarks=fillseq --num=100000 --write_buffer_size=100
rocksdb.sst.write.micros P50 : 1.830863 P95 : 4.094720 P99 : 6.578947 P100 : 26.000000 COUNT : 7875 SUM : 20377
rocksdb.file.write.flush.micros P50 : 1.830863 P95 : 4.094720 P99 : 6.578947 P100 : 26.000000 COUNT : 7875 SUM : 20377
rocksdb.file.write.compaction.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
rocksdb.file.write.db.open.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
```
compaction, db oopen
```
Setup: ./db_bench --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench
Run:./db_bench --statistics=1 --benchmarks=compact --db=../db_bench --use_existing_db=1
rocksdb.sst.write.micros P50 : 2.675325 P95 : 9.578788 P99 : 18.780000 P100 : 314.000000 COUNT : 638 SUM : 3279
rocksdb.file.write.flush.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
rocksdb.file.write.compaction.micros P50 : 2.757353 P95 : 9.610687 P99 : 19.316667 P100 : 314.000000 COUNT : 615 SUM : 3213
rocksdb.file.write.db.open.micros P50 : 2.055556 P95 : 3.925000 P99 : 9.000000 P100 : 9.000000 COUNT : 23 SUM : 66
```
blob stats - just to make sure they aren't broken by this PR
```
Integrated Blob DB
Setup: ./db_bench --enable_blob_files=1 --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench
Run:./db_bench --enable_blob_files=1 --statistics=1 --benchmarks=compact --db=../db_bench --use_existing_db=1
pre-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 7.298246 P95 : 9.771930 P99 : 9.991813 P100 : 16.000000 COUNT : 235 SUM : 1600
rocksdb.blobdb.blob.file.synced COUNT : 1
rocksdb.blobdb.blob.file.bytes.written COUNT : 34842
post-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 2.000000 P95 : 2.829360 P99 : 2.993779 P100 : 9.000000 COUNT : 707 SUM : 1614
- COUNT is higher and values are smaller as it includes header and footer write
- COUNT is 3X higher due to each Append() count as one post-PR, while in pre-PR, 3 Append()s counts as one. See https://github.com/facebook/rocksdb/pull/11910/files#diff-32b811c0a1c000768cfb2532052b44dc0b3bf82253f3eab078e15ff201a0dabfL157-L164
rocksdb.blobdb.blob.file.synced COUNT : 1 (stay the same)
rocksdb.blobdb.blob.file.bytes.written COUNT : 34842 (stay the same)
```
```
Stacked Blob DB
Run: ./db_bench --use_blob_db=1 --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench
pre-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 12.808042 P95 : 19.674497 P99 : 28.539683 P100 : 51.000000 COUNT : 10000 SUM : 140876
rocksdb.blobdb.blob.file.synced COUNT : 8
rocksdb.blobdb.blob.file.bytes.written COUNT : 1043445
post-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 1.657370 P95 : 2.952175 P99 : 3.877519 P100 : 24.000000 COUNT : 30001 SUM : 67924
- COUNT is higher and values are smaller as it includes header and footer write
- COUNT is 3X higher due to each Append() count as one post-PR, while in pre-PR, 3 Append()s counts as one. See https://github.com/facebook/rocksdb/pull/11910/files#diff-32b811c0a1c000768cfb2532052b44dc0b3bf82253f3eab078e15ff201a0dabfL157-L164
rocksdb.blobdb.blob.file.synced COUNT : 8 (stay the same)
rocksdb.blobdb.blob.file.bytes.written COUNT : 1043445 (stay the same)
```
### Rehearsal CI stress test
Trigger 3 full runs of all our CI stress tests
### Performance
Flush
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_pre_pr --benchmark_filter=ManualFlush/key_num:524288/per_key_size:256 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark; enable_statistics = true
Pre-pr: avg 507515519.3 ns
497686074,499444327,500862543,501389862,502994471,503744435,504142123,504224056,505724198,506610393,506837742,506955122,507695561,507929036,508307733,508312691,508999120,509963561,510142147,510698091,510743096,510769317,510957074,511053311,511371367,511409911,511432960,511642385,511691964,511730908,
Post-pr: avg 511971266.5 ns, regressed 0.88%
502744835,506502498,507735420,507929724,508313335,509548582,509994942,510107257,510715603,511046955,511352639,511458478,512117521,512317380,512766303,512972652,513059586,513804934,513808980,514059409,514187369,514389494,514447762,514616464,514622882,514641763,514666265,514716377,514990179,515502408,
```
Compaction
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_{pre|post}_pr --benchmark_filter=ManualCompaction/comp_style:0/max_data:134217728/per_key_size:256/enable_statistics:1 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark
Pre-pr: avg 495346098.30 ns
492118301,493203526,494201411,494336607,495269217,495404950,496402598,497012157,497358370,498153846
Post-pr: avg 504528077.20, regressed 1.85%. "ManualCompaction" include flush so the isolated regression for compaction should be around 1.85-0.88 = 0.97%
502465338,502485945,502541789,502909283,503438601,504143885,506113087,506629423,507160414,507393007
```
Put with WAL (in case passing WriteOptions slows down this path even without collecting SST write stats)
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_pre_pr --benchmark_filter=DBPut/comp_style:0/max_data:107374182400/per_key_size:256/enable_statistics:1/wal:1 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark
Pre-pr: avg 3848.10 ns
3814,3838,3839,3848,3854,3854,3854,3860,3860,3860
Post-pr: avg 3874.20 ns, regressed 0.68%
3863,3867,3871,3874,3875,3877,3877,3877,3880,3881
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/11910
Reviewed By: ajkr
Differential Revision: D49788060
Pulled By: hx235
fbshipit-source-id: 79e73699cda5be3b66461687e5147c2484fc5eff
2023-12-29 23:29:23 +00:00
|
|
|
const WriteOptions write_options;
|
2022-03-18 23:35:51 +00:00
|
|
|
OfflineManifestWriter w(db_opts, db_name);
|
|
|
|
Status s = w.Recover(column_families);
|
|
|
|
|
|
|
|
size_t files_updated = 0;
|
|
|
|
size_t cfs_updated = 0;
|
|
|
|
auto fs = db_opts.env->GetFileSystem();
|
|
|
|
|
|
|
|
for (auto cfd : *w.Versions().GetColumnFamilySet()) {
|
|
|
|
if (!s.ok()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
assert(cfd);
|
|
|
|
|
|
|
|
if (cfd->IsDropped() || !cfd->initialized()) {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
const auto* current = cfd->current();
|
|
|
|
assert(current);
|
|
|
|
|
|
|
|
const auto* vstorage = current->storage_info();
|
|
|
|
assert(vstorage);
|
|
|
|
|
|
|
|
VersionEdit edit;
|
|
|
|
edit.SetColumnFamily(cfd->GetID());
|
|
|
|
|
|
|
|
/* SST files */
|
|
|
|
for (int level = 0; level < cfd->NumberLevels(); level++) {
|
|
|
|
if (!s.ok()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
const auto& level_files = vstorage->LevelFiles(level);
|
|
|
|
|
|
|
|
for (const auto& lf : level_files) {
|
|
|
|
assert(lf);
|
|
|
|
|
|
|
|
uint64_t number = lf->fd.GetNumber();
|
|
|
|
std::string fname =
|
|
|
|
TableFileName(w.IOptions().db_paths, number, lf->fd.GetPathId());
|
|
|
|
|
|
|
|
std::unique_ptr<FSSequentialFile> f;
|
|
|
|
FileOptions fopts;
|
2022-10-11 00:59:17 +00:00
|
|
|
// Use kUnknown to signal the FileSystem to search all tiers for the
|
|
|
|
// file.
|
|
|
|
fopts.temperature = Temperature::kUnknown;
|
2022-03-18 23:35:51 +00:00
|
|
|
|
|
|
|
IOStatus file_ios =
|
|
|
|
fs->NewSequentialFile(fname, fopts, &f, /*dbg*/ nullptr);
|
|
|
|
if (file_ios.ok()) {
|
|
|
|
if (opts.update_temperatures) {
|
|
|
|
Temperature temp = f->GetTemperature();
|
|
|
|
if (temp != Temperature::kUnknown && temp != lf->temperature) {
|
|
|
|
// Current state inconsistent with manifest
|
|
|
|
++files_updated;
|
|
|
|
edit.DeleteFile(level, number);
|
2022-08-02 00:56:13 +00:00
|
|
|
edit.AddFile(
|
|
|
|
level, number, lf->fd.GetPathId(), lf->fd.GetFileSize(),
|
|
|
|
lf->smallest, lf->largest, lf->fd.smallest_seqno,
|
|
|
|
lf->fd.largest_seqno, lf->marked_for_compaction, temp,
|
|
|
|
lf->oldest_blob_file_number, lf->oldest_ancester_time,
|
Sort L0 files by newly introduced epoch_num (#10922)
Summary:
**Context:**
Sorting L0 files by `largest_seqno` has at least two inconveniences:
- File ingestion and compaction involving ingested files can create files of overlapping seqno range with the existing files. `force_consistency_check=true` will catch such overlap seqno range even those harmless overlap.
- For example, consider the following sequence of events ("key@n" indicates key at seqno "n")
- insert k1@1 to memtable m1
- ingest file s1 with k2@2, ingest file s2 with k3@3
- insert k4@4 to m1
- compact files s1, s2 and result in new file s3 of seqno range [2, 3]
- flush m1 and result in new file s4 of seqno range [1, 4]. And `force_consistency_check=true` will think s4 and s3 have file reordering corruption that might cause returning an old value of k1
- However such caught corruption is a false positive since s1, s2 will not have overlapped keys with k1 or whatever inserted into m1 before ingest file s1 by the requirement of file ingestion (otherwise the m1 will be flushed first before any of the file ingestion completes). Therefore there in fact isn't any file reordering corruption.
- Single delete can decrease a file's largest seqno and ordering by `largest_seqno` can introduce a wrong ordering hence file reordering corruption
- For example, consider the following sequence of events ("key@n" indicates key at seqno "n", Credit to ajkr for this example)
- an existing SST s1 contains only k1@1
- insert k1@2 to memtable m1
- ingest file s2 with k3@3, ingest file s3 with k4@4
- insert single delete k5@5 in m1
- flush m1 and result in new file s4 of seqno range [2, 5]
- compact s1, s2, s3 and result in new file s5 of seqno range [1, 4]
- compact s4 and result in new file s6 of seqno range [2] due to single delete
- By the last step, we have file ordering by largest seqno (">" means "newer") : s5 > s6 while s6 contains a newer version of the k1's value (i.e, k1@2) than s5, which is a real reordering corruption. While this can be caught by `force_consistency_check=true`, there isn't a good way to prevent this from happening if ordering by `largest_seqno`
Therefore, we are redesigning the sorting criteria of L0 files and avoid above inconvenience. Credit to ajkr , we now introduce `epoch_num` which describes the order of a file being flushed or ingested/imported (compaction output file will has the minimum `epoch_num` among input files'). This will avoid the above inconvenience in the following ways:
- In the first case above, there will no longer be overlap seqno range check in `force_consistency_check=true` but `epoch_number` ordering check. This will result in file ordering s1 < s2 < s4 (pre-compaction) and s3 < s4 (post-compaction) which won't trigger false positive corruption. See test class `DBCompactionTestL0FilesMisorderCorruption*` for more.
- In the second case above, this will result in file ordering s1 < s2 < s3 < s4 (pre-compacting s1, s2, s3), s5 < s4 (post-compacting s1, s2, s3), s5 < s6 (post-compacting s4), which are correct file ordering without causing any corruption.
**Summary:**
- Introduce `epoch_number` stored per `ColumnFamilyData` and sort CF's L0 files by their assigned `epoch_number` instead of `largest_seqno`.
- `epoch_number` is increased and assigned upon `VersionEdit::AddFile()` for flush (or similarly for WriteLevel0TableForRecovery) and file ingestion (except for allow_behind_true, which will always get assigned as the `kReservedEpochNumberForFileIngestedBehind`)
- Compaction output file is assigned with the minimum `epoch_number` among input files'
- Refit level: reuse refitted file's epoch_number
- Other paths needing `epoch_number` treatment:
- Import column families: reuse file's epoch_number if exists. If not, assign one based on `NewestFirstBySeqNo`
- Repair: reuse file's epoch_number if exists. If not, assign one based on `NewestFirstBySeqNo`.
- Assigning new epoch_number to a file and adding this file to LSM tree should be atomic. This is guaranteed by us assigning epoch_number right upon `VersionEdit::AddFile()` where this version edit will be apply to LSM tree shape right after by holding the db mutex (e.g, flush, file ingestion, import column family) or by there is only 1 ongoing edit per CF (e.g, WriteLevel0TableForRecovery, Repair).
- Assigning the minimum input epoch number to compaction output file won't misorder L0 files (even through later `Refit(target_level=0)`). It's due to for every key "k" in the input range, a legit compaction will cover a continuous epoch number range of that key. As long as we assign the key "k" the minimum input epoch number, it won't become newer or older than the versions of this key that aren't included in this compaction hence no misorder.
- Persist `epoch_number` of each file in manifest and recover `epoch_number` on db recovery
- Backward compatibility with old db without `epoch_number` support is guaranteed by assigning `epoch_number` to recovered files by `NewestFirstBySeqno` order. See `VersionStorageInfo::RecoverEpochNumbers()` for more
- Forward compatibility with manifest is guaranteed by flexibility of `NewFileCustomTag`
- Replace `force_consistent_check` on L0 with `epoch_number` and remove false positive check like case 1 with `largest_seqno` above
- Due to backward compatibility issue, we might encounter files with missing epoch number at the beginning of db recovery. We will still use old L0 sorting mechanism (`NewestFirstBySeqno`) to check/sort them till we infer their epoch number. See usages of `EpochNumberRequirement`.
- Remove fix https://github.com/facebook/rocksdb/pull/5958#issue-511150930 and their outdated tests to file reordering corruption because such fix can be replaced by this PR.
- Misc:
- update existing tests with `epoch_number` so make check will pass
- update https://github.com/facebook/rocksdb/pull/5958#issue-511150930 tests to verify corruption is fixed using `epoch_number` and cover universal/fifo compaction/CompactRange/CompactFile cases
- assert db_mutex is held for a few places before calling ColumnFamilyData::NewEpochNumber()
Pull Request resolved: https://github.com/facebook/rocksdb/pull/10922
Test Plan:
- `make check`
- New unit tests under `db/db_compaction_test.cc`, `db/db_test2.cc`, `db/version_builder_test.cc`, `db/repair_test.cc`
- Updated tests (i.e, `DBCompactionTestL0FilesMisorderCorruption*`) under https://github.com/facebook/rocksdb/pull/5958#issue-511150930
- [Ongoing] Compatibility test: manually run https://github.com/ajkr/rocksdb/commit/36a5686ec012f35a4371e409aa85c404ca1c210d (with file ingestion off for running the `.orig` binary to prevent this bug affecting upgrade/downgrade formality checking) for 1 hour on `simple black/white box`, `cf_consistency/txn/enable_ts with whitebox + test_best_efforts_recovery with blackbox`
- [Ongoing] normal db stress test
- [Ongoing] db stress test with aggressive value https://github.com/facebook/rocksdb/pull/10761
Reviewed By: ajkr
Differential Revision: D41063187
Pulled By: hx235
fbshipit-source-id: 826cb23455de7beaabe2d16c57682a82733a32a9
2022-12-13 21:29:37 +00:00
|
|
|
lf->file_creation_time, lf->epoch_number, lf->file_checksum,
|
2022-12-29 21:28:24 +00:00
|
|
|
lf->file_checksum_func_name, lf->unique_id,
|
2023-06-22 04:49:01 +00:00
|
|
|
lf->compensated_range_deletion_size, lf->tail_size,
|
|
|
|
lf->user_defined_timestamps_persisted);
|
2022-03-18 23:35:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
s = file_ios;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (s.ok() && edit.NumEntries() > 0) {
|
Sync dir containing CURRENT after RenameFile on CURRENT as much as possible (#10573)
Summary:
**Context:**
The crash test below revealed a bug: the directory containing the CURRENT file (short for `dir_contains_current_file` below) was not always synced after a new CURRENT was created and `RenameFile` was called on it as part of the creation.
This bug exposes a risk that such un-synced directory containing the updated CURRENT can’t survive a host crash (e.g, power loss) hence get corrupted. This then will be followed by a recovery from a corrupted CURRENT that we don't want.
The root-cause is that a nullptr `FSDirectory* dir_contains_current_file` sometimes gets passed-down to `SetCurrentFile()` hence in those case `dir_contains_current_file->FSDirectory::FsyncWithDirOptions()` will be skipped (which otherwise will internally call`Env/FS::SyncDic()` )
```
./db_stress --acquire_snapshot_one_in=10000 --adaptive_readahead=1 --allow_data_in_errors=True --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=8 --block_size=16384 --bloom_bits=134.8015470676662 --bottommost_compression_type=disable --cache_size=8388608 --checkpoint_one_in=1000000 --checksum_type=kCRC32c --clear_column_family_one_in=0 --compact_files_one_in=1000000 --compact_range_one_in=1000000 --compaction_pri=2 --compaction_ttl=100 --compression_max_dict_buffer_bytes=511 --compression_max_dict_bytes=16384 --compression_type=zstd --compression_use_zstd_dict_trainer=1 --compression_zstd_max_train_bytes=65536 --continuous_verification_interval=0 --data_block_index_type=0 --db=$db --db_write_buffer_size=1048576 --delpercent=5 --delrangepercent=0 --destroy_db_initially=0 --disable_wal=0 --enable_compaction_filter=0 --enable_pipelined_write=1 --expected_values_dir=$exp --fail_if_options_file_error=1 --file_checksum_impl=none --flush_one_in=1000000 --get_current_wal_file_one_in=0 --get_live_files_one_in=1000000 --get_property_one_in=1000000 --get_sorted_wal_files_one_in=0 --index_block_restart_interval=4 --ingest_external_file_one_in=0 --iterpercent=10 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True --mark_for_compaction_one_file_in=10 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=10000 --max_key_len=3 --max_manifest_file_size=16384 --max_write_batch_group_size_bytes=64 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=0 --memtable_prefix_bloom_size_ratio=0.001 --memtable_protection_bytes_per_key=1 --memtable_whole_key_filtering=1 --mmap_read=1 --nooverwritepercent=1 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=100000000 --optimize_filters_for_memory=1 --paranoid_file_checks=1 --partition_pinning=2 --pause_background_one_in=1000000 --periodic_compaction_seconds=0 
--prefix_size=5 --prefixpercent=5 --prepopulate_block_cache=1 --progress_reports=0 --read_fault_one_in=1000 --readpercent=45 --recycle_log_file_num=0 --reopen=0 --ribbon_starting_level=999 --secondary_cache_fault_one_in=32 --secondary_cache_uri=compressed_secondary_cache://capacity=8388608 --set_options_one_in=10000 --snapshot_hold_ops=100000 --sst_file_manager_bytes_per_sec=0 --sst_file_manager_bytes_per_truncate=0 --subcompactions=3 --sync_fault_injection=1 --target_file_size_base=2097 --target_file_size_multiplier=2 --test_batches_snapshots=1 --top_level_index_pinning=1 --use_full_merge_v1=1 --use_merge=1 --value_size_mult=32 --verify_checksum=1 --verify_checksum_one_in=1000000 --verify_db_one_in=100000 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=524288 --write_buffer_size=4194 --writepercent=35
```
```
stderr:
WARNING: prefix_size is non-zero but memtablerep != prefix_hash
db_stress: utilities/fault_injection_fs.cc:748: virtual rocksdb::IOStatus rocksdb::FaultInjectionTestFS::RenameFile(const std::string &, const std::string &, const rocksdb::IOOptions &, rocksdb::IODebugContext *): Assertion `tlist.find(tdn.second) == tlist.end()' failed.`
```
**Summary:**
The PR ensured the non-test path pass down a non-null dir containing CURRENT (which is by current RocksDB assumption just db_dir) by doing the following:
- Renamed `directory_to_fsync` as `dir_contains_current_file` in `SetCurrentFile()` to tighten the association between this directory and CURRENT file
- Changed `SetCurrentFile()` API to require `dir_contains_current_file` being passed-in, instead of making it by default nullptr.
- Because `SetCurrentFile()`'s `dir_contains_current_file` is passed down from `VersionSet::LogAndApply()` then `VersionSet::ProcessManifestWrites()` (i.e, think about this as a chain of 3 functions related to MANIFEST update), these 2 functions also got refactored to require `dir_contains_current_file`
- Updated the non-test-path callers of these 3 functions to obtain and pass in non-nullptr `dir_contains_current_file`, which by current assumption of RocksDB, is the `FSDirectory* db_dir`.
- `db_impl` path will obtain `DBImpl::directories_.getDbDir()` while others with no access to such `directories_` are obtained on the fly by creating such object `FileSystem::NewDirectory(..)` and manage it by unique pointers to ensure short life time.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/10573
Test Plan:
- `make check`
- Passed the repro db_stress command
- For future improvement, since we currently don't assert dir containing CURRENT to be non-nullptr due to https://github.com/facebook/rocksdb/pull/10573#pullrequestreview-1087698899, there is still chances that future developers mistakenly pass down nullptr dir containing CURRENT thus resulting skipped sync dir and cause the bug again. Therefore a smarter test (e.g, such as quoted from ajkr "(make) unsynced data loss to be dropping files corresponding to unsynced directory entries") is still needed.
Reviewed By: ajkr
Differential Revision: D39005886
Pulled By: hx235
fbshipit-source-id: 336fb9090d0cfa6ca3dd580db86268007dde7f5a
2022-08-30 00:35:21 +00:00
|
|
|
std::unique_ptr<FSDirectory> db_dir;
|
|
|
|
s = fs->NewDirectory(db_name, IOOptions(), &db_dir, nullptr);
|
|
|
|
if (s.ok()) {
|
Group SST write in flush, compaction and db open with new stats (#11910)
Summary:
## Context/Summary
Similar to https://github.com/facebook/rocksdb/pull/11288, https://github.com/facebook/rocksdb/pull/11444, categorizing SST/blob file write according to different io activities allows more insight into the activity.
For that, this PR does the following:
- Tag different write IOs by passing down and converting WriteOptions to IOOptions
- Add new SST_WRITE_MICROS histogram in WritableFileWriter::Append() and breakdown FILE_WRITE_{FLUSH|COMPACTION|DB_OPEN}_MICROS
Some related code refactoring to make the implementation cleaner:
- Blob stats
- Replace high-level write measurement with low-level WritableFileWriter::Append() measurement for BLOB_DB_BLOB_FILE_WRITE_MICROS. This is to make FILE_WRITE_{FLUSH|COMPACTION|DB_OPEN}_MICROS include blob file. As a consequence, this introduces some behavioral changes on it, see HISTORY and db bench test plan below for more info.
- Fix bugs where BLOB_DB_BLOB_FILE_SYNCED/BLOB_DB_BLOB_FILE_BYTES_WRITTEN include file failed to sync and bytes failed to write.
- Refactor WriteOptions constructor for easier construction with io_activity and rate_limiter_priority
- Refactor DBImpl::~DBImpl()/BlobDBImpl::Close() to bypass thread op verification
- Build table
- TableBuilderOptions now includes Read/WriteOptions so BuildTable() does not need to take these two variables
- Replace the io_priority passed into BuildTable() with TableBuilderOptions::WriteOptions::rate_limiter_priority. Similar for BlobFileBuilder.
This parameter is used for dynamically changing file io priority for flush, see https://github.com/facebook/rocksdb/pull/9988?fbclid=IwAR1DtKel6c-bRJAdesGo0jsbztRtciByNlvokbxkV6h_L-AE9MACzqRTT5s for more
- Update ThreadStatus::FLUSH_BYTES_WRITTEN to use io_activity to track flush IO in flush job and db open instead of io_priority
## Test
### db bench
Flush
```
./db_bench --statistics=1 --benchmarks=fillseq --num=100000 --write_buffer_size=100
rocksdb.sst.write.micros P50 : 1.830863 P95 : 4.094720 P99 : 6.578947 P100 : 26.000000 COUNT : 7875 SUM : 20377
rocksdb.file.write.flush.micros P50 : 1.830863 P95 : 4.094720 P99 : 6.578947 P100 : 26.000000 COUNT : 7875 SUM : 20377
rocksdb.file.write.compaction.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
rocksdb.file.write.db.open.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
```
compaction, db open
```
Setup: ./db_bench --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench
Run:./db_bench --statistics=1 --benchmarks=compact --db=../db_bench --use_existing_db=1
rocksdb.sst.write.micros P50 : 2.675325 P95 : 9.578788 P99 : 18.780000 P100 : 314.000000 COUNT : 638 SUM : 3279
rocksdb.file.write.flush.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
rocksdb.file.write.compaction.micros P50 : 2.757353 P95 : 9.610687 P99 : 19.316667 P100 : 314.000000 COUNT : 615 SUM : 3213
rocksdb.file.write.db.open.micros P50 : 2.055556 P95 : 3.925000 P99 : 9.000000 P100 : 9.000000 COUNT : 23 SUM : 66
```
blob stats - just to make sure they aren't broken by this PR
```
Integrated Blob DB
Setup: ./db_bench --enable_blob_files=1 --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench
Run:./db_bench --enable_blob_files=1 --statistics=1 --benchmarks=compact --db=../db_bench --use_existing_db=1
pre-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 7.298246 P95 : 9.771930 P99 : 9.991813 P100 : 16.000000 COUNT : 235 SUM : 1600
rocksdb.blobdb.blob.file.synced COUNT : 1
rocksdb.blobdb.blob.file.bytes.written COUNT : 34842
post-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 2.000000 P95 : 2.829360 P99 : 2.993779 P100 : 9.000000 COUNT : 707 SUM : 1614
- COUNT is higher and values are smaller as it includes header and footer write
- COUNT is 3X higher due to each Append() count as one post-PR, while in pre-PR, 3 Append()s counts as one. See https://github.com/facebook/rocksdb/pull/11910/files#diff-32b811c0a1c000768cfb2532052b44dc0b3bf82253f3eab078e15ff201a0dabfL157-L164
rocksdb.blobdb.blob.file.synced COUNT : 1 (stay the same)
rocksdb.blobdb.blob.file.bytes.written COUNT : 34842 (stay the same)
```
```
Stacked Blob DB
Run: ./db_bench --use_blob_db=1 --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench
pre-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 12.808042 P95 : 19.674497 P99 : 28.539683 P100 : 51.000000 COUNT : 10000 SUM : 140876
rocksdb.blobdb.blob.file.synced COUNT : 8
rocksdb.blobdb.blob.file.bytes.written COUNT : 1043445
post-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 1.657370 P95 : 2.952175 P99 : 3.877519 P100 : 24.000000 COUNT : 30001 SUM : 67924
- COUNT is higher and values are smaller as it includes header and footer write
- COUNT is 3X higher due to each Append() count as one post-PR, while in pre-PR, 3 Append()s counts as one. See https://github.com/facebook/rocksdb/pull/11910/files#diff-32b811c0a1c000768cfb2532052b44dc0b3bf82253f3eab078e15ff201a0dabfL157-L164
rocksdb.blobdb.blob.file.synced COUNT : 8 (stay the same)
rocksdb.blobdb.blob.file.bytes.written COUNT : 1043445 (stay the same)
```
### Rehearsal CI stress test
Trigger 3 full runs of all our CI stress tests
### Performance
Flush
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_pre_pr --benchmark_filter=ManualFlush/key_num:524288/per_key_size:256 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark; enable_statistics = true
Pre-pr: avg 507515519.3 ns
497686074,499444327,500862543,501389862,502994471,503744435,504142123,504224056,505724198,506610393,506837742,506955122,507695561,507929036,508307733,508312691,508999120,509963561,510142147,510698091,510743096,510769317,510957074,511053311,511371367,511409911,511432960,511642385,511691964,511730908,
Post-pr: avg 511971266.5 ns, regressed 0.88%
502744835,506502498,507735420,507929724,508313335,509548582,509994942,510107257,510715603,511046955,511352639,511458478,512117521,512317380,512766303,512972652,513059586,513804934,513808980,514059409,514187369,514389494,514447762,514616464,514622882,514641763,514666265,514716377,514990179,515502408,
```
Compaction
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_{pre|post}_pr --benchmark_filter=ManualCompaction/comp_style:0/max_data:134217728/per_key_size:256/enable_statistics:1 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark
Pre-pr: avg 495346098.30 ns
492118301,493203526,494201411,494336607,495269217,495404950,496402598,497012157,497358370,498153846
Post-pr: avg 504528077.20, regressed 1.85%. "ManualCompaction" include flush so the isolated regression for compaction should be around 1.85-0.88 = 0.97%
502465338,502485945,502541789,502909283,503438601,504143885,506113087,506629423,507160414,507393007
```
Put with WAL (in case passing WriteOptions slows down this path even without collecting SST write stats)
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_pre_pr --benchmark_filter=DBPut/comp_style:0/max_data:107374182400/per_key_size:256/enable_statistics:1/wal:1 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark
Pre-pr: avg 3848.10 ns
3814,3838,3839,3848,3854,3854,3854,3860,3860,3860
Post-pr: avg 3874.20 ns, regressed 0.68%
3863,3867,3871,3874,3875,3877,3877,3877,3880,3881
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/11910
Reviewed By: ajkr
Differential Revision: D49788060
Pulled By: hx235
fbshipit-source-id: 79e73699cda5be3b66461687e5147c2484fc5eff
2023-12-29 23:29:23 +00:00
|
|
|
s = w.LogAndApply(read_options, write_options, cfd, &edit,
|
|
|
|
db_dir.get());
|
Sync dir containing CURRENT after RenameFile on CURRENT as much as possible (#10573)
Summary:
**Context:**
The crash test below revealed a bug: the directory containing the CURRENT file (short for `dir_contains_current_file` below) was not always synced after a new CURRENT was created and `RenameFile` was called on it as part of the creation.
This bug exposes a risk that such un-synced directory containing the updated CURRENT can’t survive a host crash (e.g, power loss) hence get corrupted. This then will be followed by a recovery from a corrupted CURRENT that we don't want.
The root-cause is that a nullptr `FSDirectory* dir_contains_current_file` sometimes gets passed-down to `SetCurrentFile()` hence in those case `dir_contains_current_file->FSDirectory::FsyncWithDirOptions()` will be skipped (which otherwise will internally call`Env/FS::SyncDic()` )
```
./db_stress --acquire_snapshot_one_in=10000 --adaptive_readahead=1 --allow_data_in_errors=True --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=8 --block_size=16384 --bloom_bits=134.8015470676662 --bottommost_compression_type=disable --cache_size=8388608 --checkpoint_one_in=1000000 --checksum_type=kCRC32c --clear_column_family_one_in=0 --compact_files_one_in=1000000 --compact_range_one_in=1000000 --compaction_pri=2 --compaction_ttl=100 --compression_max_dict_buffer_bytes=511 --compression_max_dict_bytes=16384 --compression_type=zstd --compression_use_zstd_dict_trainer=1 --compression_zstd_max_train_bytes=65536 --continuous_verification_interval=0 --data_block_index_type=0 --db=$db --db_write_buffer_size=1048576 --delpercent=5 --delrangepercent=0 --destroy_db_initially=0 --disable_wal=0 --enable_compaction_filter=0 --enable_pipelined_write=1 --expected_values_dir=$exp --fail_if_options_file_error=1 --file_checksum_impl=none --flush_one_in=1000000 --get_current_wal_file_one_in=0 --get_live_files_one_in=1000000 --get_property_one_in=1000000 --get_sorted_wal_files_one_in=0 --index_block_restart_interval=4 --ingest_external_file_one_in=0 --iterpercent=10 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True --mark_for_compaction_one_file_in=10 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=10000 --max_key_len=3 --max_manifest_file_size=16384 --max_write_batch_group_size_bytes=64 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=0 --memtable_prefix_bloom_size_ratio=0.001 --memtable_protection_bytes_per_key=1 --memtable_whole_key_filtering=1 --mmap_read=1 --nooverwritepercent=1 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=100000000 --optimize_filters_for_memory=1 --paranoid_file_checks=1 --partition_pinning=2 --pause_background_one_in=1000000 --periodic_compaction_seconds=0 
--prefix_size=5 --prefixpercent=5 --prepopulate_block_cache=1 --progress_reports=0 --read_fault_one_in=1000 --readpercent=45 --recycle_log_file_num=0 --reopen=0 --ribbon_starting_level=999 --secondary_cache_fault_one_in=32 --secondary_cache_uri=compressed_secondary_cache://capacity=8388608 --set_options_one_in=10000 --snapshot_hold_ops=100000 --sst_file_manager_bytes_per_sec=0 --sst_file_manager_bytes_per_truncate=0 --subcompactions=3 --sync_fault_injection=1 --target_file_size_base=2097 --target_file_size_multiplier=2 --test_batches_snapshots=1 --top_level_index_pinning=1 --use_full_merge_v1=1 --use_merge=1 --value_size_mult=32 --verify_checksum=1 --verify_checksum_one_in=1000000 --verify_db_one_in=100000 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=524288 --write_buffer_size=4194 --writepercent=35
```
```
stderr:
WARNING: prefix_size is non-zero but memtablerep != prefix_hash
db_stress: utilities/fault_injection_fs.cc:748: virtual rocksdb::IOStatus rocksdb::FaultInjectionTestFS::RenameFile(const std::string &, const std::string &, const rocksdb::IOOptions &, rocksdb::IODebugContext *): Assertion `tlist.find(tdn.second) == tlist.end()' failed.`
```
**Summary:**
The PR ensured the non-test path pass down a non-null dir containing CURRENT (which is by current RocksDB assumption just db_dir) by doing the following:
- Renamed `directory_to_fsync` as `dir_contains_current_file` in `SetCurrentFile()` to tighten the association between this directory and CURRENT file
- Changed `SetCurrentFile()` API to require `dir_contains_current_file` being passed-in, instead of making it by default nullptr.
- Because `SetCurrentFile()`'s `dir_contains_current_file` is passed down from `VersionSet::LogAndApply()` then `VersionSet::ProcessManifestWrites()` (i.e, think about this as a chain of 3 functions related to MANIFEST update), these 2 functions also got refactored to require `dir_contains_current_file`
- Updated the non-test-path callers of these 3 functions to obtain and pass in non-nullptr `dir_contains_current_file`, which by current assumption of RocksDB, is the `FSDirectory* db_dir`.
- `db_impl` path will obtain `DBImpl::directories_.getDbDir()` while others with no access to such `directories_` are obtained on the fly by creating such object `FileSystem::NewDirectory(..)` and manage it by unique pointers to ensure short life time.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/10573
Test Plan:
- `make check`
- Passed the repro db_stress command
- For future improvement, since we currently don't assert dir containing CURRENT to be non-nullptr due to https://github.com/facebook/rocksdb/pull/10573#pullrequestreview-1087698899, there is still chances that future developers mistakenly pass down nullptr dir containing CURRENT thus resulting skipped sync dir and cause the bug again. Therefore a smarter test (e.g, such as quoted from ajkr "(make) unsynced data loss to be dropping files corresponding to unsynced directory entries") is still needed.
Reviewed By: ajkr
Differential Revision: D39005886
Pulled By: hx235
fbshipit-source-id: 336fb9090d0cfa6ca3dd580db86268007dde7f5a
2022-08-30 00:35:21 +00:00
|
|
|
}
|
2022-03-18 23:35:51 +00:00
|
|
|
if (s.ok()) {
|
|
|
|
++cfs_updated;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (cfs_updated > 0) {
|
|
|
|
ROCKS_LOG_INFO(db_opts.info_log,
|
|
|
|
"UpdateManifestForFilesState: updated %zu files in %zu CFs",
|
|
|
|
files_updated, cfs_updated);
|
|
|
|
} else if (s.ok()) {
|
|
|
|
ROCKS_LOG_INFO(db_opts.info_log,
|
|
|
|
"UpdateManifestForFilesState: no updates needed");
|
|
|
|
}
|
|
|
|
if (!s.ok()) {
|
|
|
|
ROCKS_LOG_ERROR(db_opts.info_log, "UpdateManifestForFilesState failed: %s",
|
|
|
|
s.ToString().c_str());
|
|
|
|
}
|
|
|
|
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
KeySegmentsExtractor and prototype higher-dimensional filtering (#12075)
Summary:
This change contains a prototype new API for "higher dimensional" filtering of read queries. Existing filters treat keys as one-dimensional, either as distinct points (whole key) or as contiguous ranges in comparator order (prefix filters). The proposed KeySegmentsExtractor allows treating keys as multi-dimensional for filtering purposes even though they still have a single total order across dimensions. For example, consider these keys in different LSM levels:
L0:
abc_0123
abc_0150
def_0114
ghi_0134
L1:
abc_0045
bcd_0091
def_0077
xyz_0080
If we get a range query for [def_0100, def_0200), a prefix filter (up to the underscore) will tell us that both levels are potentially relevant. However, if each SST file stores a simple range of the values for the second segment of the key, we would see that L1 only has [0045, 0091] which (under certain required assumptions) we are sure does not overlap with the given range query. Thus, we can filter out processing or reading any index or data blocks from L1 for the query.
This kind of case shows up with time-ordered data but is more general than filtering based on user timestamp. See https://github.com/facebook/rocksdb/issues/11332 . Here the "time" segments of the keys are meaningfully ordered with respect to each other even when the previous segment is different, so summarizing data along an alternate dimension of the key like this can work well for filtering.
This prototype implementation simply leverages existing APIs for user table properties and table filtering, which is not very CPU efficient. Eventually, we expect to create a native implementation. However, I have put some significant
thought and engineering into the new APIs overall, which I expect to be close to refined enough for production.
For details, see new public APIs in experimental.h. For a detailed example, see the new unit test in db_bloom_filter_test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12075
Test Plan: Unit test included
Reviewed By: jowlyzhang
Differential Revision: D53619406
Pulled By: pdillinger
fbshipit-source-id: 9e6e7b82b4db8d815db76a6ab340e90db2c191f2
2024-02-15 23:39:55 +00:00
|
|
|
// EXPERIMENTAL new filtering features
|
|
|
|
|
|
|
|
namespace {
|
|
|
|
// Resolves which bytes of `key` the selector `select` designates.
//
// On return:
//  * `*out_input` is the selected bytes (possibly an empty Slice).
//  * `*out_leadup` is the part of `key` that precedes the selected bytes.
//    When the selection is empty, or does not point into `key`, the whole
//    key is reported as the lead-up instead.
//
// `extracted` supplies the segment end offsets for `key`.
void GetFilterInput(FilterInput select, const Slice& key,
                    const KeySegmentsExtractor::Result& extracted,
                    Slice* out_input, Slice* out_leadup) {
  // Visitor with one overload per FilterInput alternative.
  struct SegmentPicker {
    const Slice& whole_key;
    const KeySegmentsExtractor::Result& segments;

    // Bytes spanning segments `first` through `last` (both inclusive).
    // Yields an empty Slice when the key has no segment `first`; `last` is
    // clamped to the final segment actually present.
    Slice Between(size_t first, size_t last) const {
      const size_t count = segments.segment_ends.size();
      if (first >= count) {
        return Slice();
      }
      assert(count > 0);
      size_t begin_offset = 0;
      if (first > 0) {
        // A segment begins where its predecessor ends
        begin_offset = segments.segment_ends[first - 1];
      }
      const size_t end_offset =
          segments.segment_ends[std::min(last, count - 1)];
      return Slice(whole_key.data() + begin_offset, end_offset - begin_offset);
    }

    Slice operator()(SelectKeySegment one) const {
      return Between(one.segment_index, one.segment_index);
    }

    Slice operator()(SelectKeySegmentRange range) const {
      assert(range.from_segment_index <= range.to_segment_index);
      return Between(range.from_segment_index, range.to_segment_index);
    }

    Slice operator()(SelectWholeKey) const { return whole_key; }

    // The remaining selectors are not implemented yet: they trip an
    // assertion in debug builds and otherwise select nothing.
    Slice operator()(SelectLegacyKeyPrefix) const {
      // TODO
      assert(false);
      return Slice();
    }

    Slice operator()(SelectUserTimestamp) const {
      // TODO
      assert(false);
      return Slice();
    }

    Slice operator()(SelectColumnName) const {
      // TODO
      assert(false);
      return Slice();
    }

    Slice operator()(SelectValue) const {
      // TODO
      assert(false);
      return Slice();
    }
  };

  const Slice selected = std::visit(SegmentPicker{key, extracted}, select);
  *out_input = selected;

  const char* const key_begin = key.data();
  const bool points_into_key = !selected.empty() &&
                               selected.data() >= key_begin &&
                               selected.data() <= key_begin + key.size();
  if (points_into_key) {
    *out_leadup = Slice(key_begin, selected.data() - key_begin);
  } else {
    *out_leadup = key;
  }
}
|
|
|
|
|
|
|
|
const char* DeserializeFilterInput(const char* p, const char* limit,
|
|
|
|
FilterInput* out) {
|
|
|
|
if (p >= limit) {
|
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
uint8_t b = static_cast<uint8_t>(*p++);
|
|
|
|
if (b & 0x80) {
|
|
|
|
// Reserved for future use to read more bytes
|
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
|
|
|
|
switch (b >> 4) {
|
|
|
|
case 0:
|
|
|
|
// Various cases that don't have an argument
|
|
|
|
switch (b) {
|
|
|
|
case 0:
|
|
|
|
*out = SelectWholeKey{};
|
|
|
|
return p;
|
|
|
|
case 1:
|
|
|
|
*out = SelectLegacyKeyPrefix{};
|
|
|
|
return p;
|
|
|
|
case 2:
|
|
|
|
*out = SelectUserTimestamp{};
|
|
|
|
return p;
|
|
|
|
case 3:
|
|
|
|
*out = SelectColumnName{};
|
|
|
|
return p;
|
|
|
|
case 4:
|
|
|
|
*out = SelectValue{};
|
|
|
|
return p;
|
|
|
|
default:
|
|
|
|
// Reserved for future use
|
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
case 1:
|
|
|
|
// First 16 cases of SelectKeySegment
|
|
|
|
*out = SelectKeySegment{BitwiseAnd(b, 0xf)};
|
|
|
|
return p;
|
|
|
|
case 2:
|
|
|
|
// First 16 cases of SelectKeySegmentRange
|
|
|
|
// that are not a single key segment
|
|
|
|
// 0: 0-1
|
|
|
|
// 1: 0-2
|
|
|
|
// 2: 1-2
|
|
|
|
// 3: 0-3
|
|
|
|
// 4: 1-3
|
|
|
|
// 5: 2-3
|
|
|
|
// 6: 0-4
|
|
|
|
// 7: 1-4
|
|
|
|
// 8: 2-4
|
|
|
|
// 9: 3-4
|
|
|
|
// 10: 0-5
|
|
|
|
// 11: 1-5
|
|
|
|
// 12: 2-5
|
|
|
|
// 13: 3-5
|
|
|
|
// 14: 4-5
|
|
|
|
// 15: 0-6
|
|
|
|
if (b < 6) {
|
|
|
|
if (b >= 3) {
|
|
|
|
*out = SelectKeySegmentRange{static_cast<uint8_t>(b - 3), 3};
|
|
|
|
} else if (b >= 1) {
|
|
|
|
*out = SelectKeySegmentRange{static_cast<uint8_t>(b - 1), 2};
|
|
|
|
} else {
|
|
|
|
*out = SelectKeySegmentRange{0, 1};
|
|
|
|
}
|
|
|
|
} else if (b < 10) {
|
|
|
|
*out = SelectKeySegmentRange{static_cast<uint8_t>(b - 6), 4};
|
|
|
|
} else if (b < 15) {
|
|
|
|
*out = SelectKeySegmentRange{static_cast<uint8_t>(b - 10), 5};
|
|
|
|
} else {
|
|
|
|
*out = SelectKeySegmentRange{0, 6};
|
|
|
|
}
|
|
|
|
return p;
|
|
|
|
default:
|
|
|
|
// Reserved for future use
|
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void SerializeFilterInput(std::string* out, const FilterInput& select) {
|
|
|
|
struct FilterInputSerializer {
|
|
|
|
std::string* out;
|
|
|
|
void operator()(SelectWholeKey) { out->push_back(0); }
|
|
|
|
void operator()(SelectLegacyKeyPrefix) { out->push_back(1); }
|
|
|
|
void operator()(SelectUserTimestamp) { out->push_back(2); }
|
|
|
|
void operator()(SelectColumnName) { out->push_back(3); }
|
|
|
|
void operator()(SelectValue) { out->push_back(4); }
|
|
|
|
void operator()(SelectKeySegment select) {
|
|
|
|
// TODO: expand supported cases
|
|
|
|
assert(select.segment_index < 16);
|
|
|
|
out->push_back(static_cast<char>((1 << 4) | select.segment_index));
|
|
|
|
}
|
|
|
|
void operator()(SelectKeySegmentRange select) {
|
|
|
|
auto from = select.from_segment_index;
|
|
|
|
auto to = select.to_segment_index;
|
|
|
|
// TODO: expand supported cases
|
|
|
|
assert(from < 6);
|
|
|
|
assert(to < 6 || (to == 6 && from == 0));
|
|
|
|
assert(from < to);
|
|
|
|
int start = (to - 1) * to / 2;
|
|
|
|
assert(start + from < 16);
|
|
|
|
out->push_back(static_cast<char>((2 << 4) | (start + from)));
|
|
|
|
}
|
|
|
|
};
|
|
|
|
std::visit(FilterInputSerializer{out}, select);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Number of bytes the serialized form of a FilterInput occupies. Every
// currently supported selector is encoded in exactly one byte, so the
// argument is not inspected.
size_t GetFilterInputSerializedLength(const FilterInput& /*select*/) {
  // TODO: expand supported cases
  constexpr size_t kSingleByteEncoding = 1;
  return kSingleByteEncoding;
}
|
|
|
|
|
|
|
|
// Reinterprets the storage of a KeyCategorySet as a raw 64-bit integer
// (used when serializing). Compilation fails if the set is not exactly
// 64 bits wide.
uint64_t CategorySetToUint(const KeySegmentsExtractor::KeyCategorySet& s) {
  using CategorySet = KeySegmentsExtractor::KeyCategorySet;
  static_assert(sizeof(CategorySet) == sizeof(uint64_t));
  const auto* as_bits = reinterpret_cast<const uint64_t*>(&s);
  return *as_bits;
}
|
|
|
|
|
|
|
|
// Inverse of CategorySetToUint: reinterprets a raw 64-bit integer as a
// KeyCategorySet. Compilation fails if the set is not exactly 64 bits wide.
KeySegmentsExtractor::KeyCategorySet UintToCategorySet(uint64_t s) {
  using CategorySet = KeySegmentsExtractor::KeyCategorySet;
  static_assert(sizeof(CategorySet) == sizeof(uint64_t));
  const auto* as_set = reinterpret_cast<const CategorySet*>(&s);
  return *as_set;
}
|
|
|
|
|
|
|
|
// One-byte markers that identify each serialized filter (or filter wrapper).
enum BuiltinSstQueryFilters : char {
  // Wrapper: the contained filters use a particular KeySegmentsExtractor,
  // together with a set of categories covering all keys seen.
  // TODO: unit test category covering filtering
  kExtrAndCatFilterWrapper = 0x01,

  // Wrapper: limits the contained filters to a particular set of categories.
  // (Unlike kExtrAndCatFilterWrapper, keys in other categories may have been
  // seen, so they are not filtered here.)
  // TODO: unit test more subtleties
  kCategoryScopeFilterWrapper = 0x02,

  // ... (reserve some values for more wrappers)

  // Filter holding the bytewise min and max values of a numbered segment or
  // composite (range of segments). The empty value is tracked and filtered
  // independently because it might be a special case that is not
  // representative of the minimum in a spread of values.
  kBytewiseMinMaxFilter = 0x10,
};
|
|
|
|
|
|
|
|
class SstQueryFilterBuilder {
|
|
|
|
public:
|
2024-03-04 18:08:32 +00:00
|
|
|
virtual ~SstQueryFilterBuilder() = default;
|
KeySegmentsExtractor and prototype higher-dimensional filtering (#12075)
Summary:
This change contains a prototype new API for "higher dimensional" filtering of read queries. Existing filters treat keys as one-dimensional, either as distinct points (whole key) or as contiguous ranges in comparator order (prefix filters). The proposed KeySegmentsExtractor allows treating keys as multi-dimensional for filtering purposes even though they still have a single total order across dimensions. For example, consider these keys in different LSM levels:
L0:
abc_0123
abc_0150
def_0114
ghi_0134
L1:
abc_0045
bcd_0091
def_0077
xyz_0080
If we get a range query for [def_0100, def_0200), a prefix filter (up to the underscore) will tell us that both levels are potentially relevant. However, if each SST file stores a simple range of the values for the second segment of the key, we would see that L1 only has [0045, 0091] which (under certain required assumptions) we are sure does not overlap with the given range query. Thus, we can filter out processing or reading any index or data blocks from L1 for the query.
This kind of case shows up with time-ordered data but is more general than filtering based on user timestamp. See https://github.com/facebook/rocksdb/issues/11332 . Here the "time" segments of the keys are meaningfully ordered with respect to each other even when the previous segment is different, so summarizing data along an alternate dimension of the key like this can work well for filtering.
This prototype implementation simply leverages existing APIs for user table properties and table filtering, which is not very CPU efficient. Eventually, we expect to create a native implementation. However, I have put some significant
thought and engineering into the new APIs overall, which I expect to be close to refined enough for production.
For details, see new public APIs in experimental.h. For a detailed example, see the new unit test in db_bloom_filter_test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12075
Test Plan: Unit test included
Reviewed By: jowlyzhang
Differential Revision: D53619406
Pulled By: pdillinger
fbshipit-source-id: 9e6e7b82b4db8d815db76a6ab340e90db2c191f2
2024-02-15 23:39:55 +00:00
|
|
|
virtual void Add(const Slice& key,
|
|
|
|
const KeySegmentsExtractor::Result& extracted,
|
|
|
|
const Slice* prev_key,
|
|
|
|
const KeySegmentsExtractor::Result* prev_extracted) = 0;
|
|
|
|
virtual Status GetStatus() const = 0;
|
|
|
|
virtual size_t GetEncodedLength() const = 0;
|
|
|
|
virtual void Finish(std::string& append_to) = 0;
|
|
|
|
};
|
|
|
|
|
|
|
|
class SstQueryFilterConfigImpl : public SstQueryFilterConfig {
|
|
|
|
public:
|
|
|
|
explicit SstQueryFilterConfigImpl(
|
|
|
|
const FilterInput& input,
|
|
|
|
const KeySegmentsExtractor::KeyCategorySet& categories)
|
|
|
|
: input_(input), categories_(categories) {}
|
|
|
|
|
2024-03-04 18:08:32 +00:00
|
|
|
virtual ~SstQueryFilterConfigImpl() = default;
|
KeySegmentsExtractor and prototype higher-dimensional filtering (#12075)
Summary:
This change contains a prototype new API for "higher dimensional" filtering of read queries. Existing filters treat keys as one-dimensional, either as distinct points (whole key) or as contiguous ranges in comparator order (prefix filters). The proposed KeySegmentsExtractor allows treating keys as multi-dimensional for filtering purposes even though they still have a single total order across dimensions. For example, consider these keys in different LSM levels:
L0:
abc_0123
abc_0150
def_0114
ghi_0134
L1:
abc_0045
bcd_0091
def_0077
xyz_0080
If we get a range query for [def_0100, def_0200), a prefix filter (up to the underscore) will tell us that both levels are potentially relevant. However, if each SST file stores a simple range of the values for the second segment of the key, we would see that L1 only has [0045, 0091] which (under certain required assumptions) we are sure does not overlap with the given range query. Thus, we can filter out processing or reading any index or data blocks from L1 for the query.
This kind of case shows up with time-ordered data but is more general than filtering based on user timestamp. See https://github.com/facebook/rocksdb/issues/11332 . Here the "time" segments of the keys are meaningfully ordered with respect to each other even when the previous segment is different, so summarizing data along an alternate dimension of the key like this can work well for filtering.
This prototype implementation simply leverages existing APIs for user table properties and table filtering, which is not very CPU efficient. Eventually, we expect to create a native implementation. However, I have put some significant
thought and engineering into the new APIs overall, which I expect to be close to refined enough for production.
For details, see new public APIs in experimental.h. For a detailed example, see the new unit test in db_bloom_filter_test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12075
Test Plan: Unit test included
Reviewed By: jowlyzhang
Differential Revision: D53619406
Pulled By: pdillinger
fbshipit-source-id: 9e6e7b82b4db8d815db76a6ab340e90db2c191f2
2024-02-15 23:39:55 +00:00
|
|
|
|
|
|
|
virtual std::unique_ptr<SstQueryFilterBuilder> NewBuilder(
|
|
|
|
bool sanity_checks) const = 0;
|
|
|
|
|
|
|
|
protected:
|
|
|
|
FilterInput input_;
|
|
|
|
KeySegmentsExtractor::KeyCategorySet categories_;
|
|
|
|
};
|
|
|
|
|
|
|
|
class CategoryScopeFilterWrapperBuilder : public SstQueryFilterBuilder {
|
|
|
|
public:
|
|
|
|
explicit CategoryScopeFilterWrapperBuilder(
|
|
|
|
KeySegmentsExtractor::KeyCategorySet categories,
|
|
|
|
std::unique_ptr<SstQueryFilterBuilder> wrapped)
|
|
|
|
: categories_(categories), wrapped_(std::move(wrapped)) {}
|
|
|
|
|
|
|
|
void Add(const Slice& key, const KeySegmentsExtractor::Result& extracted,
|
|
|
|
const Slice* prev_key,
|
|
|
|
const KeySegmentsExtractor::Result* prev_extracted) override {
|
|
|
|
if (!categories_.Contains(extracted.category)) {
|
|
|
|
// Category not in scope of the contituent filters
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
wrapped_->Add(key, extracted, prev_key, prev_extracted);
|
|
|
|
}
|
|
|
|
|
|
|
|
Status GetStatus() const override { return wrapped_->GetStatus(); }
|
|
|
|
|
|
|
|
size_t GetEncodedLength() const override {
|
|
|
|
size_t wrapped_length = wrapped_->GetEncodedLength();
|
|
|
|
if (wrapped_length == 0) {
|
|
|
|
// Use empty filter
|
|
|
|
// FIXME: needs unit test
|
|
|
|
return 0;
|
|
|
|
} else {
|
|
|
|
// For now in the code, wraps only 1 filter, but schema supports multiple
|
|
|
|
return 1 + VarintLength(CategorySetToUint(categories_)) +
|
|
|
|
VarintLength(1) + wrapped_length;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void Finish(std::string& append_to) override {
|
|
|
|
size_t encoded_length = GetEncodedLength();
|
|
|
|
if (encoded_length == 0) {
|
|
|
|
// Nothing to do
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
size_t old_append_to_size = append_to.size();
|
|
|
|
append_to.reserve(old_append_to_size + encoded_length);
|
|
|
|
append_to.push_back(kCategoryScopeFilterWrapper);
|
|
|
|
|
|
|
|
PutVarint64(&append_to, CategorySetToUint(categories_));
|
|
|
|
|
|
|
|
// Wrapping just 1 filter for now
|
|
|
|
PutVarint64(&append_to, 1);
|
|
|
|
wrapped_->Finish(append_to);
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
KeySegmentsExtractor::KeyCategorySet categories_;
|
|
|
|
std::unique_ptr<SstQueryFilterBuilder> wrapped_;
|
|
|
|
};
|
|
|
|
|
|
|
|
class BytewiseMinMaxSstQueryFilterConfig : public SstQueryFilterConfigImpl {
|
|
|
|
public:
|
|
|
|
using SstQueryFilterConfigImpl::SstQueryFilterConfigImpl;
|
|
|
|
|
|
|
|
std::unique_ptr<SstQueryFilterBuilder> NewBuilder(
|
|
|
|
bool sanity_checks) const override {
|
|
|
|
auto b = std::make_unique<MyBuilder>(*this, sanity_checks);
|
|
|
|
if (categories_ != KeySegmentsExtractor::KeyCategorySet::All()) {
|
|
|
|
return std::make_unique<CategoryScopeFilterWrapperBuilder>(categories_,
|
|
|
|
std::move(b));
|
|
|
|
} else {
|
|
|
|
return b;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static bool RangeMayMatch(
|
|
|
|
const Slice& filter, const Slice& lower_bound_incl,
|
|
|
|
const KeySegmentsExtractor::Result& lower_bound_extracted,
|
|
|
|
const Slice& upper_bound_excl,
|
|
|
|
const KeySegmentsExtractor::Result& upper_bound_extracted) {
|
|
|
|
assert(!filter.empty() && filter[0] == kBytewiseMinMaxFilter);
|
|
|
|
if (filter.size() <= 4) {
|
|
|
|
// Missing some data
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
bool empty_included = (filter[1] & kEmptySeenFlag) != 0;
|
|
|
|
const char* p = filter.data() + 2;
|
|
|
|
const char* limit = filter.data() + filter.size();
|
|
|
|
|
|
|
|
FilterInput in;
|
|
|
|
p = DeserializeFilterInput(p, limit, &in);
|
|
|
|
if (p == nullptr) {
|
|
|
|
// Corrupt or unsupported
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
uint32_t smallest_size;
|
|
|
|
p = GetVarint32Ptr(p, limit, &smallest_size);
|
|
|
|
if (p == nullptr || static_cast<size_t>(limit - p) <= smallest_size) {
|
|
|
|
// Corrupt
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
Slice smallest = Slice(p, smallest_size);
|
|
|
|
p += smallest_size;
|
|
|
|
|
|
|
|
size_t largest_size = static_cast<size_t>(limit - p);
|
|
|
|
Slice largest = Slice(p, largest_size);
|
|
|
|
|
|
|
|
Slice lower_bound_input, lower_bound_leadup;
|
|
|
|
Slice upper_bound_input, upper_bound_leadup;
|
|
|
|
GetFilterInput(in, lower_bound_incl, lower_bound_extracted,
|
|
|
|
&lower_bound_input, &lower_bound_leadup);
|
|
|
|
GetFilterInput(in, upper_bound_excl, upper_bound_extracted,
|
|
|
|
&upper_bound_input, &upper_bound_leadup);
|
|
|
|
|
|
|
|
if (lower_bound_leadup.compare(upper_bound_leadup) != 0) {
|
|
|
|
// Unable to filter range when bounds have different lead-up to key
|
|
|
|
// segment
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (empty_included && lower_bound_input.empty()) {
|
|
|
|
// May match on 0-length segment
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
// TODO: potentially fix upper bound to actually be exclusive, but it's not
|
|
|
|
// as simple as changing >= to > below, because it's upper_bound_excl that's
|
|
|
|
// exclusive, and the upper_bound_input part extracted from it might not be.
|
|
|
|
|
|
|
|
// May match if both the upper bound and lower bound indicate there could
|
|
|
|
// be overlap
|
|
|
|
return upper_bound_input.compare(smallest) >= 0 &&
|
|
|
|
lower_bound_input.compare(largest) <= 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
protected:
|
|
|
|
struct MyBuilder : public SstQueryFilterBuilder {
|
|
|
|
MyBuilder(const BytewiseMinMaxSstQueryFilterConfig& _parent,
|
|
|
|
bool _sanity_checks)
|
|
|
|
: parent(_parent), sanity_checks(_sanity_checks) {}
|
|
|
|
|
|
|
|
void Add(const Slice& key, const KeySegmentsExtractor::Result& extracted,
|
|
|
|
const Slice* prev_key,
|
|
|
|
const KeySegmentsExtractor::Result* prev_extracted) override {
|
|
|
|
Slice input, leadup;
|
|
|
|
GetFilterInput(parent.input_, key, extracted, &input, &leadup);
|
|
|
|
|
|
|
|
if (sanity_checks && prev_key && prev_extracted) {
|
|
|
|
// Opportunistic checking of segment ordering invariant
|
|
|
|
Slice prev_input, prev_leadup;
|
|
|
|
GetFilterInput(parent.input_, *prev_key, *prev_extracted, &prev_input,
|
|
|
|
&prev_leadup);
|
|
|
|
|
|
|
|
int compare = prev_leadup.compare(leadup);
|
|
|
|
if (compare > 0) {
|
|
|
|
status = Status::Corruption(
|
|
|
|
"Ordering invariant violated from 0x" +
|
|
|
|
prev_key->ToString(/*hex=*/true) + " with prefix 0x" +
|
|
|
|
prev_leadup.ToString(/*hex=*/true) + " to 0x" +
|
|
|
|
key.ToString(/*hex=*/true) + " with prefix 0x" +
|
|
|
|
leadup.ToString(/*hex=*/true));
|
|
|
|
return;
|
|
|
|
} else if (compare == 0) {
|
|
|
|
// On the same prefix leading up to the segment, the segments must
|
|
|
|
// not be out of order.
|
|
|
|
compare = prev_input.compare(input);
|
|
|
|
if (compare > 0) {
|
|
|
|
status = Status::Corruption(
|
|
|
|
"Ordering invariant violated from 0x" +
|
|
|
|
prev_key->ToString(/*hex=*/true) + " with segment 0x" +
|
|
|
|
prev_input.ToString(/*hex=*/true) + " to 0x" +
|
|
|
|
key.ToString(/*hex=*/true) + " with segment 0x" +
|
|
|
|
input.ToString(/*hex=*/true));
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Now actually update state for the filter inputs
|
|
|
|
// TODO: shorten largest and smallest if appropriate
|
|
|
|
if (input.empty()) {
|
|
|
|
empty_seen = true;
|
|
|
|
} else if (largest.empty()) {
|
|
|
|
// Step for first non-empty input
|
|
|
|
smallest = largest = input.ToString();
|
|
|
|
} else if (input.compare(largest) > 0) {
|
|
|
|
largest = input.ToString();
|
|
|
|
} else if (input.compare(smallest) < 0) {
|
|
|
|
smallest = input.ToString();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Status GetStatus() const override { return status; }
|
|
|
|
|
|
|
|
size_t GetEncodedLength() const override {
|
|
|
|
if (largest.empty()) {
|
|
|
|
// Not an interesting filter -> 0 to indicate no filter
|
|
|
|
// FIXME: needs unit test
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
return 2 + GetFilterInputSerializedLength(parent.input_) +
|
|
|
|
VarintLength(smallest.size()) + smallest.size() + largest.size();
|
|
|
|
}
|
|
|
|
|
|
|
|
void Finish(std::string& append_to) override {
|
|
|
|
assert(status.ok());
|
|
|
|
size_t encoded_length = GetEncodedLength();
|
|
|
|
if (encoded_length == 0) {
|
|
|
|
// Nothing to do
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
size_t old_append_to_size = append_to.size();
|
|
|
|
append_to.reserve(old_append_to_size + encoded_length);
|
|
|
|
append_to.push_back(kBytewiseMinMaxFilter);
|
|
|
|
|
|
|
|
append_to.push_back(empty_seen ? kEmptySeenFlag : 0);
|
|
|
|
|
|
|
|
SerializeFilterInput(&append_to, parent.input_);
|
|
|
|
|
|
|
|
PutVarint32(&append_to, static_cast<uint32_t>(smallest.size()));
|
|
|
|
append_to.append(smallest);
|
|
|
|
// The end of `largest` is given by the end of the filter
|
|
|
|
append_to.append(largest);
|
|
|
|
assert(append_to.size() == old_append_to_size + encoded_length);
|
|
|
|
}
|
|
|
|
|
|
|
|
const BytewiseMinMaxSstQueryFilterConfig& parent;
|
|
|
|
const bool sanity_checks;
|
|
|
|
// Smallest and largest segment seen, excluding the empty segment which
|
|
|
|
// is tracked separately
|
|
|
|
std::string smallest;
|
|
|
|
std::string largest;
|
|
|
|
bool empty_seen = false;
|
|
|
|
|
|
|
|
// Only for sanity checks
|
|
|
|
Status status;
|
|
|
|
};
|
|
|
|
|
|
|
|
private:
|
|
|
|
static constexpr char kEmptySeenFlag = 0x1;
|
|
|
|
};
|
|
|
|
|
|
|
|
// A value-initialized (`{}`) SstQueryFilterConfigs instance.
// NOTE(review): the name suggests this is what lookups hand back when a
// configuration is empty or not found -- confirm against the users of this
// constant, which are outside this part of the file.
const SstQueryFilterConfigs kEmptyNotFoundSQFC{};
|
|
|
|
|
|
|
|
class SstQueryFilterConfigsManagerImpl : public SstQueryFilterConfigsManager {
|
|
|
|
public:
|
|
|
|
using ConfigVersionMap = std::map<FilteringVersion, SstQueryFilterConfigs>;
|
|
|
|
|
|
|
|
// Loads the versioned, named filter configurations from `data` into this
// manager. Intended to be called once (asserted).
//
// Returns InvalidArgument when a version is 0, when versions do not
// increase by exactly 1 from one entry to the next, or when a name is
// repeated within one version. Empty `data` is accepted and leaves the
// manager untouched.
Status Populate(const Data& data) {
  if (data.empty()) {
    return Status::OK();
  }
  // Populate only once
  assert(min_ver_ == 0 && max_ver_ == 0);
  min_ver_ = max_ver_ = data.begin()->first;

  FilteringVersion prev_ver = 0;
  bool have_prev = false;
  for (const auto& [version, named_configs] : data) {
    if (version == 0) {
      return Status::InvalidArgument(
          "Filtering version 0 is reserved for empty configuration and may "
          "not be overridden");
    }
    if (!have_prev) {
      min_ver_ = version;
      have_prev = true;
    } else if (version != prev_ver + 1) {
      return Status::InvalidArgument(
          "Filtering versions must increase by 1 without repeating: " +
          std::to_string(prev_ver) + " -> " + std::to_string(version));
    }
    max_ver_ = version;

    UnorderedSet<std::string> names_in_version;
    for (const auto& [name, configs] : named_configs) {
      const bool is_new_name = names_in_version.insert(name).second;
      if (!is_new_name) {
        return Status::InvalidArgument(
            "Duplicate name in filtering version " +
            std::to_string(version) + ": " + name);
      }
      // Index by name first, then by version
      name_map_[name][version] = configs;
      if (configs.extractor) {
        // Remember every extractor by its ID
        extractor_map_[configs.extractor->GetId()] = configs.extractor;
      }
    }
    prev_ver = version;
  }
  return Status::OK();
}
|
|
|
|
|
|
|
|
struct MyCollector : public TablePropertiesCollector {
|
|
|
|
// Keeps a reference to `configs` which should be kept alive by
|
|
|
|
// SstQueryFilterConfigsManagerImpl, which should be kept alive by
|
|
|
|
// any factories
|
|
|
|
// TODO: sanity_checks option
|
|
|
|
explicit MyCollector(const SstQueryFilterConfigs& configs,
|
|
|
|
const SstQueryFilterConfigsManagerImpl& _parent)
|
|
|
|
: parent(_parent),
|
|
|
|
extractor(configs.extractor.get()),
|
|
|
|
sanity_checks(true) {
|
|
|
|
for (const auto& c : configs.filters) {
|
|
|
|
builders.push_back(
|
|
|
|
static_cast<SstQueryFilterConfigImpl&>(*c).NewBuilder(
|
|
|
|
sanity_checks));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Status AddUserKey(const Slice& key, const Slice& /*value*/,
|
|
|
|
EntryType /*type*/, SequenceNumber /*seq*/,
|
|
|
|
uint64_t /*file_size*/) override {
|
|
|
|
// FIXME later: `key` might contain user timestamp. That should be
|
|
|
|
// exposed properly in a future update to TablePropertiesCollector
|
|
|
|
KeySegmentsExtractor::Result extracted;
|
|
|
|
if (extractor) {
|
|
|
|
extractor->Extract(key, KeySegmentsExtractor::kFullUserKey, &extracted);
|
|
|
|
if (UNLIKELY(extracted.category >=
|
|
|
|
KeySegmentsExtractor::kMinErrorCategory)) {
|
|
|
|
// TODO: proper failure scopes
|
|
|
|
Status s = Status::Corruption(
|
|
|
|
"Extractor returned error category from key 0x" +
|
|
|
|
Slice(key).ToString(/*hex=*/true));
|
|
|
|
overall_status.UpdateIfOk(s);
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
assert(extracted.category <= KeySegmentsExtractor::kMaxUsableCategory);
|
|
|
|
|
|
|
|
bool new_category = categories_seen.Add(extracted.category);
|
|
|
|
if (sanity_checks) {
|
|
|
|
// Opportunistic checking of category ordering invariant
|
|
|
|
if (!first_key) {
|
|
|
|
if (prev_extracted.category != extracted.category &&
|
|
|
|
!new_category) {
|
|
|
|
Status s = Status::Corruption(
|
|
|
|
"Category ordering invariant violated from key 0x" +
|
|
|
|
Slice(prev_key).ToString(/*hex=*/true) + " to 0x" +
|
|
|
|
key.ToString(/*hex=*/true));
|
|
|
|
overall_status.UpdateIfOk(s);
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for (const auto& b : builders) {
|
|
|
|
if (first_key) {
|
|
|
|
b->Add(key, extracted, nullptr, nullptr);
|
|
|
|
} else {
|
|
|
|
Slice prev_key_slice = Slice(prev_key);
|
|
|
|
b->Add(key, extracted, &prev_key_slice, &prev_extracted);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
prev_key.assign(key.data(), key.size());
|
|
|
|
prev_extracted = std::move(extracted);
|
|
|
|
first_key = false;
|
|
|
|
return Status::OK();
|
|
|
|
}
|
|
|
|
Status Finish(UserCollectedProperties* properties) override {
|
|
|
|
assert(properties != nullptr);
|
|
|
|
|
|
|
|
if (!overall_status.ok()) {
|
|
|
|
return overall_status;
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t total_size = 1;
|
|
|
|
autovector<std::pair<SstQueryFilterBuilder&, size_t>> filters_to_finish;
|
|
|
|
// Need to determine number of filters before serializing them. Might
|
|
|
|
// as well determine full length also.
|
|
|
|
for (const auto& b : builders) {
|
|
|
|
Status s = b->GetStatus();
|
|
|
|
if (s.ok()) {
|
|
|
|
size_t len = b->GetEncodedLength();
|
|
|
|
if (len > 0) {
|
|
|
|
total_size += VarintLength(len) + len;
|
|
|
|
filters_to_finish.emplace_back(*b, len);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// FIXME: no way to report partial failure without getting
|
|
|
|
// remaining filters thrown out
|
|
|
|
}
|
|
|
|
}
|
|
|
|
total_size += VarintLength(filters_to_finish.size());
|
|
|
|
if (filters_to_finish.empty()) {
|
|
|
|
// No filters to add
|
|
|
|
return Status::OK();
|
|
|
|
}
|
|
|
|
// Length of the last filter is omitted
|
|
|
|
total_size -= VarintLength(filters_to_finish.back().second);
|
|
|
|
|
|
|
|
// Need to determine size of
|
|
|
|
// kExtrAndCatFilterWrapper if used
|
|
|
|
std::string extractor_id;
|
|
|
|
if (extractor) {
|
|
|
|
extractor_id = extractor->GetId();
|
|
|
|
// identifier byte
|
|
|
|
total_size += 1;
|
|
|
|
// fields of the wrapper
|
|
|
|
total_size += VarintLength(extractor_id.size()) + extractor_id.size() +
|
|
|
|
VarintLength(CategorySetToUint(categories_seen));
|
|
|
|
// outer layer will have just 1 filter in its count (added here)
|
|
|
|
// and this filter wrapper will have filters_to_finish.size()
|
|
|
|
// (added above).
|
|
|
|
total_size += VarintLength(1);
|
|
|
|
}
|
|
|
|
|
|
|
|
std::string filters;
|
|
|
|
filters.reserve(total_size);
|
|
|
|
|
|
|
|
// Leave room for drastic changes in the future.
|
|
|
|
filters.push_back(kSchemaVersion);
|
|
|
|
|
|
|
|
if (extractor) {
|
|
|
|
// Wrap everything in a kExtrAndCatFilterWrapper
|
|
|
|
// TODO in future: put whole key filters outside of this wrapper.
|
|
|
|
// Also TODO in future: order the filters starting with broadest
|
|
|
|
// applicability.
|
|
|
|
|
|
|
|
// Just one top-level filter (wrapper). Because it's last, we don't
|
|
|
|
// need to encode its length.
|
|
|
|
PutVarint64(&filters, 1);
|
|
|
|
// The filter(s) wrapper itself
|
|
|
|
filters.push_back(kExtrAndCatFilterWrapper);
|
|
|
|
PutVarint64(&filters, extractor_id.size());
|
|
|
|
filters += extractor_id;
|
|
|
|
PutVarint64(&filters, CategorySetToUint(categories_seen));
|
|
|
|
}
|
|
|
|
|
|
|
|
PutVarint64(&filters, filters_to_finish.size());
|
|
|
|
|
|
|
|
for (const auto& e : filters_to_finish) {
|
|
|
|
// Encode filter length, except last filter
|
|
|
|
if (&e != &filters_to_finish.back()) {
|
|
|
|
PutVarint64(&filters, e.second);
|
|
|
|
}
|
|
|
|
// Encode filter
|
|
|
|
e.first.Finish(filters);
|
|
|
|
}
|
|
|
|
if (filters.size() != total_size) {
|
|
|
|
assert(false);
|
|
|
|
return Status::Corruption(
|
|
|
|
"Internal inconsistency building SST query filters");
|
|
|
|
}
|
|
|
|
|
|
|
|
(*properties)[kTablePropertyName] = std::move(filters);
|
|
|
|
return Status::OK();
|
|
|
|
}
|
|
|
|
UserCollectedProperties GetReadableProperties() const override {
|
|
|
|
// TODO?
|
|
|
|
return {};
|
|
|
|
}
|
|
|
|
const char* Name() const override {
|
|
|
|
// placeholder
|
|
|
|
return "SstQueryFilterConfigsImpl::MyCollector";
|
|
|
|
}
|
|
|
|
|
|
|
|
Status overall_status;
|
|
|
|
const SstQueryFilterConfigsManagerImpl& parent;
|
|
|
|
const KeySegmentsExtractor* const extractor;
|
|
|
|
const bool sanity_checks;
|
|
|
|
std::vector<std::shared_ptr<SstQueryFilterBuilder>> builders;
|
|
|
|
bool first_key = true;
|
|
|
|
std::string prev_key;
|
|
|
|
KeySegmentsExtractor::Result prev_extracted;
|
|
|
|
KeySegmentsExtractor::KeyCategorySet categories_seen;
|
|
|
|
};
|
|
|
|
|
|
|
|
struct RangeQueryFilterReader {
|
|
|
|
Slice lower_bound_incl;
|
|
|
|
Slice upper_bound_excl;
|
|
|
|
const KeySegmentsExtractor* extractor;
|
|
|
|
const UnorderedMap<std::string,
|
|
|
|
std::shared_ptr<const KeySegmentsExtractor>>&
|
|
|
|
extractor_map;
|
|
|
|
|
|
|
|
struct State {
|
|
|
|
KeySegmentsExtractor::Result lb_extracted;
|
|
|
|
KeySegmentsExtractor::Result ub_extracted;
|
|
|
|
};
|
|
|
|
|
|
|
|
// Evaluates a kCategoryScopeFilterWrapper record against the range bounds
// held in `state`. Returns true (cannot rule the range out) when the bounds
// are in different categories, when the record is truncated, or when the
// bounds' category is outside the wrapper's scope; otherwise defers to the
// wrapped filters.
bool MayMatch_CategoryScopeFilterWrapper(Slice wrapper, State& state) const {
  assert(!wrapper.empty() && wrapper[0] == kCategoryScopeFilterWrapper);

  // Regardless of the filter values (which we assume is not all
  // categories; that should skip the wrapper), we need upper bound and
  // lower bound to be in the same category to do any range filtering.
  // (There could be another category in range between the bounds.)
  const auto bounds_category = state.lb_extracted.category;
  if (bounds_category != state.ub_extracted.category) {
    // Can't filter between categories
    return true;
  }

  // The category set follows the one-byte marker
  const char* const limit = wrapper.data() + wrapper.size();
  uint64_t cats_raw;
  const char* const rest = GetVarint64Ptr(wrapper.data() + 1, limit, &cats_raw);
  if (rest == nullptr) {
    // Missing categories
    return true;
  }

  // Check category against those in scope
  KeySegmentsExtractor::KeyCategorySet in_scope = UintToCategorySet(cats_raw);
  if (!in_scope.Contains(bounds_category)) {
    // Can't filter this category
    return true;
  }

  // Process the wrapped filters
  return MayMatch(Slice(rest, limit - rest), &state);
}
|
|
|
|
|
|
|
|
bool MayMatch_ExtrAndCatFilterWrapper(Slice wrapper) const {
|
|
|
|
assert(!wrapper.empty() && wrapper[0] == kExtrAndCatFilterWrapper);
|
|
|
|
if (wrapper.size() <= 4) {
|
|
|
|
// Missing some data
|
|
|
|
// (1 byte marker, >= 1 byte name length, >= 1 byte name, >= 1 byte
|
|
|
|
// categories, ...)
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
const char* p = wrapper.data() + 1;
|
|
|
|
const char* limit = wrapper.data() + wrapper.size();
|
|
|
|
uint64_t name_len;
|
|
|
|
p = GetVarint64Ptr(p, limit, &name_len);
|
|
|
|
if (p == nullptr || name_len == 0 ||
|
|
|
|
static_cast<size_t>(limit - p) < name_len) {
|
|
|
|
// Missing some data
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
Slice name(p, name_len);
|
|
|
|
p += name_len;
|
|
|
|
const KeySegmentsExtractor* ex = nullptr;
|
|
|
|
if (extractor && name == Slice(extractor->GetId())) {
|
|
|
|
ex = extractor;
|
|
|
|
} else {
|
|
|
|
auto it = extractor_map.find(name.ToString());
|
|
|
|
if (it != extractor_map.end()) {
|
|
|
|
ex = it->second.get();
|
|
|
|
} else {
|
|
|
|
// Extractor mismatch / not found
|
|
|
|
// TODO future: try to get the extractor from the ObjectRegistry
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO future: cache extraction?
|
|
|
|
|
|
|
|
// Ready to run extractor
|
|
|
|
assert(ex);
|
|
|
|
State state;
|
|
|
|
ex->Extract(lower_bound_incl, KeySegmentsExtractor::kInclusiveLowerBound,
|
|
|
|
&state.lb_extracted);
|
|
|
|
if (UNLIKELY(state.lb_extracted.category >=
|
|
|
|
KeySegmentsExtractor::kMinErrorCategory)) {
|
|
|
|
// TODO? Report problem
|
|
|
|
// No filtering
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
assert(state.lb_extracted.category <=
|
|
|
|
KeySegmentsExtractor::kMaxUsableCategory);
|
|
|
|
|
|
|
|
ex->Extract(upper_bound_excl, KeySegmentsExtractor::kExclusiveUpperBound,
|
|
|
|
&state.ub_extracted);
|
|
|
|
if (UNLIKELY(state.ub_extracted.category >=
|
|
|
|
KeySegmentsExtractor::kMinErrorCategory)) {
|
|
|
|
// TODO? Report problem
|
|
|
|
// No filtering
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
assert(state.ub_extracted.category <=
|
|
|
|
KeySegmentsExtractor::kMaxUsableCategory);
|
|
|
|
|
|
|
|
uint64_t cats_raw;
|
|
|
|
p = GetVarint64Ptr(p, limit, &cats_raw);
|
|
|
|
if (p == nullptr) {
|
|
|
|
// Missing categories
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
KeySegmentsExtractor::KeyCategorySet categories =
|
|
|
|
UintToCategorySet(cats_raw);
|
|
|
|
|
|
|
|
// Can only filter out based on category if upper and lower bound have
|
|
|
|
// the same category. (Each category is contiguous by key order, but we
|
|
|
|
// don't know the order between categories.)
|
|
|
|
if (state.lb_extracted.category == state.ub_extracted.category &&
|
|
|
|
!categories.Contains(state.lb_extracted.category)) {
|
|
|
|
// Filtered out
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Process the wrapped filters
|
|
|
|
return MayMatch(Slice(p, limit - p), &state);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool MayMatch(Slice filters, State* state = nullptr) const {
|
|
|
|
const char* p = filters.data();
|
|
|
|
const char* limit = p + filters.size();
|
|
|
|
uint64_t filter_count;
|
|
|
|
p = GetVarint64Ptr(p, limit, &filter_count);
|
|
|
|
if (p == nullptr || filter_count == 0) {
|
|
|
|
// TODO? Report problem
|
|
|
|
// No filtering
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
for (size_t i = 0; i < filter_count; ++i) {
|
|
|
|
uint64_t filter_len;
|
|
|
|
if (i + 1 == filter_count) {
|
|
|
|
// Last filter
|
|
|
|
filter_len = static_cast<uint64_t>(limit - p);
|
|
|
|
} else {
|
|
|
|
p = GetVarint64Ptr(p, limit, &filter_len);
|
|
|
|
if (p == nullptr || filter_len == 0 ||
|
|
|
|
static_cast<size_t>(limit - p) < filter_len) {
|
|
|
|
// TODO? Report problem
|
|
|
|
// No filtering
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Slice filter = Slice(p, filter_len);
|
|
|
|
p += filter_len;
|
|
|
|
bool may_match = true;
|
|
|
|
char type = filter[0];
|
|
|
|
switch (type) {
|
|
|
|
case kExtrAndCatFilterWrapper:
|
|
|
|
may_match = MayMatch_ExtrAndCatFilterWrapper(filter);
|
|
|
|
break;
|
|
|
|
case kCategoryScopeFilterWrapper:
|
|
|
|
if (state == nullptr) {
|
|
|
|
// TODO? Report problem
|
|
|
|
// No filtering
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
may_match = MayMatch_CategoryScopeFilterWrapper(filter, *state);
|
|
|
|
break;
|
|
|
|
case kBytewiseMinMaxFilter:
|
|
|
|
if (state == nullptr) {
|
|
|
|
// TODO? Report problem
|
|
|
|
// No filtering
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
may_match = BytewiseMinMaxSstQueryFilterConfig::RangeMayMatch(
|
|
|
|
filter, lower_bound_incl, state->lb_extracted, upper_bound_excl,
|
|
|
|
state->ub_extracted);
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
// TODO? Report problem
|
|
|
|
{}
|
|
|
|
// Unknown filter type
|
|
|
|
}
|
|
|
|
if (!may_match) {
|
|
|
|
// Successfully filtered
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wasn't filtered
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
struct MyFactory : public Factory {
|
|
|
|
explicit MyFactory(
|
|
|
|
std::shared_ptr<const SstQueryFilterConfigsManagerImpl> _parent,
|
|
|
|
const std::string& _configs_name)
|
|
|
|
: parent(std::move(_parent)),
|
|
|
|
ver_map(parent->GetVerMap(_configs_name)),
|
|
|
|
configs_name(_configs_name) {}
|
|
|
|
|
|
|
|
TablePropertiesCollector* CreateTablePropertiesCollector(
|
|
|
|
TablePropertiesCollectorFactory::Context /*context*/) override {
|
|
|
|
auto& configs = GetConfigs();
|
|
|
|
if (configs.IsEmptyNotFound()) {
|
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
return new MyCollector(configs, *parent);
|
|
|
|
}
|
|
|
|
const char* Name() const override {
|
|
|
|
// placeholder
|
|
|
|
return "SstQueryFilterConfigsManagerImpl::MyFactory";
|
|
|
|
}
|
|
|
|
|
|
|
|
Status SetFilteringVersion(FilteringVersion ver) override {
|
|
|
|
if (ver > 0 && ver < parent->min_ver_) {
|
|
|
|
return Status::InvalidArgument(
|
|
|
|
"Filtering version is before earliest known configuration: " +
|
|
|
|
std::to_string(ver) + " < " + std::to_string(parent->min_ver_));
|
|
|
|
}
|
|
|
|
if (ver > parent->max_ver_) {
|
|
|
|
return Status::InvalidArgument(
|
|
|
|
"Filtering version is after latest known configuration: " +
|
|
|
|
std::to_string(ver) + " > " + std::to_string(parent->max_ver_));
|
|
|
|
}
|
|
|
|
version.StoreRelaxed(ver);
|
|
|
|
return Status::OK();
|
|
|
|
}
|
|
|
|
FilteringVersion GetFilteringVersion() const override {
|
|
|
|
return version.LoadRelaxed();
|
|
|
|
}
|
|
|
|
const std::string& GetConfigsName() const override { return configs_name; }
|
|
|
|
const SstQueryFilterConfigs& GetConfigs() const override {
|
|
|
|
FilteringVersion ver = version.LoadRelaxed();
|
|
|
|
if (ver == 0) {
|
|
|
|
// Special case
|
|
|
|
return kEmptyNotFoundSQFC;
|
|
|
|
}
|
|
|
|
assert(ver >= parent->min_ver_);
|
|
|
|
assert(ver <= parent->max_ver_);
|
|
|
|
auto it = ver_map.upper_bound(ver);
|
|
|
|
if (it == ver_map.begin()) {
|
|
|
|
return kEmptyNotFoundSQFC;
|
|
|
|
} else {
|
|
|
|
--it;
|
|
|
|
return it->second;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// The buffers pointed to by the Slices must live as long as any read
|
|
|
|
// operations using this table filter function.
|
|
|
|
std::function<bool(const TableProperties&)> GetTableFilterForRangeQuery(
|
|
|
|
Slice lower_bound_incl, Slice upper_bound_excl) const override {
|
|
|
|
// TODO: cache extractor results between SST files, assuming most will
|
|
|
|
// use the same version
|
|
|
|
return
|
|
|
|
[rqf = RangeQueryFilterReader{
|
|
|
|
lower_bound_incl, upper_bound_excl, GetConfigs().extractor.get(),
|
|
|
|
parent->extractor_map_}](const TableProperties& props) -> bool {
|
|
|
|
auto it = props.user_collected_properties.find(kTablePropertyName);
|
|
|
|
if (it == props.user_collected_properties.end()) {
|
|
|
|
// No filtering
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
auto& filters = it->second;
|
|
|
|
// Parse the serialized filters string
|
|
|
|
if (filters.size() < 2 || filters[0] != kSchemaVersion) {
|
|
|
|
// TODO? Report problem
|
|
|
|
// No filtering
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return rqf.MayMatch(Slice(filters.data() + 1, filters.size() - 1));
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
const std::shared_ptr<const SstQueryFilterConfigsManagerImpl> parent;
|
|
|
|
const ConfigVersionMap& ver_map;
|
|
|
|
const std::string configs_name;
|
|
|
|
RelaxedAtomic<FilteringVersion> version;
|
|
|
|
};
|
|
|
|
|
|
|
|
// Creates a MyFactory bound to this manager and `configs_name`, then applies
// filtering version `ver` to it. `*out` is assigned only when setting the
// version succeeds; otherwise the error status is returned and `*out` is
// left untouched.
Status MakeSharedFactory(const std::string& configs_name,
                         FilteringVersion ver,
                         std::shared_ptr<Factory>* out) const override {
  auto factory = std::make_shared<MyFactory>(
      static_cast_with_check<const SstQueryFilterConfigsManagerImpl>(
          shared_from_this()),
      configs_name);
  Status status = factory->SetFilteringVersion(ver);
  if (!status.ok()) {
    return status;
  }
  *out = std::move(factory);
  return status;
}
|
|
|
|
|
|
|
|
// Looks up the version map registered under `configs_name`. Unknown names
// yield a reference to a shared, empty map rather than an error.
const ConfigVersionMap& GetVerMap(const std::string& configs_name) const {
  static const ConfigVersionMap kEmptyMap;
  const auto found = name_map_.find(configs_name);
  return found != name_map_.end() ? found->second : kEmptyMap;
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
static const std::string kTablePropertyName;
|
|
|
|
static constexpr char kSchemaVersion = 1;
|
|
|
|
|
|
|
|
private:
|
|
|
|
UnorderedMap<std::string, ConfigVersionMap> name_map_;
|
|
|
|
UnorderedMap<std::string, std::shared_ptr<const KeySegmentsExtractor>>
|
|
|
|
extractor_map_;
|
|
|
|
FilteringVersion min_ver_ = 0;
|
|
|
|
FilteringVersion max_ver_ = 0;
|
|
|
|
};
|
|
|
|
|
|
|
|
// SstQueryFilterConfigs
|
|
|
|
// Key under which the serialized filters are looked up in a table's
// user-collected properties.
const std::string SstQueryFilterConfigsManagerImpl::kTablePropertyName(
    "rocksdb.sqfc");
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
bool SstQueryFilterConfigs::IsEmptyNotFound() const {
|
|
|
|
return this == &kEmptyNotFoundSQFC;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Creates a shared BytewiseMinMaxSstQueryFilterConfig from `input` and
// `categories`, returned through the SstQueryFilterConfig base type.
std::shared_ptr<SstQueryFilterConfig> MakeSharedBytewiseMinMaxSQFC(
    FilterInput input, KeySegmentsExtractor::KeyCategorySet categories) {
  using ConfigImpl = BytewiseMinMaxSstQueryFilterConfig;
  return std::make_shared<ConfigImpl>(input, categories);
}
|
|
|
|
|
|
|
|
// Creates a manager implementation and populates it from `data`. `*out` is
// assigned only when Populate() succeeds; on failure its status is returned
// and `*out` is left untouched.
Status SstQueryFilterConfigsManager::MakeShared(
    const Data& data, std::shared_ptr<SstQueryFilterConfigsManager>* out) {
  auto manager = std::make_shared<SstQueryFilterConfigsManagerImpl>();
  Status status = manager->Populate(data);
  if (!status.ok()) {
    return status;
  }
  *out = std::move(manager);
  return status;
}
|
|
|
|
|
2024-03-04 18:08:32 +00:00
|
|
|
} // namespace ROCKSDB_NAMESPACE::experimental
|