mirror of https://github.com/facebook/rocksdb.git
Blob DB: Fix race condition between flush and write
Summary: A race condition will happen when: * a user thread writes a value, but it hits the write stop condition because there are too many un-flushed memtables, while holding blob_db_impl.write_mutex_. * Flush is triggered and call flush begin listener and try to acquire blob_db_impl.write_mutex_. Fixing it. Closes https://github.com/facebook/rocksdb/pull/3149 Differential Revision: D6279805 Pulled By: yiwu-arbug fbshipit-source-id: 0e3c58afb78795ebe3360a2c69e05651e3908c40
This commit is contained in:
parent
ca75f0a64a
commit
5e9e5a4702
|
@ -745,13 +745,22 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
|
|||
};
|
||||
|
||||
Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
|
||||
MutexLock l(&write_mutex_);
|
||||
|
||||
uint32_t default_cf_id =
|
||||
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
|
||||
// TODO(yiwu): In case there are multiple writers the latest sequence would
|
||||
// not be the actually sequence we are writting. Need to get the sequence
|
||||
// from write batch after DB write instead.
|
||||
SequenceNumber current_seq = GetLatestSequenceNumber() + 1;
|
||||
Status s;
|
||||
BlobInserter blob_inserter(options, this, default_cf_id, current_seq);
|
||||
Status s = updates->Iterate(&blob_inserter);
|
||||
{
|
||||
// Release write_mutex_ before DB write to avoid race condition with
|
||||
// flush begin listener, which also require write_mutex_ to sync
|
||||
// blob files.
|
||||
MutexLock l(&write_mutex_);
|
||||
s = updates->Iterate(&blob_inserter);
|
||||
}
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
@ -759,7 +768,6 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
|
|||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
assert(blob_inserter.sequence() == GetLatestSequenceNumber() + 1);
|
||||
|
||||
// add deleted key to list of keys that have been deleted for book-keeping
|
||||
class DeleteBookkeeper : public WriteBatch::Handler {
|
||||
|
@ -849,10 +857,19 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
|
|||
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
|
||||
const Slice& value, uint64_t expiration) {
|
||||
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
|
||||
MutexLock l(&write_mutex_);
|
||||
SequenceNumber sequence = GetLatestSequenceNumber() + 1;
|
||||
Status s;
|
||||
WriteBatch batch;
|
||||
Status s = PutBlobValue(options, key, value, expiration, sequence, &batch);
|
||||
{
|
||||
// Release write_mutex_ before DB write to avoid race condition with
|
||||
// flush begin listener, which also require write_mutex_ to sync
|
||||
// blob files.
|
||||
MutexLock l(&write_mutex_);
|
||||
// TODO(yiwu): In case there are multiple writers the latest sequence would
|
||||
// not be the actually sequence we are writting. Need to get the sequence
|
||||
// from write batch after DB write instead.
|
||||
SequenceNumber sequence = GetLatestSequenceNumber() + 1;
|
||||
s = PutBlobValue(options, key, value, expiration, sequence, &batch);
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = db_->Write(options, &batch);
|
||||
}
|
||||
|
@ -1198,8 +1215,6 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
|||
return Status::Corruption("Corruption. Blob CRC mismatch");
|
||||
}
|
||||
|
||||
// TODO(yiwu): Should use compression flag in the blob file instead of
|
||||
// current compression option.
|
||||
if (bfile->compression() != kNoCompression) {
|
||||
BlockContents contents;
|
||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
|
||||
|
|
Loading…
Reference in New Issue