From de21f7442a8600b08c4e44cb69d616a3656bc6ff Mon Sep 17 00:00:00 2001
From: Jason Volk <jason@zemos.net>
Date: Tue, 28 May 2024 06:59:50 +0000
Subject: [PATCH] devirtualize database

Signed-off-by: Jason Volk <jason@zemos.net>
---
 src/database/cork.rs                       |  10 +-
 src/database/database.rs                   |  51 ++++
 src/database/{rocksdb/mod.rs => engine.rs} | 152 +++++------
 src/database/kvdatabase.rs                 | 302 ---------------------
 src/database/kvengine.rs                   |  38 ---
 src/database/kvtree.rs                     |  62 -----
 src/database/map.rs                        | 242 +++++++++++++++++
 src/database/maps.rs                       |  99 +++++++
 src/database/mod.rs                        |  26 +-
 src/database/{rocksdb => }/opts.rs         |  19 +-
 src/database/rocksdb/kvtree.rs             | 227 ----------------
 src/database/util.rs                       |  13 +
 12 files changed, 497 insertions(+), 744 deletions(-)
 create mode 100644 src/database/database.rs
 rename src/database/{rocksdb/mod.rs => engine.rs} (63%)
 delete mode 100644 src/database/kvdatabase.rs
 delete mode 100644 src/database/kvengine.rs
 delete mode 100644 src/database/kvtree.rs
 create mode 100644 src/database/map.rs
 create mode 100644 src/database/maps.rs
 rename src/database/{rocksdb => }/opts.rs (96%)
 delete mode 100644 src/database/rocksdb/kvtree.rs
 create mode 100644 src/database/util.rs
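
Usage sketch (illustrative only, not part of the diff): with the
KeyValueDatabaseEngine/KvTree trait objects gone, callers hold the concrete
Database/Map types and reach columns through the new Index<&str> impl instead
of open_tree(). A minimal sketch, assuming a `server: Arc<Server>` is already
constructed and error handling is abbreviated:

	// Hypothetical call site; only Database/Map/Index come from this patch.
	let db = Database::open(&server).await?;
	let map = &db["eventid_pduid"];       // &Arc<Map>; panics on an unknown column
	map.insert(b"$event_id", b"pdu_id")?; // direct call, no vtable dispatch
	assert_eq!(map.get(b"$event_id")?.as_deref(), Some(&b"pdu_id"[..]));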
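Cork::new() now calls the infallible Engine::cork() directly. While a Cork is
held, the Map write paths skip their per-write WAL flush (see the corked()
checks in map.rs), so a group of writes can share one flush. A sketch under
the same assumptions as above:

	{
		let _cork = Cork::new(&db.db, true, false); // flush on drop, no sync
		map.insert(b"a", b"1")?; // corked: no flush here
		map.insert(b"b", b"2")?; // corked: no flush here
	} // Drop: uncork, then flush once
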
diff --git a/src/database/cork.rs b/src/database/cork.rs
index dcabd707..faa3a491 100644
--- a/src/database/cork.rs
+++ b/src/database/cork.rs
@@ -1,16 +1,16 @@
 use std::sync::Arc;
 
-use super::KeyValueDatabaseEngine;
+use crate::Engine;
 
 pub struct Cork {
-	db: Arc<dyn KeyValueDatabaseEngine>,
+	db: Arc<Engine>,
 	flush: bool,
 	sync: bool,
 }
 
 impl Cork {
-	pub fn new(db: &Arc<dyn KeyValueDatabaseEngine>, flush: bool, sync: bool) -> Self {
-		db.cork().unwrap();
+	pub fn new(db: &Arc<Engine>, flush: bool, sync: bool) -> Self {
+		db.cork();
 		Self {
 			db: db.clone(),
 			flush,
@@ -21,7 +21,7 @@ impl Cork {
 
 impl Drop for Cork {
 	fn drop(&mut self) {
-		self.db.uncork().ok();
+		self.db.uncork();
 		if self.flush {
 			self.db.flush().ok();
 		}
diff --git a/src/database/database.rs b/src/database/database.rs
new file mode 100644
index 00000000..e995d987
--- /dev/null
+++ b/src/database/database.rs
@@ -0,0 +1,51 @@
+use std::{
+	collections::{BTreeMap, HashMap},
+	ops::Index,
+	sync::{Arc, Mutex, RwLock},
+};
+
+use conduit::{PduCount, Result, Server};
+use lru_cache::LruCache;
+use ruma::{CanonicalJsonValue, OwnedDeviceId, OwnedRoomId, OwnedUserId};
+
+use crate::{maps, maps::Maps, Engine, Map};
+
+pub struct Database {
+	pub db: Arc<Engine>,
+	pub map: Maps,
+
+	//TODO: not a database
+	pub userdevicesessionid_uiaarequest: RwLock<BTreeMap<(OwnedUserId, OwnedDeviceId, String), CanonicalJsonValue>>,
+	pub auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[u64]>>>,
+	pub appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
+	pub lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
+}
+
+impl Database {
+	/// Load an existing database or create a new one.
+	pub async fn open(server: &Arc<Server>) -> Result<Self> {
+		let config = &server.config;
+		let db = Engine::open(server)?;
+		Ok(Self {
+			db: db.clone(),
+			map: maps::open(&db)?,
+
+			userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()),
+			appservice_in_room_cache: RwLock::new(HashMap::new()),
+			lasttimelinecount_cache: Mutex::new(HashMap::new()),
+			auth_chain_cache: Mutex::new(LruCache::new(
+				(f64::from(config.auth_chain_cache_capacity) * config.conduit_cache_capacity_modifier) as usize,
+			)),
+		})
+	}
+}
+
+impl Index<&str> for Database {
+	type Output = Arc<Map>;
+
+	fn index(&self, name: &str) -> &Self::Output {
+		self.map
+			.get(name)
+			.expect("column in database does not exist")
+	}
+}
diff --git a/src/database/rocksdb/mod.rs b/src/database/engine.rs
similarity index 63%
rename from src/database/rocksdb/mod.rs
rename to src/database/engine.rs
index 943b6745..396d327d 100644
--- a/src/database/rocksdb/mod.rs
+++ b/src/database/engine.rs
@@ -1,45 +1,36 @@
-// no_link to prevent double-inclusion of librocksdb.a here and with
-// libconduit_core.so
-#[no_link]
-extern crate rust_rocksdb;
-
 use std::{
-	collections::HashMap,
+	collections::{HashMap, HashSet},
 	fmt::Write,
-	sync::{atomic::AtomicU32, Arc},
+	sync::{atomic::AtomicU32, Arc, Mutex, RwLock},
 };
 
 use chrono::{DateTime, Utc};
-use rust_rocksdb::{
+use conduit::{debug, error, info, warn, Result, Server};
+use rocksdb::{
 	backup::{BackupEngine, BackupEngineOptions},
 	perf::get_memory_usage_stats,
-	Cache, ColumnFamilyDescriptor, DBCommon, DBWithThreadMode as Db, Env, MultiThreaded, Options,
+	BoundColumnFamily, Cache, ColumnFamilyDescriptor, DBCommon, DBWithThreadMode as Db, Env, MultiThreaded, Options,
 };
-use tracing::{debug, error, info, warn};
 
-use crate::{watchers::Watchers, Config, KeyValueDatabaseEngine, KvTree, Result};
+use crate::{
+	opts::{cf_options, db_options},
+	or_else, result,
+};
 
-pub(crate) mod kvtree;
-pub(crate) mod opts;
-
-use kvtree::RocksDbEngineTree;
-use opts::{cf_options, db_options};
-
-use super::watchers;
-
-pub(crate) struct Engine {
-	config: Config,
+pub struct Engine {
+	server: Arc<Server>,
 	row_cache: Cache,
-	col_cache: HashMap<String, Cache>,
+	col_cache: RwLock<HashMap<String, Cache>>,
 	opts: Options,
 	env: Env,
-	old_cfs: Vec<String>,
-	rocks: Db<MultiThreaded>,
+	cfs: Mutex<HashSet<String>>,
+	pub(crate) db: Db<MultiThreaded>,
 	corks: AtomicU32,
 }
 
-impl KeyValueDatabaseEngine for Arc<Engine> {
-	fn open(config: &Config) -> Result<Self> {
+impl Engine {
+	pub(crate) fn open(server: &Arc<Server>) -> Result<Arc<Self>> {
+		let config = &server.config;
 		let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
 
 		#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
@@ -85,61 +76,64 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
 			db.latest_sequence_number(),
 			load_time.elapsed()
 		);
-		Ok(Self::new(Engine {
-			config: config.clone(),
+
+		let cfs = HashSet::<String>::from_iter(cfs);
+		Ok(Arc::new(Self {
+			server: server.clone(),
 			row_cache,
-			col_cache,
+			col_cache: RwLock::new(col_cache),
 			opts: db_opts,
 			env: db_env,
-			old_cfs: cfs,
-			rocks: db,
+			cfs: Mutex::new(cfs),
+			db,
 			corks: AtomicU32::new(0),
 		}))
 	}
 
-	fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
-		if !self.old_cfs.contains(&name.to_owned()) {
-			// Create if it didn't exist
+	pub(crate) fn open_cf(&self, name: &str) -> Result<Arc<BoundColumnFamily<'_>>> {
+		let mut cfs = self.cfs.lock().expect("locked");
+		if !cfs.contains(name) {
 			debug!("Creating new column family in database: {}", name);
 
-			// TODO: the workaround for this needs to be extended to rocksdb caches, but i
-			// dont know that code to safely do that
-			#[allow(clippy::let_underscore_must_use)]
-			#[allow(clippy::let_underscore_untyped)] // attributes on expressions are experimental
-			let _ = self.rocks.create_cf(name, &self.opts);
+			let mut col_cache = self.col_cache.write().expect("locked");
+			let opts = cf_options(&self.server.config, name, self.opts.clone(), &mut col_cache);
+			if let Err(e) = self.db.create_cf(name, &opts) {
+				error!("Failed to create new column family: {e}");
+				return or_else(e);
+			}
+
+			cfs.insert(name.to_owned());
 		}
 
-		Ok(Arc::new(RocksDbEngineTree {
-			name,
-			db: Self::clone(self),
-			watchers: Watchers::default(),
-		}))
+		Ok(self.cf(name))
 	}
 
-	fn flush(&self) -> Result<()> { result(DBCommon::flush_wal(&self.rocks, false)) }
+	pub(crate) fn cf<'db>(&'db self, name: &str) -> Arc<BoundColumnFamily<'db>> {
+		self.db
+			.cf_handle(name)
+			.expect("column was created and exists")
+	}
 
-	fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.rocks, true)) }
+	pub fn flush(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, false)) }
 
-	fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 }
+	pub fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, true)) }
 
-	fn cork(&self) -> Result<()> {
+	pub(crate) fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 }
+
+	pub(crate) fn cork(&self) {
 		self.corks
 			.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-
-		Ok(())
 	}
 
-	fn uncork(&self) -> Result<()> {
+	pub(crate) fn uncork(&self) {
 		self.corks
 			.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
-
-		Ok(())
 	}
 
 	#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
-	fn memory_usage(&self) -> Result<String> {
+	pub fn memory_usage(&self) -> Result<String> {
 		let mut res = String::new();
-		let stats = get_memory_usage_stats(Some(&[&self.rocks]), Some(&[&self.row_cache])).or_else(or_else)?;
+		let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&self.row_cache])).or_else(or_else)?;
 		writeln!(
 			res,
 			"Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow cache: {:.2} MiB",
@@ -150,7 +144,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
 		)
 		.expect("should be able to write to string buffer");
 
-		for (name, cache) in &self.col_cache {
+		for (name, cache) in &*self.col_cache.read().expect("locked") {
 			writeln!(res, "{} cache: {:.2} MiB", name, cache.get_usage() as f64 / 1024.0 / 1024.0,)
 				.expect("should be able to write to string buffer");
 		}
@@ -158,22 +152,23 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
 		Ok(res)
 	}
 
-	fn cleanup(&self) -> Result<()> {
+	pub fn cleanup(&self) -> Result<()> {
 		debug!("Running flush_opt");
-		let flushoptions = rust_rocksdb::FlushOptions::default();
-		result(DBCommon::flush_opt(&self.rocks, &flushoptions))
+		let flushoptions = rocksdb::FlushOptions::default();
+		result(DBCommon::flush_opt(&self.db, &flushoptions))
 	}
 
-	fn backup(&self) -> Result<(), Box<dyn std::error::Error>> {
-		let path = self.config.database_backup_path.as_ref();
+	pub fn backup(&self) -> Result<(), Box<dyn std::error::Error>> {
+		let config = &self.server.config;
+		let path = config.database_backup_path.as_ref();
 		if path.is_none() || path.is_some_and(|path| path.as_os_str().is_empty()) {
 			return Ok(());
 		}
 
 		let options = BackupEngineOptions::new(path.unwrap())?;
 		let mut engine = BackupEngine::open(&options, &self.env)?;
-		if self.config.database_backups_to_keep > 0 {
-			if let Err(e) = engine.create_new_backup_flush(&self.rocks, true) {
+		if config.database_backups_to_keep > 0 {
+			if let Err(e) = engine.create_new_backup_flush(&self.db, true) {
 				return Err(Box::new(e));
 			}
 
@@ -185,8 +180,8 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
 			);
 		}
 
-		if self.config.database_backups_to_keep >= 0 {
-			let keep = u32::try_from(self.config.database_backups_to_keep)?;
+		if config.database_backups_to_keep >= 0 {
+			let keep = u32::try_from(config.database_backups_to_keep)?;
 			if let Err(e) = engine.purge_old_backups(keep.try_into()?) {
 				error!("Failed to purge old backup: {:?}", e.to_string());
 			}
@@ -195,8 +190,9 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
 		Ok(())
 	}
 
-	fn backup_list(&self) -> Result<String> {
-		let path = self.config.database_backup_path.as_ref();
+	pub fn backup_list(&self) -> Result<String> {
+		let config = &self.server.config;
+		let path = config.database_backup_path.as_ref();
 		if path.is_none() || path.is_some_and(|path| path.as_os_str().is_empty()) {
 			return Ok(
 				"Configure database_backup_path to enable backups, or the path specified is not valid".to_owned(),
@@ -223,8 +219,8 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
 		Ok(res)
 	}
 
-	fn file_list(&self) -> Result<String> {
-		match self.rocks.live_files() {
+	pub fn file_list(&self) -> Result<String> {
+		match self.db.live_files() {
 			Err(e) => Ok(String::from(e)),
 			Ok(files) => {
 				let mut res = String::new();
@@ -242,10 +238,6 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
 			},
 		}
 	}
-
-	// TODO: figure out if this is needed for rocksdb
-	#[allow(dead_code)]
-	fn clear_caches(&self) {}
 }
 
 impl Drop for Engine {
@@ -253,7 +245,7 @@ impl Drop for Engine {
 		const BLOCKING: bool = true;
 
 		debug!("Waiting for background tasks to finish...");
-		self.rocks.cancel_all_background_work(BLOCKING);
+		self.db.cancel_all_background_work(BLOCKING);
 
 		debug!("Shutting down background threads");
 		self.env.set_high_priority_background_threads(0);
@@ -265,15 +257,3 @@ impl Drop for Engine {
 		self.env.join_all_threads();
 	}
 }
-
-#[inline]
-fn result<T>(r: std::result::Result<T, rust_rocksdb::Error>) -> Result<T, conduit::Error> {
-	r.map_or_else(or_else, and_then)
-}
-
-#[inline(always)]
-fn and_then<T>(t: T) -> Result<T, conduit::Error> { Ok(t) }
-
-fn or_else<T>(e: rust_rocksdb::Error) -> Result<T, conduit::Error> { Err(map_err(e)) }
-
-fn map_err(e: rust_rocksdb::Error) -> conduit::Error { conduit::Error::Database(e.into_string()) }
diff --git a/src/database/kvdatabase.rs b/src/database/kvdatabase.rs
deleted file mode 100644
index 4f5fbacd..00000000
--- a/src/database/kvdatabase.rs
+++ /dev/null
@@ -1,302 +0,0 @@
-use std::{
-	collections::{BTreeMap, HashMap},
-	path::Path,
-	sync::{Arc, Mutex, RwLock},
-};
-
-use conduit::{Config, Error, PduCount, Result, Server};
-use lru_cache::LruCache;
-use ruma::{CanonicalJsonValue, OwnedDeviceId, OwnedRoomId, OwnedUserId};
-use tracing::debug;
-
-use crate::{KeyValueDatabaseEngine, KvTree};
-
-pub struct KeyValueDatabase {
-	pub db: Arc<dyn KeyValueDatabaseEngine>,
-
-	//pub globals: globals::Globals,
-	pub global: Arc<dyn KvTree>,
-	pub server_signingkeys: Arc<dyn KvTree>,
-
-	pub roomid_inviteviaservers: Arc<dyn KvTree>,
-
-	//pub users: users::Users,
-	pub userid_password: Arc<dyn KvTree>,
-	pub userid_displayname: Arc<dyn KvTree>,
-	pub userid_avatarurl: Arc<dyn KvTree>,
-	pub userid_blurhash: Arc<dyn KvTree>,
-	pub userdeviceid_token: Arc<dyn KvTree>,
-	pub userdeviceid_metadata: Arc<dyn KvTree>, // This is also used to check if a device exists
-	pub userid_devicelistversion: Arc<dyn KvTree>, // DevicelistVersion = u64
-	pub token_userdeviceid: Arc<dyn KvTree>,
-
-	pub onetimekeyid_onetimekeys: Arc<dyn KvTree>, // OneTimeKeyId = UserId + DeviceKeyId
-	pub userid_lastonetimekeyupdate: Arc<dyn KvTree>, // LastOneTimeKeyUpdate = Count
-	pub keychangeid_userid: Arc<dyn KvTree>, // KeyChangeId = UserId/RoomId + Count
-	pub keyid_key: Arc<dyn KvTree>,          // KeyId = UserId + KeyId (depends on key type)
-	pub userid_masterkeyid: Arc<dyn KvTree>,
-	pub userid_selfsigningkeyid: Arc<dyn KvTree>,
-	pub userid_usersigningkeyid: Arc<dyn KvTree>,
-
-	pub userfilterid_filter: Arc<dyn KvTree>, // UserFilterId = UserId + FilterId
-	pub todeviceid_events: Arc<dyn KvTree>,   // ToDeviceId = UserId + DeviceId + Count
-	pub userid_presenceid: Arc<dyn KvTree>,   // UserId => Count
-	pub presenceid_presence: Arc<dyn KvTree>, // Count + UserId => Presence
-
-	//pub uiaa: uiaa::Uiaa,
-	pub userdevicesessionid_uiaainfo: Arc<dyn KvTree>, // User-interactive authentication
-	pub userdevicesessionid_uiaarequest: RwLock<BTreeMap<(OwnedUserId, OwnedDeviceId, String), CanonicalJsonValue>>,
-
-	//pub edus: RoomEdus,
-	pub readreceiptid_readreceipt: Arc<dyn KvTree>, // ReadReceiptId = RoomId + Count + UserId
-	pub roomuserid_privateread: Arc<dyn KvTree>, // RoomUserId = Room + User, PrivateRead = Count
-	pub roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count
-
-	//pub rooms: rooms::Rooms,
-	pub pduid_pdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count
-	pub eventid_pduid: Arc<dyn KvTree>,
-	pub roomid_pduleaves: Arc<dyn KvTree>,
-	pub alias_roomid: Arc<dyn KvTree>,
-	pub alias_userid: Arc<dyn KvTree>, // UserId = AliasId (User who created the alias)
-	pub aliasid_alias: Arc<dyn KvTree>, // AliasId = RoomId + Count
-	pub publicroomids: Arc<dyn KvTree>,
-
-	pub threadid_userids: Arc<dyn KvTree>, // ThreadId = RoomId + Count
-
-	pub tokenids: Arc<dyn KvTree>, // TokenId = ShortRoomId + Token + PduIdCount
-
-	/// Participating servers in a room.
-	pub roomserverids: Arc<dyn KvTree>, // RoomServerId = RoomId + ServerName
-	pub serverroomids: Arc<dyn KvTree>, // ServerRoomId = ServerName + RoomId
-
-	pub userroomid_joined: Arc<dyn KvTree>,
-	pub roomuserid_joined: Arc<dyn KvTree>,
-	pub roomid_joinedcount: Arc<dyn KvTree>,
-	pub roomid_invitedcount: Arc<dyn KvTree>,
-	pub roomuseroncejoinedids: Arc<dyn KvTree>,
-	pub userroomid_invitestate: Arc<dyn KvTree>, // InviteState = Vec<Raw<Pdu>>
-	pub roomuserid_invitecount: Arc<dyn KvTree>, // InviteCount = Count
-	pub userroomid_leftstate: Arc<dyn KvTree>,
-	pub roomuserid_leftcount: Arc<dyn KvTree>,
-
-	pub disabledroomids: Arc<dyn KvTree>, // Rooms where incoming federation handling is disabled
-
-	pub bannedroomids: Arc<dyn KvTree>, // Rooms where local users are not allowed to join
-
-	pub lazyloadedids: Arc<dyn KvTree>, // LazyLoadedIds = UserId + DeviceId + RoomId + LazyLoadedUserId
-
-	pub userroomid_notificationcount: Arc<dyn KvTree>, // NotifyCount = u64
-	pub userroomid_highlightcount: Arc<dyn KvTree>,    // HightlightCount = u64
-	pub roomuserid_lastnotificationread: Arc<dyn KvTree>, // LastNotificationRead = u64
-
-	/// Remember the current state hash of a room.
-	pub roomid_shortstatehash: Arc<dyn KvTree>,
-	pub roomsynctoken_shortstatehash: Arc<dyn KvTree>,
-	/// Remember the state hash at events in the past.
-	pub shorteventid_shortstatehash: Arc<dyn KvTree>,
-	pub statekey_shortstatekey: Arc<dyn KvTree>, /* StateKey = EventType + StateKey, ShortStateKey =
-	                                              * Count */
-	pub shortstatekey_statekey: Arc<dyn KvTree>,
-
-	pub roomid_shortroomid: Arc<dyn KvTree>,
-
-	pub shorteventid_eventid: Arc<dyn KvTree>,
-	pub eventid_shorteventid: Arc<dyn KvTree>,
-
-	pub statehash_shortstatehash: Arc<dyn KvTree>,
-	pub shortstatehash_statediff: Arc<dyn KvTree>, /* StateDiff = parent (or 0) +
-	                                                * (shortstatekey+shorteventid++) + 0_u64 +
-	                                                * (shortstatekey+shorteventid--) */
-
-	pub shorteventid_authchain: Arc<dyn KvTree>,
-
-	/// RoomId + EventId -> outlier PDU.
-	/// Any pdu that has passed the steps 1-8 in the incoming event
-	/// /federation/send/txn.
-	pub eventid_outlierpdu: Arc<dyn KvTree>,
-	pub softfailedeventids: Arc<dyn KvTree>,
-
-	/// ShortEventId + ShortEventId -> ().
-	pub tofrom_relation: Arc<dyn KvTree>,
-	/// RoomId + EventId -> Parent PDU EventId.
-	pub referencedevents: Arc<dyn KvTree>,
-
-	//pub account_data: account_data::AccountData,
-	pub roomuserdataid_accountdata: Arc<dyn KvTree>, // RoomUserDataId = Room + User + Count + Type
-	pub roomusertype_roomuserdataid: Arc<dyn KvTree>, // RoomUserType = Room + User + Type
-
-	//pub media: media::Media,
-	pub mediaid_file: Arc<dyn KvTree>, // MediaId = MXC + WidthHeight + ContentDisposition + ContentType
-	pub url_previews: Arc<dyn KvTree>,
-	pub mediaid_user: Arc<dyn KvTree>,
-	//pub key_backups: key_backups::KeyBackups,
-	pub backupid_algorithm: Arc<dyn KvTree>, // BackupId = UserId + Version(Count)
-	pub backupid_etag: Arc<dyn KvTree>,      // BackupId = UserId + Version(Count)
-	pub backupkeyid_backup: Arc<dyn KvTree>, // BackupKeyId = UserId + Version + RoomId + SessionId
-
-	//pub transaction_ids: transaction_ids::TransactionIds,
-	pub userdevicetxnid_response: Arc<dyn KvTree>, /* Response can be empty (/sendToDevice) or the event id
-	                                                * (/send) */
-	//pub sending: sending::Sending,
-	pub servername_educount: Arc<dyn KvTree>, // EduCount: Count of last EDU sync
-	pub servernameevent_data: Arc<dyn KvTree>, /* ServernameEvent = (+ / $)SenderKey / ServerName / UserId +
-	                                            * PduId / Id (for edus), Data = EDU content */
-	pub servercurrentevent_data: Arc<dyn KvTree>, /* ServerCurrentEvents = (+ / $)ServerName / UserId + PduId
-	                                               * / Id (for edus), Data = EDU content */
-
-	//pub appservice: appservice::Appservice,
-	pub id_appserviceregistrations: Arc<dyn KvTree>,
-
-	//pub pusher: pusher::PushData,
-	pub senderkey_pusher: Arc<dyn KvTree>,
-
-	pub auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[u64]>>>,
-	pub appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
-	pub lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
-}
-
-impl KeyValueDatabase {
-	/// Load an existing database or create a new one.
-	#[allow(clippy::too_many_lines)]
-	pub async fn load_or_create(server: &Arc<Server>) -> Result<KeyValueDatabase> {
-		let config = &server.config;
-		check_db_setup(config)?;
-		let builder = build(config)?;
-		Ok(Self {
-			db: builder.clone(),
-			userid_password: builder.open_tree("userid_password")?,
-			userid_displayname: builder.open_tree("userid_displayname")?,
-			userid_avatarurl: builder.open_tree("userid_avatarurl")?,
-			userid_blurhash: builder.open_tree("userid_blurhash")?,
-			userdeviceid_token: builder.open_tree("userdeviceid_token")?,
-			userdeviceid_metadata: builder.open_tree("userdeviceid_metadata")?,
-			userid_devicelistversion: builder.open_tree("userid_devicelistversion")?,
-			token_userdeviceid: builder.open_tree("token_userdeviceid")?,
-			onetimekeyid_onetimekeys: builder.open_tree("onetimekeyid_onetimekeys")?,
-			userid_lastonetimekeyupdate: builder.open_tree("userid_lastonetimekeyupdate")?,
-			keychangeid_userid: builder.open_tree("keychangeid_userid")?,
-			keyid_key: builder.open_tree("keyid_key")?,
-			userid_masterkeyid: builder.open_tree("userid_masterkeyid")?,
-			userid_selfsigningkeyid: builder.open_tree("userid_selfsigningkeyid")?,
-			userid_usersigningkeyid: builder.open_tree("userid_usersigningkeyid")?,
-			userfilterid_filter: builder.open_tree("userfilterid_filter")?,
-			todeviceid_events: builder.open_tree("todeviceid_events")?,
-			userid_presenceid: builder.open_tree("userid_presenceid")?,
-			presenceid_presence: builder.open_tree("presenceid_presence")?,
-
-			userdevicesessionid_uiaainfo: builder.open_tree("userdevicesessionid_uiaainfo")?,
-			userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()),
-			readreceiptid_readreceipt: builder.open_tree("readreceiptid_readreceipt")?,
-			roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt
-			roomuserid_lastprivatereadupdate: builder.open_tree("roomuserid_lastprivatereadupdate")?,
-			pduid_pdu: builder.open_tree("pduid_pdu")?,
-			eventid_pduid: builder.open_tree("eventid_pduid")?,
-			roomid_pduleaves: builder.open_tree("roomid_pduleaves")?,
-
-			alias_roomid: builder.open_tree("alias_roomid")?,
-			alias_userid: builder.open_tree("alias_userid")?,
-			aliasid_alias: builder.open_tree("aliasid_alias")?,
-			publicroomids: builder.open_tree("publicroomids")?,
-
-			threadid_userids: builder.open_tree("threadid_userids")?,
-
-			tokenids: builder.open_tree("tokenids")?,
-
-			roomserverids: builder.open_tree("roomserverids")?,
-			serverroomids: builder.open_tree("serverroomids")?,
-			userroomid_joined: builder.open_tree("userroomid_joined")?,
-			roomuserid_joined: builder.open_tree("roomuserid_joined")?,
-			roomid_joinedcount: builder.open_tree("roomid_joinedcount")?,
-			roomid_invitedcount: builder.open_tree("roomid_invitedcount")?,
-			roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?,
-			userroomid_invitestate: builder.open_tree("userroomid_invitestate")?,
-			roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?,
-			userroomid_leftstate: builder.open_tree("userroomid_leftstate")?,
-			roomuserid_leftcount: builder.open_tree("roomuserid_leftcount")?,
-
-			disabledroomids: builder.open_tree("disabledroomids")?,
-
-			bannedroomids: builder.open_tree("bannedroomids")?,
-
-			lazyloadedids: builder.open_tree("lazyloadedids")?,
-
-			userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?,
-			userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?,
-			roomuserid_lastnotificationread: builder.open_tree("userroomid_highlightcount")?,
-
-			statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?,
-			shortstatekey_statekey: builder.open_tree("shortstatekey_statekey")?,
-
-			shorteventid_authchain: builder.open_tree("shorteventid_authchain")?,
-
-			roomid_shortroomid: builder.open_tree("roomid_shortroomid")?,
-
-			shortstatehash_statediff: builder.open_tree("shortstatehash_statediff")?,
-			eventid_shorteventid: builder.open_tree("eventid_shorteventid")?,
-			shorteventid_eventid: builder.open_tree("shorteventid_eventid")?,
-			shorteventid_shortstatehash: builder.open_tree("shorteventid_shortstatehash")?,
-			roomid_shortstatehash: builder.open_tree("roomid_shortstatehash")?,
-			roomsynctoken_shortstatehash: builder.open_tree("roomsynctoken_shortstatehash")?,
-			statehash_shortstatehash: builder.open_tree("statehash_shortstatehash")?,
-
-			eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
-			softfailedeventids: builder.open_tree("softfailedeventids")?,
-
-			tofrom_relation: builder.open_tree("tofrom_relation")?,
-			referencedevents: builder.open_tree("referencedevents")?,
-			roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
-			roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?,
-			mediaid_file: builder.open_tree("mediaid_file")?,
-			url_previews: builder.open_tree("url_previews")?,
-			mediaid_user: builder.open_tree("mediaid_user")?,
-			backupid_algorithm: builder.open_tree("backupid_algorithm")?,
-			backupid_etag: builder.open_tree("backupid_etag")?,
-			backupkeyid_backup: builder.open_tree("backupkeyid_backup")?,
-			userdevicetxnid_response: builder.open_tree("userdevicetxnid_response")?,
-			servername_educount: builder.open_tree("servername_educount")?,
-			servernameevent_data: builder.open_tree("servernameevent_data")?,
-			servercurrentevent_data: builder.open_tree("servercurrentevent_data")?,
-			id_appserviceregistrations: builder.open_tree("id_appserviceregistrations")?,
-			senderkey_pusher: builder.open_tree("senderkey_pusher")?,
-			global: builder.open_tree("global")?,
-			server_signingkeys: builder.open_tree("server_signingkeys")?,
-
-			roomid_inviteviaservers: builder.open_tree("roomid_inviteviaservers")?,
-
-			auth_chain_cache: Mutex::new(LruCache::new(
-				(f64::from(config.auth_chain_cache_capacity) * config.conduit_cache_capacity_modifier) as usize,
-			)),
-			appservice_in_room_cache: RwLock::new(HashMap::new()),
-			lasttimelinecount_cache: Mutex::new(HashMap::new()),
-		})
-	}
-}
-
-fn build(config: &Config) -> Result<Arc<dyn KeyValueDatabaseEngine>> {
-	match &*config.database_backend {
-		"rocksdb" => {
-			debug!("Got rocksdb database backend");
-			#[cfg(not(feature = "rocksdb"))]
-			return Err(Error::bad_config("Database backend not found."));
-			#[cfg(feature = "rocksdb")]
-			Ok(Arc::new(Arc::<rocksdb::Engine>::open(config)?))
-		},
-		_ => Err(Error::bad_config(
-			"Database backend not found. rocksdb is the only supported backend.",
-		)),
-	}
-}
-
-fn check_db_setup(config: &Config) -> Result<()> {
-	let path = Path::new(&config.database_path);
-
-	let rocksdb_exists = path.join("IDENTITY").exists();
-
-	if rocksdb_exists && config.database_backend != "rocksdb" {
-		return Err(Error::bad_config(
-			"Found rocksdb at database_path, but is not specified in config.",
-		));
-	}
-
-	Ok(())
-}
diff --git a/src/database/kvengine.rs b/src/database/kvengine.rs
deleted file mode 100644
index 1b27c571..00000000
--- a/src/database/kvengine.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-use std::{error::Error, sync::Arc};
-
-use super::{Config, KvTree};
-use crate::Result;
-
-pub trait KeyValueDatabaseEngine: Send + Sync {
-	fn open(config: &Config) -> Result<Self>
-	where
-		Self: Sized;
-
-	fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>>;
-
-	fn flush(&self) -> Result<()>;
-
-	#[allow(dead_code)]
-	fn sync(&self) -> Result<()> { Ok(()) }
-
-	fn cork(&self) -> Result<()> { Ok(()) }
-
-	fn uncork(&self) -> Result<()> { Ok(()) }
-
-	fn corked(&self) -> bool { false }
-
-	fn cleanup(&self) -> Result<()> { Ok(()) }
-
-	fn memory_usage(&self) -> Result<String> {
-		Ok("Current database engine does not support memory usage reporting.".to_owned())
-	}
-
-	#[allow(dead_code)]
-	fn clear_caches(&self) {}
-
-	fn backup(&self) -> Result<(), Box<dyn Error>> { unimplemented!() }
-
-	fn backup_list(&self) -> Result<String> { Ok(String::new()) }
-
-	fn file_list(&self) -> Result<String> { Ok(String::new()) }
-}
diff --git a/src/database/kvtree.rs b/src/database/kvtree.rs
deleted file mode 100644
index 009e45d5..00000000
--- a/src/database/kvtree.rs
+++ /dev/null
@@ -1,62 +0,0 @@
-use std::{future::Future, pin::Pin};
-
-use crate::Result;
-
-pub trait KvTree: Send + Sync {
-	fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
-
-	#[allow(dead_code)]
-	fn multi_get(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> {
-		let mut ret: Vec<Option<Vec<u8>>> = Vec::with_capacity(keys.len());
-		for key in keys {
-			ret.push(self.get(key)?);
-		}
-
-		Ok(ret)
-	}
-
-	fn insert(&self, key: &[u8], value: &[u8]) -> Result<()>;
-	fn insert_batch(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
-		for (key, value) in iter {
-			self.insert(&key, &value)?;
-		}
-
-		Ok(())
-	}
-
-	fn remove(&self, key: &[u8]) -> Result<()>;
-
-	#[allow(dead_code)]
-	fn remove_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
-		for key in iter {
-			self.remove(&key)?;
-		}
-
-		Ok(())
-	}
-
-	fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
-
-	fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
-
-	fn increment(&self, key: &[u8]) -> Result<Vec<u8>>;
-	fn increment_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
-		for key in iter {
-			self.increment(&key)?;
-		}
-
-		Ok(())
-	}
-
-	fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
-
-	fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
-
-	fn clear(&self) -> Result<()> {
-		for (key, _) in self.iter() {
-			self.remove(&key)?;
-		}
-
-		Ok(())
-	}
-}
diff --git a/src/database/map.rs b/src/database/map.rs
new file mode 100644
index 00000000..591de04b
--- /dev/null
+++ b/src/database/map.rs
@@ -0,0 +1,242 @@
+use std::{future::Future, pin::Pin, sync::Arc};
+
+use conduit::{utils, Result};
+use rocksdb::{BoundColumnFamily, Direction, IteratorMode, ReadOptions, WriteBatchWithTransaction, WriteOptions};
+
+use super::{or_else, result, watchers::Watchers, Engine};
+
+pub struct Map {
+	db: Arc<Engine>,
+	name: String,
+	watchers: Watchers,
+}
+
+type Key = Vec<u8>;
+type Val = Vec<u8>;
+type KeyVal = (Key, Val);
+
+impl Map {
+	pub(crate) fn open(db: &Arc<Engine>, name: &str) -> Result<Arc<Self>> {
+		db.open_cf(name)?;
+		Ok(Arc::new(Self {
+			db: db.clone(),
+			name: name.to_owned(),
+			watchers: Watchers::default(),
+		}))
+	}
+
+	pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
+		let mut readoptions = ReadOptions::default();
+		readoptions.set_total_order_seek(true);
+
+		result(self.db.db.get_cf_opt(&self.cf(), key, &readoptions))
+	}
+
+	pub fn multi_get(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> {
+		// Optimization can be `true` if key vector is pre-sorted **by the column
+		// comparator**.
+		const SORTED: bool = false;
+
+		let mut readoptions = ReadOptions::default();
+		readoptions.set_total_order_seek(true);
+
+		let mut ret: Vec<Option<Vec<u8>>> = Vec::with_capacity(keys.len());
+		for res in self
+			.db
+			.db
+			.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, &readoptions)
+		{
+			match res {
+				Ok(Some(res)) => ret.push(Some((*res).to_vec())),
+				Ok(None) => ret.push(None),
+				Err(e) => return or_else(e),
+			}
+		}
+
+		Ok(ret)
+	}
+
+	pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
+		let writeoptions = WriteOptions::default();
+
+		self.db
+			.db
+			.put_cf_opt(&self.cf(), key, value, &writeoptions)
+			.or_else(or_else)?;
+
+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
+		self.watchers.wake(key);
+
+		Ok(())
+	}
+
+	pub fn insert_batch(&self, iter: &mut dyn Iterator<Item = KeyVal>) -> Result<()> {
+		let writeoptions = WriteOptions::default();
+
+		let mut batch = WriteBatchWithTransaction::<false>::default();
+
+		for (key, value) in iter {
+			batch.put_cf(&self.cf(), key, value);
+		}
+
+		let res = self.db.db.write_opt(batch, &writeoptions);
+
+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
+		result(res)
+	}
+
+	pub fn remove(&self, key: &[u8]) -> Result<()> {
+		let writeoptions = WriteOptions::default();
+
+		let res = self.db.db.delete_cf_opt(&self.cf(), key, &writeoptions);
+
+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
+		result(res)
+	}
+
+	pub fn remove_batch(&self, iter: &mut dyn Iterator<Item = Key>) -> Result<()> {
+		let writeoptions = WriteOptions::default();
+
+		let mut batch = WriteBatchWithTransaction::<false>::default();
+
+		for key in iter {
+			batch.delete_cf(&self.cf(), key);
+		}
+
+		let res = self.db.db.write_opt(batch, &writeoptions);
+
+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
+		result(res)
+	}
+
+	pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
+		let mut readoptions = ReadOptions::default();
+		readoptions.set_total_order_seek(true);
+
+		let it = self
+			.db
+			.db
+			.iterator_cf_opt(&self.cf(), readoptions, IteratorMode::Start)
+			.map(Result::unwrap)
+			.map(|(k, v)| (Vec::from(k), Vec::from(v)));
+
+		Box::new(it)
+	}
+
+	pub fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
+		let mut readoptions = ReadOptions::default();
+		readoptions.set_total_order_seek(true);
+
+		let it = self
+			.db
+			.db
+			.iterator_cf_opt(
+				&self.cf(),
+				readoptions,
+				IteratorMode::From(
+					from,
+					if backwards {
+						Direction::Reverse
+					} else {
+						Direction::Forward
+					},
+				),
+			)
+			.map(Result::unwrap)
+			.map(|(k, v)| (Vec::from(k), Vec::from(v)));
+
+		Box::new(it)
+	}
+
+	pub fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
+		let mut readoptions = ReadOptions::default();
+		readoptions.set_total_order_seek(true);
+		let writeoptions = WriteOptions::default();
+
+		let old = self
+			.db
+			.db
+			.get_cf_opt(&self.cf(), key, &readoptions)
+			.or_else(or_else)?;
+		let new = utils::increment(old.as_deref());
+		self.db
+			.db
+			.put_cf_opt(&self.cf(), key, new, &writeoptions)
+			.or_else(or_else)?;
+
+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
+		Ok(new.to_vec())
+	}
+
+	pub fn increment_batch(&self, iter: &mut dyn Iterator<Item = Key>) -> Result<()> {
+		let mut readoptions = ReadOptions::default();
+		readoptions.set_total_order_seek(true);
+		let writeoptions = WriteOptions::default();
+
+		let mut batch = WriteBatchWithTransaction::<false>::default();
+
+		for key in iter {
+			let old = self
+				.db
+				.db
+				.get_cf_opt(&self.cf(), &key, &readoptions)
+				.or_else(or_else)?;
+			let new = utils::increment(old.as_deref());
+			batch.put_cf(&self.cf(), key, new);
+		}
+
+		self.db
+			.db
+			.write_opt(batch, &writeoptions)
+			.or_else(or_else)?;
+
+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
+		Ok(())
+	}
+
+	pub fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
+		let mut readoptions = ReadOptions::default();
+		readoptions.set_total_order_seek(true);
+
+		let it = self
+			.db
+			.db
+			.iterator_cf_opt(&self.cf(), readoptions, IteratorMode::From(&prefix, Direction::Forward))
+			.map(Result::unwrap)
+			.map(|(k, v)| (Vec::from(k), Vec::from(v)))
+			.take_while(move |(k, _)| k.starts_with(&prefix));
+
+		Box::new(it)
+	}
+
+	pub fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
+		self.watchers.watch(prefix)
+	}
+
+	fn cf(&self) -> Arc<BoundColumnFamily<'_>> { self.db.cf(&self.name) }
+}
+
+impl<'a> IntoIterator for &'a Map {
+	type IntoIter = Box<dyn Iterator<Item = KeyVal> + 'a>;
+	type Item = KeyVal;
+
+	fn into_iter(self) -> Self::IntoIter { self.iter() }
+}
diff --git a/src/database/maps.rs b/src/database/maps.rs
new file mode 100644
index 00000000..94e8ba99
--- /dev/null
+++ b/src/database/maps.rs
@@ -0,0 +1,99 @@
+use std::{collections::BTreeMap, sync::Arc};
+
+use conduit::Result;
+
+use crate::{Engine, Map};
+
+pub type Maps = BTreeMap<String, Arc<Map>>;
+
+pub(crate) fn open(db: &Arc<Engine>) -> Result<Maps> { open_list(db, MAPS) }
+
+pub(crate) fn open_list(db: &Arc<Engine>, maps: &[&str]) -> Result<Maps> {
+	Ok(maps
+		.iter()
+		.map(|&name| (name.to_owned(), Map::open(db, name).expect("valid column opened")))
+		.collect::<Maps>())
+}
+
+pub const MAPS: &[&str] = &[
+	"alias_roomid",
+	"alias_userid",
+	"aliasid_alias",
+	"backupid_algorithm",
+	"backupid_etag",
+	"backupkeyid_backup",
+	"bannedroomids",
+	"disabledroomids",
+	"eventid_outlierpdu",
+	"eventid_pduid",
+	"eventid_shorteventid",
+	"global",
+	"id_appserviceregistrations",
+	"keychangeid_userid",
+	"keyid_key",
+	"lazyloadedids",
+	"mediaid_file",
+	"mediaid_user",
+	"onetimekeyid_onetimekeys",
+	"pduid_pdu",
+	"presenceid_presence",
+	"publicroomids",
+	"readreceiptid_readreceipt",
+	"referencedevents",
+	"roomid_invitedcount",
+	"roomid_inviteviaservers",
+	"roomid_joinedcount",
+	"roomid_pduleaves",
+	"roomid_shortroomid",
+	"roomid_shortstatehash",
+	"roomserverids",
+	"roomsynctoken_shortstatehash",
+	"roomuserdataid_accountdata",
+	"roomuserid_invitecount",
+	"roomuserid_joined",
+	"roomuserid_lastprivatereadupdate",
+	"roomuserid_leftcount",
+	"roomuserid_privateread",
+	"roomuseroncejoinedids",
+	"roomusertype_roomuserdataid",
+	"senderkey_pusher",
+	"server_signingkeys",
+	"servercurrentevent_data",
+	"servername_educount",
+	"servernameevent_data",
+	"serverroomids",
+	"shorteventid_authchain",
+	"shorteventid_eventid",
+	"shorteventid_shortstatehash",
+	"shortstatehash_statediff",
+	"shortstatekey_statekey",
+	"softfailedeventids",
+	"statehash_shortstatehash",
+	"statekey_shortstatekey",
+	"threadid_userids",
+	"todeviceid_events",
+	"tofrom_relation",
+	"token_userdeviceid",
+	"tokenids",
+	"url_previews",
+	"userdeviceid_metadata",
+	"userdeviceid_token",
+	"userdevicesessionid_uiaainfo",
+	"userdevicetxnid_response",
+	"userfilterid_filter",
+	"userid_avatarurl",
+	"userid_blurhash",
+	"userid_devicelistversion",
+	"userid_displayname",
+	"userid_lastonetimekeyupdate",
+	"userid_masterkeyid",
+	"userid_password",
+	"userid_presenceid",
+	"userid_selfsigningkeyid",
+	"userid_usersigningkeyid",
+	"userroomid_highlightcount",
+	"userroomid_invitestate",
+	"userroomid_joined",
+	"userroomid_leftstate",
+	"userroomid_notificationcount",
+];
"roomuserid_leftcount", + "roomuserid_privateread", + "roomuseroncejoinedids", + "roomusertype_roomuserdataid", + "senderkey_pusher", + "server_signingkeys", + "servercurrentevent_data", + "servername_educount", + "servernameevent_data", + "serverroomids", + "shorteventid_authchain", + "shorteventid_eventid", + "shorteventid_shortstatehash", + "shortstatehash_statediff", + "shortstatekey_statekey", + "softfailedeventids", + "statehash_shortstatehash", + "statekey_shortstatekey", + "threadid_userids", + "todeviceid_events", + "tofrom_relation", + "token_userdeviceid", + "tokenids", + "url_previews", + "userdeviceid_metadata", + "userdeviceid_token", + "userdevicesessionid_uiaainfo", + "userdevicetxnid_response", + "userfilterid_filter", + "userid_avatarurl", + "userid_blurhash", + "userid_devicelistversion", + "userid_displayname", + "userid_lastonetimekeyupdate", + "userid_masterkeyid", + "userid_password", + "userid_presenceid", + "userid_selfsigningkeyid", + "userid_usersigningkeyid", + "userroomid_highlightcount", + "userroomid_invitestate", + "userroomid_joined", + "userroomid_leftstate", + "userroomid_notificationcount", +]; diff --git a/src/database/mod.rs b/src/database/mod.rs index 48366e25..8b65e230 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,20 +1,20 @@ pub mod cork; -mod kvdatabase; -mod kvengine; -mod kvtree; - -#[cfg(feature = "rocksdb")] -pub(crate) mod rocksdb; - -#[cfg(feature = "rocksdb")] -pub(crate) mod watchers; +mod database; +mod engine; +mod map; +pub mod maps; +mod opts; +mod util; +mod watchers; extern crate conduit_core as conduit; -pub(crate) use conduit::{Config, Result}; +extern crate rust_rocksdb as rocksdb; + pub use cork::Cork; -pub use kvdatabase::KeyValueDatabase; -pub use kvengine::KeyValueDatabaseEngine; -pub use kvtree::KvTree; +pub use database::Database; +pub(crate) use engine::Engine; +pub use map::Map; +pub(crate) use util::{or_else, result}; conduit::mod_ctor! {} conduit::mod_dtor! {} diff --git a/src/database/rocksdb/opts.rs b/src/database/opts.rs similarity index 96% rename from src/database/rocksdb/opts.rs rename to src/database/opts.rs index 06c90a77..4f9dece5 100644 --- a/src/database/rocksdb/opts.rs +++ b/src/database/opts.rs @@ -1,14 +1,9 @@ -#![allow(dead_code)] use std::{cmp, collections::HashMap}; -use conduit::utils; - -use super::{ - rust_rocksdb::{ - BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, DBRecoveryMode, Env, LogLevel, Options, - UniversalCompactOptions, UniversalCompactionStopStyle, - }, - Config, +use conduit::{utils, Config}; +use rocksdb::{ + BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, DBRecoveryMode, Env, LogLevel, Options, + UniversalCompactOptions, UniversalCompactionStopStyle, }; /// Create database-wide options suitable for opening the database. 
This also @@ -150,7 +145,7 @@ pub(crate) fn cf_options(cfg: &Config, name: &str, mut opts: Options, cache: &mu ), #[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)] - "pduid_pdu" => set_table_with_new_cache( + "eventid_outlierpdu" => set_table_with_new_cache( &mut opts, cfg, cache, @@ -158,7 +153,7 @@ pub(crate) fn cf_options(cfg: &Config, name: &str, mut opts: Options, cache: &mu (cfg.pdu_cache_capacity as usize).saturating_mul(1536), ), - "eventid_outlierpdu" => set_table_with_shared_cache(&mut opts, cfg, cache, name, "pduid_pdu"), + "pduid_pdu" => set_table_with_shared_cache(&mut opts, cfg, cache, name, "eventid_outlierpdu"), &_ => {}, } @@ -210,6 +205,7 @@ fn set_compression_defaults(opts: &mut Options, config: &Config) { opts.set_compression_type(rocksdb_compression_algo); } +#[allow(dead_code)] fn set_for_random_small_uc(opts: &mut Options, config: &Config) { let uco = uc_options(config); set_for_random_small(opts, config); @@ -224,6 +220,7 @@ fn set_for_sequential_small_uc(opts: &mut Options, config: &Config) { opts.set_compaction_style(DBCompactionStyle::Universal); } +#[allow(dead_code)] fn set_for_random_small(opts: &mut Options, config: &Config) { set_for_random(opts, config); diff --git a/src/database/rocksdb/kvtree.rs b/src/database/rocksdb/kvtree.rs deleted file mode 100644 index 949db84e..00000000 --- a/src/database/rocksdb/kvtree.rs +++ /dev/null @@ -1,227 +0,0 @@ -use std::{future::Future, pin::Pin, sync::Arc}; - -use conduit::{utils, Result}; - -use super::{ - or_else, result, rust_rocksdb::WriteBatchWithTransaction, watchers::Watchers, Engine, KeyValueDatabaseEngine, - KvTree, -}; - -pub(crate) struct RocksDbEngineTree<'a> { - pub(crate) db: Arc, - pub(crate) name: &'a str, - pub(crate) watchers: Watchers, -} - -impl RocksDbEngineTree<'_> { - fn cf(&self) -> Arc> { self.db.rocks.cf_handle(self.name).unwrap() } -} - -impl KvTree for RocksDbEngineTree<'_> { - fn get(&self, key: &[u8]) -> Result>> { - let mut readoptions = rust_rocksdb::ReadOptions::default(); - readoptions.set_total_order_seek(true); - - result(self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)) - } - - fn multi_get(&self, keys: &[&[u8]]) -> Result>>> { - // Optimization can be `true` if key vector is pre-sorted **by the column - // comparator**. 
diff --git a/src/database/rocksdb/kvtree.rs b/src/database/rocksdb/kvtree.rs
deleted file mode 100644
index 949db84e..00000000
--- a/src/database/rocksdb/kvtree.rs
+++ /dev/null
@@ -1,227 +0,0 @@
-use std::{future::Future, pin::Pin, sync::Arc};
-
-use conduit::{utils, Result};
-
-use super::{
-	or_else, result, rust_rocksdb::WriteBatchWithTransaction, watchers::Watchers, Engine, KeyValueDatabaseEngine,
-	KvTree,
-};
-
-pub(crate) struct RocksDbEngineTree<'a> {
-	pub(crate) db: Arc<Engine>,
-	pub(crate) name: &'a str,
-	pub(crate) watchers: Watchers,
-}
-
-impl RocksDbEngineTree<'_> {
-	fn cf(&self) -> Arc<rust_rocksdb::BoundColumnFamily<'_>> { self.db.rocks.cf_handle(self.name).unwrap() }
-}
-
-impl KvTree for RocksDbEngineTree<'_> {
-	fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
-		let mut readoptions = rust_rocksdb::ReadOptions::default();
-		readoptions.set_total_order_seek(true);
-
-		result(self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions))
-	}
-
-	fn multi_get(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> {
-		// Optimization can be `true` if key vector is pre-sorted **by the column
-		// comparator**.
-		const SORTED: bool = false;
-
-		let mut readoptions = rust_rocksdb::ReadOptions::default();
-		readoptions.set_total_order_seek(true);
-
-		let mut ret: Vec<Option<Vec<u8>>> = Vec::with_capacity(keys.len());
-		for res in self
-			.db
-			.rocks
-			.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, &readoptions)
-		{
-			match res {
-				Ok(Some(res)) => ret.push(Some((*res).to_vec())),
-				Ok(None) => ret.push(None),
-				Err(e) => return or_else(e),
-			}
-		}
-
-		Ok(ret)
-	}
-
-	fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
-		let writeoptions = rust_rocksdb::WriteOptions::default();
-
-		self.db
-			.rocks
-			.put_cf_opt(&self.cf(), key, value, &writeoptions)
-			.or_else(or_else)?;
-
-		if !self.db.corked() {
-			self.db.flush()?;
-		}
-
-		self.watchers.wake(key);
-
-		Ok(())
-	}
-
-	fn insert_batch(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
-		let writeoptions = rust_rocksdb::WriteOptions::default();
-
-		let mut batch = WriteBatchWithTransaction::<false>::default();
-
-		for (key, value) in iter {
-			batch.put_cf(&self.cf(), key, value);
-		}
-
-		let res = self.db.rocks.write_opt(batch, &writeoptions);
-
-		if !self.db.corked() {
-			self.db.flush()?;
-		}
-
-		result(res)
-	}
-
-	fn remove(&self, key: &[u8]) -> Result<()> {
-		let writeoptions = rust_rocksdb::WriteOptions::default();
-
-		let res = self.db.rocks.delete_cf_opt(&self.cf(), key, &writeoptions);
-
-		if !self.db.corked() {
-			self.db.flush()?;
-		}
-
-		result(res)
-	}
-
-	fn remove_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
-		let writeoptions = rust_rocksdb::WriteOptions::default();
-
-		let mut batch = WriteBatchWithTransaction::<false>::default();
-
-		for key in iter {
-			batch.delete_cf(&self.cf(), key);
-		}
-
-		let res = self.db.rocks.write_opt(batch, &writeoptions);
-
-		if !self.db.corked() {
-			self.db.flush()?;
-		}
-
-		result(res)
-	}
-
-	fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
-		let mut readoptions = rust_rocksdb::ReadOptions::default();
-		readoptions.set_total_order_seek(true);
-
-		Box::new(
-			self.db
-				.rocks
-				.iterator_cf_opt(&self.cf(), readoptions, rust_rocksdb::IteratorMode::Start)
-				.map(Result::unwrap)
-				.map(|(k, v)| (Vec::from(k), Vec::from(v))),
-		)
-	}
-
-	fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
-		let mut readoptions = rust_rocksdb::ReadOptions::default();
-		readoptions.set_total_order_seek(true);
-
-		Box::new(
-			self.db
-				.rocks
-				.iterator_cf_opt(
-					&self.cf(),
-					readoptions,
-					rust_rocksdb::IteratorMode::From(
-						from,
-						if backwards {
-							rust_rocksdb::Direction::Reverse
-						} else {
-							rust_rocksdb::Direction::Forward
-						},
-					),
-				)
-				.map(Result::unwrap)
-				.map(|(k, v)| (Vec::from(k), Vec::from(v))),
-		)
-	}
-
-	fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
-		let mut readoptions = rust_rocksdb::ReadOptions::default();
-		readoptions.set_total_order_seek(true);
-		let writeoptions = rust_rocksdb::WriteOptions::default();
-
-		let old = self
-			.db
-			.rocks
-			.get_cf_opt(&self.cf(), key, &readoptions)
-			.or_else(or_else)?;
-		let new = utils::increment(old.as_deref());
-		self.db
-			.rocks
-			.put_cf_opt(&self.cf(), key, new, &writeoptions)
-			.or_else(or_else)?;
-
-		if !self.db.corked() {
-			self.db.flush()?;
-		}
-
-		Ok(new.to_vec())
-	}
-
-	fn increment_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
-		let mut readoptions = rust_rocksdb::ReadOptions::default();
-		readoptions.set_total_order_seek(true);
-		let writeoptions = rust_rocksdb::WriteOptions::default();
-
-		let mut batch = WriteBatchWithTransaction::<false>::default();
-
-		for key in iter {
-			let old = self
-				.db
-				.rocks
-				.get_cf_opt(&self.cf(), &key, &readoptions)
-				.or_else(or_else)?;
-			let new = utils::increment(old.as_deref());
-			batch.put_cf(&self.cf(), key, new);
-		}
-
-		self.db
-			.rocks
-			.write_opt(batch, &writeoptions)
-			.or_else(or_else)?;
-
-		if !self.db.corked() {
-			self.db.flush()?;
-		}
-
-		Ok(())
-	}
-
-	fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
-		let mut readoptions = rust_rocksdb::ReadOptions::default();
-		readoptions.set_total_order_seek(true);
-
-		Box::new(
-			self.db
-				.rocks
-				.iterator_cf_opt(
-					&self.cf(),
-					readoptions,
-					rust_rocksdb::IteratorMode::From(&prefix, rust_rocksdb::Direction::Forward),
-				)
-				.map(Result::unwrap)
-				.map(|(k, v)| (Vec::from(k), Vec::from(v)))
-				.take_while(move |(k, _)| k.starts_with(&prefix)),
-		)
-	}
-
-	fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
-		self.watchers.watch(prefix)
-	}
-}
diff --git a/src/database/util.rs b/src/database/util.rs
new file mode 100644
index 00000000..513cedc8
--- /dev/null
+++ b/src/database/util.rs
@@ -0,0 +1,13 @@
+use conduit::Result;
+
+#[inline]
+pub(crate) fn result<T>(r: std::result::Result<T, rocksdb::Error>) -> Result<T, conduit::Error> {
+	r.map_or_else(or_else, and_then)
+}
+
+#[inline(always)]
+pub(crate) fn and_then<T>(t: T) -> Result<T, conduit::Error> { Ok(t) }
+
+pub(crate) fn or_else<T>(e: rocksdb::Error) -> Result<T, conduit::Error> { Err(map_err(e)) }
+
+pub(crate) fn map_err(e: rocksdb::Error) -> conduit::Error { conduit::Error::Database(e.into_string()) }