mirror of
https://github.com/girlbossceo/conduwuit.git
synced 2024-11-29 18:36:21 +00:00
simplify cork interface related
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
a1ced0a56f
commit
6dd6e4bfaf
|
@ -138,7 +138,7 @@ pub(crate) async fn sync_events_route(
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
// Coalesce database writes for the remainder of this scope.
|
// Coalesce database writes for the remainder of this scope.
|
||||||
let _cork = services().globals.db.cork_and_flush();
|
let _cork = services().db.cork_and_flush();
|
||||||
|
|
||||||
for room_id in all_joined_rooms {
|
for room_id in all_joined_rooms {
|
||||||
let room_id = room_id?;
|
let room_id = room_id?;
|
||||||
|
|
|
@ -9,7 +9,7 @@ pub struct Cork {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Cork {
|
impl Cork {
|
||||||
pub fn new(db: &Arc<Engine>, flush: bool, sync: bool) -> Self {
|
pub(super) fn new(db: &Arc<Engine>, flush: bool, sync: bool) -> Self {
|
||||||
db.cork();
|
db.cork();
|
||||||
Self {
|
Self {
|
||||||
db: db.clone(),
|
db: db.clone(),
|
||||||
|
|
|
@ -8,7 +8,7 @@ use conduit::{PduCount, Result, Server};
|
||||||
use lru_cache::LruCache;
|
use lru_cache::LruCache;
|
||||||
use ruma::{CanonicalJsonValue, OwnedDeviceId, OwnedRoomId, OwnedUserId};
|
use ruma::{CanonicalJsonValue, OwnedDeviceId, OwnedRoomId, OwnedUserId};
|
||||||
|
|
||||||
use crate::{maps, maps::Maps, Engine, Map};
|
use crate::{cork::Cork, maps, maps::Maps, Engine, Map};
|
||||||
|
|
||||||
pub struct Database {
|
pub struct Database {
|
||||||
pub db: Arc<Engine>,
|
pub db: Arc<Engine>,
|
||||||
|
@ -38,6 +38,15 @@ impl Database {
|
||||||
)),
|
)),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn cork(&self) -> Cork { Cork::new(&self.db, false, false) }
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn cork_and_flush(&self) -> Cork { Cork::new(&self.db, true, false) }
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn cork_and_sync(&self) -> Cork { Cork::new(&self.db, true, true) }
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Index<&str> for Database {
|
impl Index<&str> for Database {
|
||||||
|
|
|
@ -122,7 +122,7 @@ impl Engine {
|
||||||
|
|
||||||
pub fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, true)) }
|
pub fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, true)) }
|
||||||
|
|
||||||
pub(crate) fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 }
|
pub fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 }
|
||||||
|
|
||||||
pub(crate) fn cork(&self) {
|
pub(crate) fn cork(&self) {
|
||||||
self.corks
|
self.corks
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
pub mod cork;
|
mod cork;
|
||||||
mod database;
|
mod database;
|
||||||
mod engine;
|
mod engine;
|
||||||
mod map;
|
mod map;
|
||||||
|
@ -10,7 +10,6 @@ mod watchers;
|
||||||
extern crate conduit_core as conduit;
|
extern crate conduit_core as conduit;
|
||||||
extern crate rust_rocksdb as rocksdb;
|
extern crate rust_rocksdb as rocksdb;
|
||||||
|
|
||||||
pub use cork::Cork;
|
|
||||||
pub use database::Database;
|
pub use database::Database;
|
||||||
pub(crate) use engine::Engine;
|
pub(crate) use engine::Engine;
|
||||||
pub use map::Map;
|
pub use map::Map;
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{trace, utils, Error, Result};
|
use conduit::{trace, utils, Error, Result};
|
||||||
use database::{Cork, Database, Map};
|
use database::{Database, Map};
|
||||||
use futures_util::{stream::FuturesUnordered, StreamExt};
|
use futures_util::{stream::FuturesUnordered, StreamExt};
|
||||||
use lru_cache::LruCache;
|
use lru_cache::LruCache;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
|
@ -185,10 +185,6 @@ impl Data {
|
||||||
|
|
||||||
pub fn cleanup(&self) -> Result<()> { self.db.db.cleanup() }
|
pub fn cleanup(&self) -> Result<()> { self.db.db.cleanup() }
|
||||||
|
|
||||||
pub fn cork(&self) -> Cork { Cork::new(&self.db.db, false, false) }
|
|
||||||
|
|
||||||
pub fn cork_and_flush(&self) -> Cork { Cork::new(&self.db.db, true, false) }
|
|
||||||
|
|
||||||
pub fn memory_usage(&self) -> String {
|
pub fn memory_usage(&self) -> String {
|
||||||
let auth_chain_cache = self.db.auth_chain_cache.lock().unwrap().len();
|
let auth_chain_cache = self.db.auth_chain_cache.lock().unwrap().len();
|
||||||
let appservice_in_room_cache = self.db.appservice_in_room_cache.read().unwrap().len();
|
let appservice_in_room_cache = self.db.appservice_in_room_cache.read().unwrap().len();
|
||||||
|
|
|
@ -818,7 +818,7 @@ async fn handle_media_check(
|
||||||
async fn fix_bad_double_separator_in_state_cache(db: &Arc<Database>, _config: &Config) -> Result<()> {
|
async fn fix_bad_double_separator_in_state_cache(db: &Arc<Database>, _config: &Config) -> Result<()> {
|
||||||
warn!("Fixing bad double separator in state_cache roomuserid_joined");
|
warn!("Fixing bad double separator in state_cache roomuserid_joined");
|
||||||
let roomuserid_joined = &db["roomuserid_joined"];
|
let roomuserid_joined = &db["roomuserid_joined"];
|
||||||
let _cork = database::Cork::new(&db.db, true, true);
|
let _cork = db.cork_and_sync();
|
||||||
|
|
||||||
let mut iter_count: usize = 0;
|
let mut iter_count: usize = 0;
|
||||||
for (mut key, value) in roomuserid_joined.iter() {
|
for (mut key, value) in roomuserid_joined.iter() {
|
||||||
|
@ -851,7 +851,7 @@ async fn fix_bad_double_separator_in_state_cache(db: &Arc<Database>, _config: &C
|
||||||
|
|
||||||
async fn retroactively_fix_bad_data_from_roomuserid_joined(db: &Arc<Database>, _config: &Config) -> Result<()> {
|
async fn retroactively_fix_bad_data_from_roomuserid_joined(db: &Arc<Database>, _config: &Config) -> Result<()> {
|
||||||
warn!("Retroactively fixing bad data from broken roomuserid_joined");
|
warn!("Retroactively fixing bad data from broken roomuserid_joined");
|
||||||
let _cork = database::Cork::new(&db.db, true, true);
|
let _cork = db.cork_and_sync();
|
||||||
|
|
||||||
let room_ids = services()
|
let room_ids = services()
|
||||||
.rooms
|
.rooms
|
||||||
|
|
|
@ -207,7 +207,7 @@ impl Service {
|
||||||
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<Vec<u8>> {
|
) -> Result<Vec<u8>> {
|
||||||
// Coalesce database writes for the remainder of this scope.
|
// Coalesce database writes for the remainder of this scope.
|
||||||
let _cork = services().globals.db.cork_and_flush();
|
let _cork = services().db.cork_and_flush();
|
||||||
|
|
||||||
let shortroomid = services()
|
let shortroomid = services()
|
||||||
.rooms
|
.rooms
|
||||||
|
|
|
@ -85,7 +85,7 @@ impl Service {
|
||||||
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
|
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
|
||||||
let dest = Destination::Push(user.to_owned(), pushkey);
|
let dest = Destination::Push(user.to_owned(), pushkey);
|
||||||
let event = SendingEvent::Pdu(pdu_id.to_owned());
|
let event = SendingEvent::Pdu(pdu_id.to_owned());
|
||||||
let _cork = services().globals.db.cork();
|
let _cork = services().db.cork();
|
||||||
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
||||||
self.dispatch(Msg {
|
self.dispatch(Msg {
|
||||||
dest,
|
dest,
|
||||||
|
@ -98,7 +98,7 @@ impl Service {
|
||||||
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
|
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
|
||||||
let dest = Destination::Appservice(appservice_id);
|
let dest = Destination::Appservice(appservice_id);
|
||||||
let event = SendingEvent::Pdu(pdu_id);
|
let event = SendingEvent::Pdu(pdu_id);
|
||||||
let _cork = services().globals.db.cork();
|
let _cork = services().db.cork();
|
||||||
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
||||||
self.dispatch(Msg {
|
self.dispatch(Msg {
|
||||||
dest,
|
dest,
|
||||||
|
@ -125,7 +125,7 @@ impl Service {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|server| (Destination::Normal(server), SendingEvent::Pdu(pdu_id.to_owned())))
|
.map(|server| (Destination::Normal(server), SendingEvent::Pdu(pdu_id.to_owned())))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let _cork = services().globals.db.cork();
|
let _cork = services().db.cork();
|
||||||
let keys = self.db.queue_requests(
|
let keys = self.db.queue_requests(
|
||||||
&requests
|
&requests
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -147,7 +147,7 @@ impl Service {
|
||||||
pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
|
pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
|
||||||
let dest = Destination::Normal(server.to_owned());
|
let dest = Destination::Normal(server.to_owned());
|
||||||
let event = SendingEvent::Edu(serialized);
|
let event = SendingEvent::Edu(serialized);
|
||||||
let _cork = services().globals.db.cork();
|
let _cork = services().db.cork();
|
||||||
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
||||||
self.dispatch(Msg {
|
self.dispatch(Msg {
|
||||||
dest,
|
dest,
|
||||||
|
@ -174,7 +174,7 @@ impl Service {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|server| (Destination::Normal(server), SendingEvent::Edu(serialized.clone())))
|
.map(|server| (Destination::Normal(server), SendingEvent::Edu(serialized.clone())))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let _cork = services().globals.db.cork();
|
let _cork = services().db.cork();
|
||||||
let keys = self.db.queue_requests(
|
let keys = self.db.queue_requests(
|
||||||
&requests
|
&requests
|
||||||
.iter()
|
.iter()
|
||||||
|
|
|
@ -100,7 +100,7 @@ impl Service {
|
||||||
fn handle_response_ok(
|
fn handle_response_ok(
|
||||||
&self, dest: &Destination, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus,
|
&self, dest: &Destination, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus,
|
||||||
) {
|
) {
|
||||||
let _cork = services().globals.db.cork();
|
let _cork = services().db.cork();
|
||||||
self.db
|
self.db
|
||||||
.delete_all_active_requests_for(dest)
|
.delete_all_active_requests_for(dest)
|
||||||
.expect("all active requests deleted");
|
.expect("all active requests deleted");
|
||||||
|
@ -173,7 +173,7 @@ impl Service {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let _cork = services().globals.db.cork();
|
let _cork = services().db.cork();
|
||||||
let mut events = Vec::new();
|
let mut events = Vec::new();
|
||||||
|
|
||||||
// Must retry any previous transaction for this remote.
|
// Must retry any previous transaction for this remote.
|
||||||
|
@ -187,7 +187,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compose the next transaction
|
// Compose the next transaction
|
||||||
let _cork = services().globals.db.cork();
|
let _cork = services().db.cork();
|
||||||
if !new_events.is_empty() {
|
if !new_events.is_empty() {
|
||||||
self.db.mark_as_active(&new_events)?;
|
self.db.mark_as_active(&new_events)?;
|
||||||
for (e, _) in new_events {
|
for (e, _) in new_events {
|
||||||
|
|
Loading…
Reference in a new issue