Add some checks at property block creation side (#12898)

Summary:
Crash test encountered this failure:
```file ingestion error: Corruption: properties unsorted under specified IngestExternalFileOptions: move_files: 0, verify_checksums_before_ingest: 1, verify_checksums_readahead_size: 1048576 (Empty string or missing field indicates default option or value is used```

Further inspection showed out of order table properties in an external file created by `SstFileWriter` for ingestion, and the file is likely created like this because it passed the initial checksum check. This change added some assertions to check invariant at the properties creation and collecting side.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12898

Test Plan: Existing tests

Reviewed By: hx235

Differential Revision: D60459817

Pulled By: jowlyzhang

fbshipit-source-id: 91474943d2f9d7795f00b6031c08a13ab91e2470
This commit is contained in:
Yu Zhang 2024-07-31 13:28:17 -07:00 committed by Facebook GitHub Bot
parent 2595476541
commit 319374ae67
5 changed files with 32 additions and 14 deletions

View File

@ -674,10 +674,8 @@ class SstFileWriterCollector : public TablePropertiesCollector {
Status Finish(UserCollectedProperties* properties) override { Status Finish(UserCollectedProperties* properties) override {
std::string count = std::to_string(count_); std::string count = std::to_string(count_);
*properties = UserCollectedProperties{ properties->insert({prefix_ + "_SstFileWriterCollector", "YES"});
{prefix_ + "_SstFileWriterCollector", "YES"}, properties->insert({prefix_ + "_Count", count});
{prefix_ + "_Count", count},
};
return Status::OK(); return Status::OK();
} }

View File

@ -354,13 +354,13 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) {
} }
TEST_F(EventListenerTest, MultiCF) { TEST_F(EventListenerTest, MultiCF) {
for (auto atomic_flush : {false, true}) {
Options options; Options options;
options.env = CurrentOptions().env; options.env = CurrentOptions().env;
options.write_buffer_size = k110KB; options.write_buffer_size = k110KB;
#ifdef ROCKSDB_USING_THREAD_STATUS #ifdef ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true; options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
for (auto atomic_flush : {false, true}) {
options.atomic_flush = atomic_flush; options.atomic_flush = atomic_flush;
options.create_if_missing = true; options.create_if_missing = true;
DestroyAndReopen(options); DestroyAndReopen(options);

View File

@ -125,6 +125,8 @@ class TablePropertiesCollector {
// Finish() will be called when a table has already been built and is ready // Finish() will be called when a table has already been built and is ready
// for writing the properties block. // for writing the properties block.
// It will be called only once by RocksDB internal. // It will be called only once by RocksDB internal.
// When the returned Status is not OK, the collected properties will not be
// written to the file's property block.
// //
// @params properties User will add their collected statistics to // @params properties User will add their collected statistics to
// `properties`. // `properties`.

View File

@ -59,12 +59,11 @@ PropertyBlockBuilder::PropertyBlockBuilder()
void PropertyBlockBuilder::Add(const std::string& name, void PropertyBlockBuilder::Add(const std::string& name,
const std::string& val) { const std::string& val) {
assert(props_.find(name) == props_.end());
props_.insert({name, val}); props_.insert({name, val});
} }
void PropertyBlockBuilder::Add(const std::string& name, uint64_t val) { void PropertyBlockBuilder::Add(const std::string& name, uint64_t val) {
assert(props_.find(name) == props_.end());
std::string dst; std::string dst;
PutVarint64(&dst, val); PutVarint64(&dst, val);
@ -168,7 +167,12 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) {
Slice PropertyBlockBuilder::Finish() { Slice PropertyBlockBuilder::Finish() {
for (const auto& prop : props_) { for (const auto& prop : props_) {
assert(last_prop_added_to_block_.empty() ||
comparator_->Compare(prop.first, last_prop_added_to_block_) > 0);
properties_block_->Add(prop.first, prop.second); properties_block_->Add(prop.first, prop.second);
#ifndef NDEBUG
last_prop_added_to_block_ = prop.first;
#endif /* !NDEBUG */
} }
return properties_block_->Finish(); return properties_block_->Finish();
@ -218,12 +222,21 @@ bool NotifyCollectTableCollectorsOnFinish(
UserCollectedProperties& readable_properties) { UserCollectedProperties& readable_properties) {
bool all_succeeded = true; bool all_succeeded = true;
for (auto& collector : collectors) { for (auto& collector : collectors) {
Status s = collector->Finish(&user_collected_properties); UserCollectedProperties user_properties;
Status s = collector->Finish(&user_properties);
if (s.ok()) { if (s.ok()) {
for (const auto& prop : collector->GetReadableProperties()) { for (const auto& prop : collector->GetReadableProperties()) {
readable_properties.insert(prop); readable_properties.insert(prop);
} }
builder->Add(user_collected_properties); #ifndef NDEBUG
// Check different user properties collectors are not adding properties of
// the same name.
for (const auto& pair : user_properties) {
assert(user_collected_properties.find(pair.first) ==
user_collected_properties.end());
}
#endif /* !NDEBUG */
user_collected_properties.merge(user_properties);
} else { } else {
LogPropertiesCollectionError(info_log, "Finish" /* method */, LogPropertiesCollectionError(info_log, "Finish" /* method */,
collector->Name()); collector->Name());
@ -232,6 +245,7 @@ bool NotifyCollectTableCollectorsOnFinish(
} }
} }
} }
builder->Add(user_collected_properties);
return all_succeeded; return all_succeeded;
} }

View File

@ -73,6 +73,10 @@ class PropertyBlockBuilder {
private: private:
std::unique_ptr<BlockBuilder> properties_block_; std::unique_ptr<BlockBuilder> properties_block_;
stl_wrappers::KVMap props_; stl_wrappers::KVMap props_;
#ifndef NDEBUG
const Comparator* comparator_ = BytewiseComparator();
Slice last_prop_added_to_block_;
#endif /* !NDEBUG */
}; };
// Were we encounter any error occurs during user-defined statistics collection, // Were we encounter any error occurs during user-defined statistics collection,