add frontend threadpool to database

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-11-27 06:28:32 +00:00
parent 94d7b21cf0
commit c7ae951676
10 changed files with 362 additions and 57 deletions

51
Cargo.lock generated
View file

@ -92,6 +92,18 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f093eed78becd229346bf859eec0aa4dd7ddde0757287b2b4107a1f09c80002"
[[package]]
name = "async-channel"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a"
dependencies = [
"concurrent-queue",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-compression"
version = "0.4.18"
@ -597,6 +609,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b"
[[package]]
name = "concurrent-queue"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "conduit"
version = "0.5.0"
@ -736,6 +757,7 @@ name = "conduit_database"
version = "0.5.0"
dependencies = [
"arrayvec",
"async-channel",
"conduit_core",
"const-str",
"futures",
@ -1219,6 +1241,27 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "event-listener"
version = "5.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1"
dependencies = [
"event-listener",
"pin-project-lite",
]
[[package]]
name = "fdeflate"
version = "0.3.6"
@ -2151,7 +2194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]
[[package]]
@ -2589,6 +2632,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]]
name = "parking_lot"
version = "0.12.3"

View file

@ -307,10 +307,14 @@ version = "0.13.0"
[workspace.dependencies.cyborgtime]
version = "2.1.1"
# used to replace the channels of the tokio runtime
# used for MPSC channels
[workspace.dependencies.loole]
version = "0.4.0"
# used for MPMC channels
[workspace.dependencies.async-channel]
version = "2.3.1"
[workspace.dependencies.async-trait]
version = "0.1.83"

View file

@ -36,6 +36,7 @@ zstd_compression = [
[dependencies]
arrayvec.workspace = true
async-channel.workspace = true
conduit-core.workspace = true
const-str.workspace = true
futures.workspace = true

View file

@ -16,7 +16,9 @@ use rocksdb::{
use crate::{
opts::{cf_options, db_options},
or_else, result,
or_else, pool,
pool::Pool,
result,
util::map_err,
};
@ -31,6 +33,7 @@ pub struct Engine {
corks: AtomicU32,
pub(super) read_only: bool,
pub(super) secondary: bool,
pub(crate) pool: Arc<Pool>,
}
pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
@ -111,6 +114,7 @@ impl Engine {
corks: AtomicU32::new(0),
read_only: config.rocksdb_read_only,
secondary: config.rocksdb_secondary,
pool: Pool::new(&pool::Opts::default())?,
}))
}
@ -316,6 +320,9 @@ impl Drop for Engine {
fn drop(&mut self) {
const BLOCKING: bool = true;
debug!("Joining request threads...");
self.pool.close();
debug!("Waiting for background tasks to finish...");
self.db.cancel_all_background_work(BLOCKING);

View file

@ -27,7 +27,7 @@ use std::{
};
use conduit::Result;
use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, WriteOptions};
use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, ReadTier, WriteOptions};
use crate::{watchers::Watchers, Engine};
@ -38,6 +38,7 @@ pub struct Map {
watchers: Watchers,
write_options: WriteOptions,
read_options: ReadOptions,
cache_read_options: ReadOptions,
}
impl Map {
@ -49,6 +50,7 @@ impl Map {
watchers: Watchers::default(),
write_options: write_options_default(),
read_options: read_options_default(),
cache_read_options: cache_read_options_default(),
}))
}
@ -112,6 +114,13 @@ fn open(db: &Arc<Engine>, name: &str) -> Result<Arc<ColumnFamily>> {
})
}
/// Builds the read options used for cache-only lookups: the regular default
/// read options with the read tier switched to `ReadTier::BlockCache`.
#[inline]
fn cache_read_options_default() -> ReadOptions {
    let mut options = read_options_default();
    options.set_read_tier(ReadTier::BlockCache);

    options
}
#[inline]
fn read_options_default() -> ReadOptions {
let mut read_options = ReadOptions::default();

View file

@ -1,17 +1,21 @@
use std::{convert::AsRef, fmt::Debug, future::Future, io::Write};
use std::{convert::AsRef, fmt::Debug, future::Future, io::Write, sync::Arc};
use arrayvec::ArrayVec;
use conduit::{implement, utils::TryFutureExtExt, Err, Result};
use futures::future::ready;
use conduit::{
err, implement,
utils::{future::TryExtExt, result::FlatOk},
Result,
};
use futures::FutureExt;
use serde::Serialize;
use crate::{ser, util};
use crate::ser;
/// Returns true if the map contains the key.
/// - key is serialized into allocated buffer
/// - harder errors may not be reported
#[implement(super::Map)]
pub fn contains<K>(&self, key: &K) -> impl Future<Output = bool> + Send
pub fn contains<K>(self: &Arc<Self>, key: &K) -> impl Future<Output = bool> + Send + '_
where
K: Serialize + ?Sized + Debug,
{
@ -23,7 +27,7 @@ where
/// - key is serialized into stack-buffer
/// - harder errors will panic
#[implement(super::Map)]
pub fn acontains<const MAX: usize, K>(&self, key: &K) -> impl Future<Output = bool> + Send
pub fn acontains<const MAX: usize, K>(self: &Arc<Self>, key: &K) -> impl Future<Output = bool> + Send + '_
where
K: Serialize + ?Sized + Debug,
{
@ -36,7 +40,7 @@ where
/// - harder errors will panic
#[implement(super::Map)]
#[tracing::instrument(skip(self, buf), fields(%self), level = "trace")]
pub fn bcontains<K, B>(&self, key: &K, buf: &mut B) -> impl Future<Output = bool> + Send
pub fn bcontains<K, B>(self: &Arc<Self>, key: &K, buf: &mut B) -> impl Future<Output = bool> + Send + '_
where
K: Serialize + ?Sized + Debug,
B: Write + AsRef<[u8]>,
@ -48,41 +52,36 @@ where
/// Returns Ok if the map contains the key.
/// - key is raw
#[implement(super::Map)]
pub fn exists<K>(&self, key: &K) -> impl Future<Output = Result<()>> + Send
pub fn exists<'a, K>(self: &'a Arc<Self>, key: &K) -> impl Future<Output = Result> + Send + 'a
where
K: AsRef<[u8]> + ?Sized + Debug,
K: AsRef<[u8]> + ?Sized + Debug + 'a,
{
ready(self.exists_blocking(key))
self.get(key).map(|res| res.map(|_| ()))
}
/// Returns Ok if the map contains the key; NotFound otherwise. Harder errors
/// may not always be reported properly.
#[implement(super::Map)]
#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
pub fn exists_blocking<K>(&self, key: &K) -> Result<()>
pub fn exists_blocking<K>(&self, key: &K) -> Result
where
K: AsRef<[u8]> + ?Sized + Debug,
{
if self.maybe_exists_blocking(key)
&& self
.db
.db
.get_pinned_cf_opt(&self.cf(), key, &self.read_options)
.map_err(util::map_err)?
.is_some()
{
Ok(())
} else {
Err!(Request(NotFound("Not found in database")))
}
self.maybe_exists(key)
.then(|| self.get_blocking(key))
.flat_ok()
.map(|_| ())
.ok_or_else(|| err!(Request(NotFound("Not found in database"))))
}
/// Rocksdb limits this to kBlockCacheTier internally so this is not actually a
/// blocking call; in case that changes we set this as well in our read_options.
#[implement(super::Map)]
fn maybe_exists_blocking<K>(&self, key: &K) -> bool
pub(crate) fn maybe_exists<K>(&self, key: &K) -> bool
where
K: AsRef<[u8]> + ?Sized,
{
self.db
.db
.key_may_exist_cf_opt(&self.cf(), key, &self.read_options)
.key_may_exist_cf_opt(&self.cf(), key, &self.cache_read_options)
}

View file

@ -1,13 +1,16 @@
use std::{convert::AsRef, fmt::Debug, future::Future, io::Write};
use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
use arrayvec::ArrayVec;
use conduit::{err, implement, utils::IterStream, Result};
use futures::{FutureExt, Stream};
use conduit::{err, implement, utils::IterStream, Err, Result};
use futures::{future, Future, FutureExt, Stream};
use rocksdb::DBPinnableSlice;
use serde::Serialize;
use tokio::task;
use crate::{ser, util, Handle};
use crate::{
ser,
util::{is_incomplete, map_err, or_else},
Handle,
};
type RocksdbResult<'a> = Result<Option<DBPinnableSlice<'a>>, rocksdb::Error>;
@ -15,7 +18,7 @@ type RocksdbResult<'a> = Result<Option<DBPinnableSlice<'a>>, rocksdb::Error>;
/// asynchronously. The key is serialized into an allocated buffer to perform
/// the query.
#[implement(super::Map)]
pub fn qry<K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
pub fn qry<K>(self: &Arc<Self>, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
{
@ -27,7 +30,7 @@ where
/// asynchronously. The key is serialized into a fixed-sized buffer to perform
/// the query. The maximum size is supplied as const generic parameter.
#[implement(super::Map)]
pub fn aqry<const MAX: usize, K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
pub fn aqry<const MAX: usize, K>(self: &Arc<Self>, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
{
@ -39,7 +42,7 @@ where
/// asynchronously. The key is serialized into a user-supplied Writer.
#[implement(super::Map)]
#[tracing::instrument(skip(self, buf), level = "trace")]
pub fn bqry<K, B>(&self, key: &K, buf: &mut B) -> impl Future<Output = Result<Handle<'_>>> + Send
pub fn bqry<K, B>(self: &Arc<Self>, key: &K, buf: &mut B) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
B: Write + AsRef<[u8]>,
@ -52,28 +55,28 @@ where
/// asynchronously. The key is referenced directly to perform the query.
#[implement(super::Map)]
#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
pub fn get<K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
pub fn get<K>(self: &Arc<Self>, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: AsRef<[u8]> + ?Sized + Debug,
K: AsRef<[u8]> + Debug + ?Sized,
{
let result = self.get_blocking(key);
task::consume_budget().map(move |()| result)
}
use crate::pool::{Cmd, Get};
/// Fetch a value from the database into cache, returning a reference-handle.
/// The key is referenced directly to perform the query. This is a thread-
/// blocking call.
#[implement(super::Map)]
pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
where
K: AsRef<[u8]> + ?Sized,
{
let res = self
.db
.db
.get_pinned_cf_opt(&self.cf(), key, &self.read_options);
let cached = self.get_cached(key);
if matches!(cached, Err(_) | Ok(Some(_))) {
return future::ready(cached.map(|res| res.expect("Option is Some"))).boxed();
}
into_result_handle(res)
debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete");
let cmd = Cmd::Get(Get {
map: self.clone(),
res: None,
key: key
.as_ref()
.try_into()
.expect("failed to copy key into buffer"),
});
self.db.pool.execute(cmd).boxed()
}
#[implement(super::Map)]
@ -104,9 +107,52 @@ where
.map(into_result_handle)
}
/// Looks up `key` in this map on the calling thread and returns a
/// reference-handle to the value. The key bytes are used directly, without
/// serialization. The query runs with the map's regular read options, so the
/// calling thread blocks until it completes. A missing key is reported as a
/// NotFound error by `into_result_handle`.
#[implement(super::Map)]
pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
where
    K: AsRef<[u8]> + ?Sized,
{
    let pinned = self
        .db
        .db
        .get_pinned_cf_opt(&self.cf(), key, &self.read_options);

    into_result_handle(pinned)
}
/// Fetch a value from the cache without I/O.
#[implement(super::Map)]
#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
pub(crate) fn get_cached<K>(&self, key: &K) -> Result<Option<Handle<'_>>>
where
K: AsRef<[u8]> + Debug + ?Sized,
{
let res = self
.db
.db
.get_pinned_cf_opt(&self.cf(), key, &self.cache_read_options);
match res {
// cache hit; not found
Ok(None) => Err!(Request(NotFound("Not found in database"))),
// cache hit; value found
Ok(Some(res)) => Ok(Some(Handle::from(res))),
// cache miss; unknown
Err(e) if is_incomplete(&e) => Ok(None),
// some other error occurred
Err(e) => or_else(e),
}
}
fn into_result_handle(result: RocksdbResult<'_>) -> Result<Handle<'_>> {
result
.map_err(util::map_err)?
.map_err(map_err)?
.map(Handle::from)
.ok_or(err!(Request(NotFound("Not found in database"))))
}

View file

@ -8,6 +8,7 @@ pub mod keyval;
mod map;
pub mod maps;
mod opts;
mod pool;
mod ser;
mod stream;
mod tests;

186
src/database/pool.rs Normal file
View file

@ -0,0 +1,186 @@
use std::{
convert::identity,
mem::take,
sync::{Arc, Mutex},
thread::JoinHandle,
};
use arrayvec::ArrayVec;
use async_channel::{bounded, Receiver, Sender};
use conduit::{debug, defer, err, implement, Result};
use futures::channel::oneshot;
use crate::{Handle, Map};
/// A pool of worker threads which execute database commands received over a
/// shared command channel.
pub(crate) struct Pool {
/// Join handles of the spawned worker threads; `spawn_one()` pushes to this
/// list and `close()` drains and joins it.
workers: Mutex<Vec<JoinHandle<()>>>,
/// Receiving end of the command channel; workers block on it in
/// `worker_loop()`.
recv: Receiver<Cmd>,
/// Sending end of the command channel; `execute()` queues commands through
/// it and `close()` closes the channel through it.
send: Sender<Cmd>,
}
/// Construction options for `Pool::new()`. Any field left as `None` falls
/// back to the corresponding `DEFAULT_*` constant.
#[derive(Default)]
pub(crate) struct Opts {
/// Capacity of the bounded command channel; `DEFAULT_QUEUE_SIZE` when `None`.
queue_size: Option<usize>,
/// Number of worker threads to spawn; `DEFAULT_WORKER_NUM` when `None`.
worker_num: Option<usize>,
}
/// Thread name given to every worker thread spawned by `spawn_one()`.
const WORKER_THREAD_NAME: &str = "conduwuit:db";
/// Command channel capacity used when `Opts::queue_size` is `None`.
const DEFAULT_QUEUE_SIZE: usize = 1024;
/// Number of worker threads spawned when `Opts::worker_num` is `None`.
const DEFAULT_WORKER_NUM: usize = 32;
/// Capacity, in bytes, of the fixed-size key buffer carried by a `Get` command.
const KEY_MAX_BYTES: usize = 384;
/// A command sent over the pool's channel to be handled by a worker thread.
#[derive(Debug)]
pub(crate) enum Cmd {
/// Point lookup of a single key; handled by `handle_get()`.
Get(Get),
}
/// Payload of a `Cmd::Get` command: a key lookup against one map.
#[derive(Debug)]
pub(crate) struct Get {
/// The map queried by the worker via `get_blocking()`.
pub(crate) map: Arc<Map>,
/// Copy of the key bytes, stored inline with a capacity of `KEY_MAX_BYTES`.
pub(crate) key: ArrayVec<u8, KEY_MAX_BYTES>,
/// Channel on which the worker reports the result. `execute()` fills this
/// in before queueing the command and `handle_get()` takes it back out.
/// The `'static` lifetime is produced by `into_send_result()`.
pub(crate) res: Option<oneshot::Sender<Result<Handle<'static>>>>,
}
/// Creates the pool: opens the bounded command channel and spawns the worker
/// threads.
///
/// `opts.queue_size` and `opts.worker_num` fall back to `DEFAULT_QUEUE_SIZE`
/// and `DEFAULT_WORKER_NUM` when unset; both are raised to at least 1.
///
/// Returns an error if a worker thread could not be spawned.
#[implement(Pool)]
pub(crate) fn new(opts: &Opts) -> Result<Arc<Self>> {
    // async_channel::bounded() panics when given a capacity of zero, so make
    // sure there is always at least one slot.
    let queue_size = opts.queue_size.unwrap_or(DEFAULT_QUEUE_SIZE).max(1);
    let (send, recv) = bounded(queue_size);
    let pool = Arc::new(Self {
        workers: Vec::new().into(),
        recv,
        send,
    });

    // With no worker nothing ever receives from the channel and every
    // execute() call would wait indefinitely; keep at least one thread.
    let worker_num = opts.worker_num.unwrap_or(DEFAULT_WORKER_NUM).max(1);
    pool.spawn_until(worker_num)?;

    Ok(pool)
}
/// Spawns worker threads until the pool holds `max` of them. Does nothing
/// when that many workers already exist. The worker list stays locked for
/// the whole operation.
#[implement(Pool)]
fn spawn_until(self: &Arc<Self>, max: usize) -> Result {
    let mut workers = self.workers.lock()?;

    // Each successful spawn_one() appends exactly one handle, so spawning
    // once per missing slot brings the list up to `max`.
    for _ in workers.len()..max {
        Arc::clone(self).spawn_one(&mut workers)?;
    }

    Ok(())
}
/// Spawns a single worker thread and records its join handle in `workers`.
///
/// The worker's id is its index in the list at the time of spawning; that id
/// is returned. Fails if the operating system refuses to create the thread.
#[implement(Pool)]
fn spawn_one(self: Arc<Self>, workers: &mut Vec<JoinHandle<()>>) -> Result<usize> {
    let id = workers.len();
    debug!(?id, "spawning {WORKER_THREAD_NAME}...");

    let handle = std::thread::Builder::new()
        .name(String::from(WORKER_THREAD_NAME))
        .spawn(move || self.worker(id))?;

    workers.push(handle);

    Ok(id)
}
/// Shuts the pool down: closes the command channel, then joins every worker
/// thread, blocking the caller until they have all exited.
///
/// Panics if the worker list cannot be locked or if joining a worker fails.
#[implement(Pool)]
pub(crate) fn close(self: &Arc<Self>) {
    let channel = &self.send;
    debug!(
        senders = %channel.sender_count(),
        receivers = %channel.receiver_count(),
        "Closing pool channel"
    );

    // Closing must happen before joining: workers only leave their receive
    // loop once recv_blocking() starts returning an error.
    let closing = channel.close();
    debug_assert!(closing, "channel is not closing");

    debug!("Shutting down pool...");
    let mut workers = self.workers.lock().expect("locked");

    debug!(
        workers = %workers.len(),
        "Waiting for workers to join..."
    );

    // Empty the list so the handles are owned here and can be consumed by join().
    let handles = take(&mut *workers);
    handles
        .into_iter()
        .map(JoinHandle::join)
        .try_for_each(identity)
        .expect("failed to join worker threads");

    debug_assert!(channel.is_empty(), "channel is not empty");
}
/// Submits a command to the worker threads and waits for its result.
///
/// A fresh oneshot channel is created for the reply; its sending half is
/// stored in the command before the command is queued, and the receiving
/// half is awaited afterwards.
///
/// Errors:
/// - "send failed" when the command could not be queued on the channel;
/// - "recv failed" when the reply sender was dropped without sending;
/// - otherwise whatever result the worker produced for the command.
#[implement(Pool)]
#[tracing::instrument(skip(self, cmd), level = "trace")]
pub(crate) async fn execute(&self, mut cmd: Cmd) -> Result<Handle<'_>> {
    let (send, recv) = oneshot::channel();

    // Matching through `&mut cmd` already binds the payload by mutable
    // reference; an explicit `ref mut` is redundant here and is rejected
    // outright by the 2024 edition.
    match &mut cmd {
        Cmd::Get(get) => {
            get.res = Some(send);
        },
    }

    self.send
        .send(cmd)
        .await
        .map_err(|e| err!(error!("send failed {e:?}")))?;

    // The outer Result is the oneshot delivery; the inner one is the query
    // result. `?` strips the outer layer and the inner one is returned as-is.
    recv.await
        .map(into_recv_result)
        .map_err(|e| err!(error!("recv failed {e:?}")))?
}
/// Entry point of a worker thread (invoked from the closure passed to the
/// thread builder in `spawn_one()`). Logs that the worker has started,
/// registers the "worker finished" log line, then runs the receive loop.
#[implement(Pool)]
#[tracing::instrument(skip(self))]
fn worker(self: Arc<Self>, id: usize) {
debug!(?id, "worker spawned");
// NOTE(review): presumably defer! runs its block when this scope exits,
// i.e. after worker_loop() returns — confirm against the macro definition.
defer! {{ debug!(?id, "worker finished"); }}
self.worker_loop(id);
}
/// Receives commands from the channel on the current thread and handles them
/// one at a time. Returns as soon as a receive fails.
#[implement(Pool)]
fn worker_loop(&self, id: usize) {
    // Treat the channel as an iterator of commands which ends at the first
    // receive error.
    std::iter::from_fn(|| self.recv.recv_blocking().ok()).for_each(|mut cmd| {
        self.handle(id, &mut cmd);
    });
}
/// Dispatches a received command to the handler for its variant.
#[implement(Pool)]
fn handle(&self, id: usize, cmd: &mut Cmd) {
    match cmd {
        Cmd::Get(request) => {
            self.handle_get(id, request);
        },
    }
}
/// Runs a `Get` command on the current worker thread and reports the result
/// on the command's reply channel.
///
/// Panics if the command carries no reply channel.
#[implement(Pool)]
#[tracing::instrument(skip(self, cmd), fields(%cmd.map), level = "trace")]
fn handle_get(&self, id: usize, cmd: &mut Get) {
    let reply = cmd.res.take().expect("missing result channel");

    // When the requesting future was dropped while the command sat in the
    // queue nobody is waiting for the answer, so the query is skipped
    // entirely. This keeps select()-style and speculative parallel queries
    // cheap.
    if !reply.is_canceled() {
        let outcome = into_send_result(cmd.map.get_blocking(&cmd.key));

        // The receiver may still have gone away in the meantime; a failed
        // send is deliberately ignored.
        let _sent = reply.send(outcome).is_ok();
    }
}
/// Erases the borrow lifetime of a query result so that it can be sent
/// through the reply channel, whose item type is `Result<Handle<'static>>`.
fn into_send_result(result: Result<Handle<'_>>) -> Result<Handle<'static>> {
// SAFETY: Necessary to send the Handle (rust_rocksdb::PinnableSlice) through
// the channel. The lifetime on the handle is a device by rust-rocksdb to
// associate a database lifetime with its assets, not a function of rocksdb or
// the asset. The Handle must be dropped before the database is dropped. The
// handle must pass through into_recv_result() on the other end of the channel.
unsafe { std::mem::transmute(result) }
}
/// Counterpart of `into_send_result()`: converts a result taken off the reply
/// channel back from `Result<Handle<'static>>` to the caller-facing type.
fn into_recv_result(result: Result<Handle<'static>>) -> Result<Handle<'_>> {
// SAFETY: This is to receive the Handle from the channel. Previously it had
// passed through into_send_result().
unsafe { std::mem::transmute(result) }
}

View file

@ -1,5 +1,5 @@
use conduit::{err, Result};
use rocksdb::{Direction, IteratorMode};
use rocksdb::{Direction, ErrorKind, IteratorMode};
//#[cfg(debug_assertions)]
macro_rules! unhandled {
@ -45,6 +45,9 @@ pub(crate) fn and_then<T>(t: T) -> Result<T, conduit::Error> { Ok(t) }
/// Turns a rocksdb error into the `Err` variant of a conduit result, converting
/// the error with `map_err`.
pub(crate) fn or_else<T>(e: rocksdb::Error) -> Result<T, conduit::Error> { Err(map_err(e)) }
#[inline]
pub(crate) fn is_incomplete(e: &rocksdb::Error) -> bool { e.kind() == ErrorKind::Incomplete }
pub(crate) fn map_err(e: rocksdb::Error) -> conduit::Error {
let string = e.into_string();
err!(Database(error!("{string}")))