mirror of https://github.com/facebook/rocksdb.git
Fix and test for leaks of open SST files (#13117)
Summary: Follow-up to https://github.com/facebook/rocksdb/issues/13106 which revealed that some SST file readers (in addition to blob files) were being essentially leaked in TableCache (until DB::Close() time). Patched sources of leaks: * Flush that is not committed (builder.cc) * Various obsolete SST files picked up by directory scan but not caught by SubcompactionState::Cleanup() cleaning up from some failed compactions. Dozens of unit tests fail without the "backstop" TableCache::Evict() call in PurgeObsoleteFiles(). We also needed to adjust the check for leaks as follows: * Ok if DB::Open never finished (see comment) * Ok if deletions are disabled (see comment) * Allow "quarantined" files to be in table_cache because (presumably) they might become live again. * Get live files from all live Versions. Suggested follow-up: * Potentially delete more obsolete files sooner with a FIXME in db_impl_files.cc. This could potentially be high value because it seems to gate deletion of any/all newer obsolete files on all older compactions finishing. * Try to catch obsolete files in more places using the VersionSet::obsolete_files_ pipeline rather than relying on them being picked up with directory scan, or deleting them outside of normal mechanisms. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13117 Test Plan: updated check used in most all unit tests in ASAN build Reviewed By: hx235 Differential Revision: D65502988 Pulled By: pdillinger fbshipit-source-id: aa0795a8a09d9ec578d25183fe43e2a35849209c
This commit is contained in:
parent
ba164ac373
commit
485ee4f45c
|
@ -460,9 +460,18 @@ Status BuildTable(
|
||||||
Status prepare =
|
Status prepare =
|
||||||
WritableFileWriter::PrepareIOOptions(tboptions.write_options, opts);
|
WritableFileWriter::PrepareIOOptions(tboptions.write_options, opts);
|
||||||
if (prepare.ok()) {
|
if (prepare.ok()) {
|
||||||
|
// FIXME: track file for "slow" deletion, e.g. into the
|
||||||
|
// VersionSet::obsolete_files_ pipeline
|
||||||
Status ignored = fs->DeleteFile(fname, opts, dbg);
|
Status ignored = fs->DeleteFile(fname, opts, dbg);
|
||||||
ignored.PermitUncheckedError();
|
ignored.PermitUncheckedError();
|
||||||
}
|
}
|
||||||
|
// Ensure we don't leak table cache entries when throwing away output
|
||||||
|
// files. (The usual logic in PurgeObsoleteFiles is not applicable because
|
||||||
|
// this function deletes the obsolete file itself, while they should
|
||||||
|
// probably go into the VersionSet::obsolete_files_ pipeline.)
|
||||||
|
TableCache::ReleaseObsolete(table_cache->get_cache().get(),
|
||||||
|
meta->fd.GetNumber(), nullptr /*handle*/,
|
||||||
|
mutable_cf_options.uncache_aggressiveness);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(blob_file_additions || blob_file_paths.empty());
|
assert(blob_file_additions || blob_file_paths.empty());
|
||||||
|
|
|
@ -34,9 +34,16 @@ void SubcompactionState::Cleanup(Cache* cache) {
|
||||||
|
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
for (const auto& out : GetOutputs()) {
|
for (const auto& out : GetOutputs()) {
|
||||||
// If this file was inserted into the table cache then remove
|
// If this file was inserted into the table cache then remove it here
|
||||||
// them here because this compaction was not committed.
|
// because this compaction was not committed. This is not strictly
|
||||||
TableCache::Evict(cache, out.meta.fd.GetNumber());
|
// required because of a backstop TableCache::Evict() in
|
||||||
|
// PurgeObsoleteFiles() but is our opportunity to apply
|
||||||
|
// uncache_aggressiveness. TODO: instead, put these files into the
|
||||||
|
// VersionSet::obsolete_files_ pipeline so that they don't have to
|
||||||
|
// be picked up by scanning the DB directory.
|
||||||
|
TableCache::ReleaseObsolete(
|
||||||
|
cache, out.meta.fd.GetNumber(), nullptr /*handle*/,
|
||||||
|
compaction->mutable_cf_options()->uncache_aggressiveness);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO: sub_compact.io_status is not checked like status. Not sure if thats
|
// TODO: sub_compact.io_status is not checked like status. Not sure if thats
|
||||||
|
|
|
@ -338,31 +338,48 @@ void DBImpl::TEST_VerifyNoObsoleteFilesCached(
|
||||||
l.emplace(&mutex_);
|
l.emplace(&mutex_);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<uint64_t> live_files;
|
if (!opened_successfully_) {
|
||||||
|
// We don't need to pro-actively clean up open files during DB::Open()
|
||||||
|
// if we know we are about to fail and clean up in Close().
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (disable_delete_obsolete_files_ > 0) {
|
||||||
|
// For better or worse, DB::Close() is allowed with deletions disabled.
|
||||||
|
// Since we generally associate clean-up of open files with deleting them,
|
||||||
|
// we allow "obsolete" open files when deletions are disabled.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Live and "quarantined" files are allowed to be open in table cache
|
||||||
|
std::set<uint64_t> live_and_quar_files;
|
||||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||||
if (cfd->IsDropped()) {
|
if (cfd->IsDropped()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Sneakily add both SST and blob files to the same list
|
// Iterate over live versions
|
||||||
cfd->current()->AddLiveFiles(&live_files, &live_files);
|
Version* current = cfd->current();
|
||||||
}
|
Version* ver = current;
|
||||||
std::sort(live_files.begin(), live_files.end());
|
do {
|
||||||
|
// Sneakily add both SST and blob files to the same list
|
||||||
|
std::vector<uint64_t> live_files_vec;
|
||||||
|
ver->AddLiveFiles(&live_files_vec, &live_files_vec);
|
||||||
|
live_and_quar_files.insert(live_files_vec.begin(), live_files_vec.end());
|
||||||
|
|
||||||
auto fn = [&live_files](const Slice& key, Cache::ObjectPtr, size_t,
|
ver = ver->Next();
|
||||||
const Cache::CacheItemHelper* helper) {
|
} while (ver != current);
|
||||||
if (helper != BlobFileCache::GetHelper()) {
|
}
|
||||||
// Skip non-blob files for now
|
{
|
||||||
// FIXME: diagnose and fix the leaks of obsolete SST files revealed in
|
const auto& quar_files = error_handler_.GetFilesToQuarantine();
|
||||||
// unit tests.
|
live_and_quar_files.insert(quar_files.begin(), quar_files.end());
|
||||||
return;
|
}
|
||||||
}
|
auto fn = [&live_and_quar_files](const Slice& key, Cache::ObjectPtr, size_t,
|
||||||
|
const Cache::CacheItemHelper*) {
|
||||||
// See TableCache and BlobFileCache
|
// See TableCache and BlobFileCache
|
||||||
assert(key.size() == sizeof(uint64_t));
|
assert(key.size() == sizeof(uint64_t));
|
||||||
uint64_t file_number;
|
uint64_t file_number;
|
||||||
GetUnaligned(reinterpret_cast<const uint64_t*>(key.data()), &file_number);
|
GetUnaligned(reinterpret_cast<const uint64_t*>(key.data()), &file_number);
|
||||||
// Assert file is in sorted live_files
|
// Assert file is in live/quarantined set
|
||||||
assert(
|
assert(live_and_quar_files.find(file_number) != live_and_quar_files.end());
|
||||||
std::binary_search(live_files.begin(), live_files.end(), file_number));
|
|
||||||
};
|
};
|
||||||
table_cache_->ApplyToAllEntries(fn, {});
|
table_cache_->ApplyToAllEntries(fn, {});
|
||||||
}
|
}
|
||||||
|
|
|
@ -449,14 +449,8 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
|
||||||
// File is being deleted (actually obsolete)
|
// File is being deleted (actually obsolete)
|
||||||
auto number = file.metadata->fd.GetNumber();
|
auto number = file.metadata->fd.GetNumber();
|
||||||
candidate_files.emplace_back(MakeTableFileName(number), file.path);
|
candidate_files.emplace_back(MakeTableFileName(number), file.path);
|
||||||
if (handle == nullptr) {
|
TableCache::ReleaseObsolete(table_cache_.get(), number, handle,
|
||||||
// For files not "pinned" in table cache
|
file.uncache_aggressiveness);
|
||||||
handle = TableCache::Lookup(table_cache_.get(), number);
|
|
||||||
}
|
|
||||||
if (handle) {
|
|
||||||
TableCache::ReleaseObsolete(table_cache_.get(), handle,
|
|
||||||
file.uncache_aggressiveness);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
file.DeleteMetadata();
|
file.DeleteMetadata();
|
||||||
}
|
}
|
||||||
|
@ -572,9 +566,17 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
|
||||||
case kTableFile:
|
case kTableFile:
|
||||||
// If the second condition is not there, this makes
|
// If the second condition is not there, this makes
|
||||||
// DontDeletePendingOutputs fail
|
// DontDeletePendingOutputs fail
|
||||||
|
// FIXME: but should NOT keep if it came from sst_delete_files?
|
||||||
keep = (sst_live_set.find(number) != sst_live_set.end()) ||
|
keep = (sst_live_set.find(number) != sst_live_set.end()) ||
|
||||||
number >= state.min_pending_output;
|
number >= state.min_pending_output;
|
||||||
if (!keep) {
|
if (!keep) {
|
||||||
|
// NOTE: sometimes redundant (if came from sst_delete_files)
|
||||||
|
// We don't know which column family is applicable here so we don't
|
||||||
|
// know what uncache_aggressiveness would be used with
|
||||||
|
// ReleaseObsolete(). Anyway, obsolete files ideally go into
|
||||||
|
// sst_delete_files for better/quicker handling, and this is just a
|
||||||
|
// backstop.
|
||||||
|
TableCache::Evict(table_cache_.get(), number);
|
||||||
files_to_del.insert(number);
|
files_to_del.insert(number);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -726,13 +726,19 @@ uint64_t TableCache::ApproximateSize(
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TableCache::ReleaseObsolete(Cache* cache, Cache::Handle* h,
|
void TableCache::ReleaseObsolete(Cache* cache, uint64_t file_number,
|
||||||
|
Cache::Handle* h,
|
||||||
uint32_t uncache_aggressiveness) {
|
uint32_t uncache_aggressiveness) {
|
||||||
CacheInterface typed_cache(cache);
|
CacheInterface typed_cache(cache);
|
||||||
TypedHandle* table_handle = reinterpret_cast<TypedHandle*>(h);
|
TypedHandle* table_handle = reinterpret_cast<TypedHandle*>(h);
|
||||||
TableReader* table_reader = typed_cache.Value(table_handle);
|
if (table_handle == nullptr) {
|
||||||
table_reader->MarkObsolete(uncache_aggressiveness);
|
table_handle = typed_cache.Lookup(GetSliceForFileNumber(&file_number));
|
||||||
typed_cache.ReleaseAndEraseIfLastRef(table_handle);
|
}
|
||||||
|
if (table_handle != nullptr) {
|
||||||
|
TableReader* table_reader = typed_cache.Value(table_handle);
|
||||||
|
table_reader->MarkObsolete(uncache_aggressiveness);
|
||||||
|
typed_cache.ReleaseAndEraseIfLastRef(table_handle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
|
|
@ -159,12 +159,14 @@ class TableCache {
|
||||||
bool skip_range_deletions = false, int level = -1,
|
bool skip_range_deletions = false, int level = -1,
|
||||||
TypedHandle* table_handle = nullptr);
|
TypedHandle* table_handle = nullptr);
|
||||||
|
|
||||||
// Evict any entry for the specified file number
|
// Evict any entry for the specified file number. ReleaseObsolete() is
|
||||||
|
// preferred for cleaning up from obsolete files.
|
||||||
static void Evict(Cache* cache, uint64_t file_number);
|
static void Evict(Cache* cache, uint64_t file_number);
|
||||||
|
|
||||||
// Handles releasing, erasing, etc. of what should be the last reference
|
// Handles releasing, erasing, etc. of what should be the last reference
|
||||||
// to an obsolete file.
|
// to an obsolete file. `handle` may be nullptr if no prior handle is known.
|
||||||
static void ReleaseObsolete(Cache* cache, Cache::Handle* handle,
|
static void ReleaseObsolete(Cache* cache, uint64_t file_number,
|
||||||
|
Cache::Handle* handle,
|
||||||
uint32_t uncache_aggressiveness);
|
uint32_t uncache_aggressiveness);
|
||||||
|
|
||||||
// Return handle to an existing cache entry if there is one
|
// Return handle to an existing cache entry if there is one
|
||||||
|
|
|
@ -5204,18 +5204,16 @@ VersionSet::~VersionSet() {
|
||||||
column_family_set_.reset();
|
column_family_set_.reset();
|
||||||
|
|
||||||
for (auto& file : obsolete_files_) {
|
for (auto& file : obsolete_files_) {
|
||||||
if (file.metadata->table_reader_handle) {
|
// NOTE: DB is shutting down, so file is probably not obsolete, just
|
||||||
// NOTE: DB is shutting down, so file is probably not obsolete, just
|
// no longer referenced by Versions in memory.
|
||||||
// no longer referenced by Versions in memory.
|
// For more context, see comment on "table_cache_->EraseUnRefEntries()"
|
||||||
// For more context, see comment on "table_cache_->EraseUnRefEntries()"
|
// in DBImpl::CloseHelper().
|
||||||
// in DBImpl::CloseHelper().
|
// Using uncache_aggressiveness=0 overrides any previous marking to
|
||||||
// Using uncache_aggressiveness=0 overrides any previous marking to
|
// attempt to uncache the file's blocks (which after cleaning up
|
||||||
// attempt to uncache the file's blocks (which after cleaning up
|
// column families could cause use-after-free)
|
||||||
// column families could cause use-after-free)
|
TableCache::ReleaseObsolete(table_cache_, file.metadata->fd.GetNumber(),
|
||||||
TableCache::ReleaseObsolete(table_cache_,
|
file.metadata->table_reader_handle,
|
||||||
file.metadata->table_reader_handle,
|
/*uncache_aggressiveness=*/0);
|
||||||
/*uncache_aggressiveness=*/0);
|
|
||||||
}
|
|
||||||
file.DeleteMetadata();
|
file.DeleteMetadata();
|
||||||
}
|
}
|
||||||
obsolete_files_.clear();
|
obsolete_files_.clear();
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
* Fix leaks of some open SST files (until `DB::Close()`) that are written but never become live due to various failures. (We now have a check for such leaks with no outstanding issues.)
|
Loading…
Reference in New Issue