diff --git a/src/api/client/sync.rs b/src/api/client/sync.rs index 738c94d7..2ea766a4 100644 --- a/src/api/client/sync.rs +++ b/src/api/client/sync.rs @@ -138,7 +138,7 @@ pub(crate) async fn sync_events_route( .collect::>(); // Coalesce database writes for the remainder of this scope. - let _cork = services().globals.db.cork_and_flush(); + let _cork = services().db.cork_and_flush(); for room_id in all_joined_rooms { let room_id = room_id?; diff --git a/src/database/cork.rs b/src/database/cork.rs index faa3a491..b7e1f60f 100644 --- a/src/database/cork.rs +++ b/src/database/cork.rs @@ -9,7 +9,7 @@ pub struct Cork { } impl Cork { - pub fn new(db: &Arc, flush: bool, sync: bool) -> Self { + pub(super) fn new(db: &Arc, flush: bool, sync: bool) -> Self { db.cork(); Self { db: db.clone(), diff --git a/src/database/database.rs b/src/database/database.rs index e995d987..833b0047 100644 --- a/src/database/database.rs +++ b/src/database/database.rs @@ -8,7 +8,7 @@ use conduit::{PduCount, Result, Server}; use lru_cache::LruCache; use ruma::{CanonicalJsonValue, OwnedDeviceId, OwnedRoomId, OwnedUserId}; -use crate::{maps, maps::Maps, Engine, Map}; +use crate::{cork::Cork, maps, maps::Maps, Engine, Map}; pub struct Database { pub db: Arc, @@ -38,6 +38,15 @@ impl Database { )), }) } + + #[must_use] + pub fn cork(&self) -> Cork { Cork::new(&self.db, false, false) } + + #[must_use] + pub fn cork_and_flush(&self) -> Cork { Cork::new(&self.db, true, false) } + + #[must_use] + pub fn cork_and_sync(&self) -> Cork { Cork::new(&self.db, true, true) } } impl Index<&str> for Database { diff --git a/src/database/engine.rs b/src/database/engine.rs index c6f9eaa5..7acd470f 100644 --- a/src/database/engine.rs +++ b/src/database/engine.rs @@ -122,7 +122,7 @@ impl Engine { pub fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, true)) } - pub(crate) fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 } + pub fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 } pub(crate) fn cork(&self) { self.corks diff --git a/src/database/mod.rs b/src/database/mod.rs index 8b65e230..e10f6cee 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,4 +1,4 @@ -pub mod cork; +mod cork; mod database; mod engine; mod map; @@ -10,7 +10,6 @@ mod watchers; extern crate conduit_core as conduit; extern crate rust_rocksdb as rocksdb; -pub use cork::Cork; pub use database::Database; pub(crate) use engine::Engine; pub use map::Map; diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index bf3eb68b..ccd50ee9 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -4,7 +4,7 @@ use std::{ }; use conduit::{trace, utils, Error, Result}; -use database::{Cork, Database, Map}; +use database::{Database, Map}; use futures_util::{stream::FuturesUnordered, StreamExt}; use lru_cache::LruCache; use ruma::{ @@ -185,10 +185,6 @@ impl Data { pub fn cleanup(&self) -> Result<()> { self.db.db.cleanup() } - pub fn cork(&self) -> Cork { Cork::new(&self.db.db, false, false) } - - pub fn cork_and_flush(&self) -> Cork { Cork::new(&self.db.db, true, false) } - pub fn memory_usage(&self) -> String { let auth_chain_cache = self.db.auth_chain_cache.lock().unwrap().len(); let appservice_in_room_cache = self.db.appservice_in_room_cache.read().unwrap().len(); diff --git a/src/service/globals/migrations.rs b/src/service/globals/migrations.rs index 0f52d5af..c05e8df8 100644 --- a/src/service/globals/migrations.rs +++ b/src/service/globals/migrations.rs @@ -818,7 +818,7 @@ async fn handle_media_check( async fn fix_bad_double_separator_in_state_cache(db: &Arc, _config: &Config) -> Result<()> { warn!("Fixing bad double separator in state_cache roomuserid_joined"); let roomuserid_joined = &db["roomuserid_joined"]; - let _cork = database::Cork::new(&db.db, true, true); + let _cork = db.cork_and_sync(); let mut iter_count: usize = 0; for (mut key, value) in roomuserid_joined.iter() { @@ -851,7 +851,7 @@ async fn fix_bad_double_separator_in_state_cache(db: &Arc, _config: &C async fn retroactively_fix_bad_data_from_roomuserid_joined(db: &Arc, _config: &Config) -> Result<()> { warn!("Retroactively fixing bad data from broken roomuserid_joined"); - let _cork = database::Cork::new(&db.db, true, true); + let _cork = db.cork_and_sync(); let room_ids = services() .rooms diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index d18c76ef..aba712fa 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -207,7 +207,7 @@ impl Service { state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result> { // Coalesce database writes for the remainder of this scope. - let _cork = services().globals.db.cork_and_flush(); + let _cork = services().db.cork_and_flush(); let shortroomid = services() .rooms diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 3894ded1..9bda6cad 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -85,7 +85,7 @@ impl Service { pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { let dest = Destination::Push(user.to_owned(), pushkey); let event = SendingEvent::Pdu(pdu_id.to_owned()); - let _cork = services().globals.db.cork(); + let _cork = services().db.cork(); let keys = self.db.queue_requests(&[(&dest, event.clone())])?; self.dispatch(Msg { dest, @@ -98,7 +98,7 @@ impl Service { pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec) -> Result<()> { let dest = Destination::Appservice(appservice_id); let event = SendingEvent::Pdu(pdu_id); - let _cork = services().globals.db.cork(); + let _cork = services().db.cork(); let keys = self.db.queue_requests(&[(&dest, event.clone())])?; self.dispatch(Msg { dest, @@ -125,7 +125,7 @@ impl Service { .into_iter() .map(|server| (Destination::Normal(server), SendingEvent::Pdu(pdu_id.to_owned()))) .collect::>(); - let _cork = services().globals.db.cork(); + let _cork = services().db.cork(); let keys = self.db.queue_requests( &requests .iter() @@ -147,7 +147,7 @@ impl Service { pub fn send_edu_server(&self, server: &ServerName, serialized: Vec) -> Result<()> { let dest = Destination::Normal(server.to_owned()); let event = SendingEvent::Edu(serialized); - let _cork = services().globals.db.cork(); + let _cork = services().db.cork(); let keys = self.db.queue_requests(&[(&dest, event.clone())])?; self.dispatch(Msg { dest, @@ -174,7 +174,7 @@ impl Service { .into_iter() .map(|server| (Destination::Normal(server), SendingEvent::Edu(serialized.clone()))) .collect::>(); - let _cork = services().globals.db.cork(); + let _cork = services().db.cork(); let keys = self.db.queue_requests( &requests .iter() diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index e1a14dfd..aa2865ec 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -100,7 +100,7 @@ impl Service { fn handle_response_ok( &self, dest: &Destination, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus, ) { - let _cork = services().globals.db.cork(); + let _cork = services().db.cork(); self.db .delete_all_active_requests_for(dest) .expect("all active requests deleted"); @@ -173,7 +173,7 @@ impl Service { return Ok(None); } - let _cork = services().globals.db.cork(); + let _cork = services().db.cork(); let mut events = Vec::new(); // Must retry any previous transaction for this remote. @@ -187,7 +187,7 @@ impl Service { } // Compose the next transaction - let _cork = services().globals.db.cork(); + let _cork = services().db.cork(); if !new_events.is_empty() { self.db.mark_as_active(&new_events)?; for (e, _) in new_events {