mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-27 11:43:49 +00:00
228f49d20a
Summary: This fixes the tsan build failures in: - write_callback_test - persistent_cache_test.* Closes https://github.com/facebook/rocksdb/pull/2339 Differential Revision: D5101190 Pulled By: sagar0 fbshipit-source-id: 537e19ed05272b1f34cfbf793aa822b2264a1643
159 lines
5.2 KiB
C++
159 lines
5.2 KiB
C++
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
// This source code is also licensed under the GPLv2 license found in the
|
|
// COPYING file in the root directory of this source tree.
|
|
#pragma once
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#ifndef OS_WIN
|
|
#include <unistd.h>
|
|
#endif // ! OS_WIN
|
|
|
|
#include <atomic>
|
|
#include <list>
|
|
#include <memory>
|
|
#include <set>
|
|
#include <sstream>
|
|
#include <stdexcept>
|
|
#include <string>
|
|
#include <thread>
|
|
|
|
#include "rocksdb/cache.h"
|
|
#include "rocksdb/comparator.h"
|
|
#include "rocksdb/persistent_cache.h"
|
|
|
|
#include "utilities/persistent_cache/block_cache_tier_file.h"
|
|
#include "utilities/persistent_cache/block_cache_tier_metadata.h"
|
|
#include "utilities/persistent_cache/persistent_cache_util.h"
|
|
|
|
#include "memtable/skiplist.h"
|
|
#include "monitoring/histogram.h"
|
|
#include "port/port.h"
|
|
#include "util/arena.h"
|
|
#include "util/coding.h"
|
|
#include "util/crc32c.h"
|
|
#include "util/mutexlock.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
//
|
|
// Block cache tier implementation
|
|
//
|
|
class BlockCacheTier : public PersistentCacheTier {
|
|
public:
|
|
explicit BlockCacheTier(const PersistentCacheConfig& opt)
|
|
: opt_(opt),
|
|
insert_ops_(opt_.max_write_pipeline_backlog_size),
|
|
buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
|
|
writer_(this, opt_.writer_qdepth, opt_.writer_dispatch_size) {
|
|
Info(opt_.log, "Initializing allocator. size=%d B count=%d",
|
|
opt_.write_buffer_size, opt_.write_buffer_count());
|
|
}
|
|
|
|
virtual ~BlockCacheTier() {
|
|
// Close is re-entrant so we can call close even if it is already closed
|
|
Close();
|
|
assert(!insert_th_.joinable());
|
|
}
|
|
|
|
Status Insert(const Slice& key, const char* data, const size_t size) override;
|
|
Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
|
|
size_t* size) override;
|
|
Status Open() override;
|
|
Status Close() override;
|
|
bool Erase(const Slice& key) override;
|
|
bool Reserve(const size_t size) override;
|
|
|
|
bool IsCompressed() override { return opt_.is_compressed; }
|
|
|
|
std::string GetPrintableOptions() const override { return opt_.ToString(); }
|
|
|
|
PersistentCache::StatsType Stats() override;
|
|
|
|
void TEST_Flush() override {
|
|
while (insert_ops_.Size()) {
|
|
/* sleep override */
|
|
Env::Default()->SleepForMicroseconds(1000000);
|
|
}
|
|
}
|
|
|
|
private:
|
|
// Percentage of cache to be evicted when the cache is full
|
|
static const size_t kEvictPct = 10;
|
|
// Max attempts to insert key, value to cache in pipelined mode
|
|
static const size_t kMaxRetry = 3;
|
|
|
|
// Pipelined operation
|
|
struct InsertOp {
|
|
explicit InsertOp(const bool signal) : signal_(signal) {}
|
|
explicit InsertOp(std::string&& key, const std::string& data)
|
|
: key_(std::move(key)), data_(data) {}
|
|
~InsertOp() {}
|
|
|
|
InsertOp() = delete;
|
|
InsertOp(InsertOp&& rhs) = default;
|
|
InsertOp& operator=(InsertOp&& rhs) = default;
|
|
|
|
// used for estimating size by bounded queue
|
|
size_t Size() { return data_.size() + key_.size(); }
|
|
|
|
std::string key_;
|
|
std::string data_;
|
|
const bool signal_ = false; // signal to request processing thread to exit
|
|
};
|
|
|
|
// entry point for insert thread
|
|
void InsertMain();
|
|
// insert implementation
|
|
Status InsertImpl(const Slice& key, const Slice& data);
|
|
// Create a new cache file
|
|
Status NewCacheFile();
|
|
// Get cache directory path
|
|
std::string GetCachePath() const { return opt_.path + "/cache"; }
|
|
// Cleanup folder
|
|
Status CleanupCacheFolder(const std::string& folder);
|
|
|
|
// Statistics
|
|
struct Statistics {
|
|
HistogramImpl bytes_pipelined_;
|
|
HistogramImpl bytes_written_;
|
|
HistogramImpl bytes_read_;
|
|
HistogramImpl read_hit_latency_;
|
|
HistogramImpl read_miss_latency_;
|
|
HistogramImpl write_latency_;
|
|
std::atomic<uint64_t> cache_hits_{0};
|
|
std::atomic<uint64_t> cache_misses_{0};
|
|
std::atomic<uint64_t> cache_errors_{0};
|
|
std::atomic<uint64_t> insert_dropped_{0};
|
|
|
|
double CacheHitPct() const {
|
|
const auto lookups = cache_hits_ + cache_misses_;
|
|
return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
|
|
}
|
|
|
|
double CacheMissPct() const {
|
|
const auto lookups = cache_hits_ + cache_misses_;
|
|
return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
|
|
}
|
|
};
|
|
|
|
port::RWMutex lock_; // Synchronization
|
|
const PersistentCacheConfig opt_; // BlockCache options
|
|
BoundedQueue<InsertOp> insert_ops_; // Ops waiting for insert
|
|
rocksdb::port::Thread insert_th_; // Insert thread
|
|
uint32_t writer_cache_id_ = 0; // Current cache file identifier
|
|
WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference
|
|
CacheWriteBufferAllocator buffer_allocator_; // Buffer provider
|
|
ThreadedWriter writer_; // Writer threads
|
|
BlockCacheTierMetadata metadata_; // Cache meta data manager
|
|
std::atomic<uint64_t> size_{0}; // Size of the cache
|
|
Statistics stats_; // Statistics
|
|
};
|
|
|
|
} // namespace rocksdb
|
|
|
|
#endif
|