From 6e59135a7d776d7aa124e564aef8b31d4d164248 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 5 Jun 2024 21:57:10 +0000 Subject: [PATCH] eliminate RotationHandler Signed-off-by: Jason Volk --- src/core/server.rs | 5 ++++- src/router/run.rs | 1 - src/service/globals/data.rs | 8 +++++++- src/service/globals/mod.rs | 34 +--------------------------------- src/service/services.rs | 1 - 5 files changed, 12 insertions(+), 37 deletions(-) diff --git a/src/core/server.rs b/src/core/server.rs index 6cf477cb..7be6fa58 100644 --- a/src/core/server.rs +++ b/src/core/server.rs @@ -1,5 +1,5 @@ use std::{ - sync::atomic::{AtomicBool, AtomicU32}, + sync::atomic::{AtomicBool, AtomicU32, Ordering}, time::SystemTime, }; @@ -65,4 +65,7 @@ impl Server { .as_ref() .expect("runtime handle available in Server") } + + #[inline] + pub fn running(&self) -> bool { !self.stopping.load(Ordering::Acquire) } } diff --git a/src/router/run.rs b/src/router/run.rs index 85e058cf..80a7d1ee 100644 --- a/src/router/run.rs +++ b/src/router/run.rs @@ -97,7 +97,6 @@ async fn signal(server: Arc, tx: Sender<()>, handle: axum_server::Handle } server.stopping.store(true, Ordering::Release); - services().globals.rotate.fire(); if let Err(e) = tx.send(()) { error!("failed sending shutdown transaction to channel: {e}"); } diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index 354ff2f7..b49fc4e9 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -159,7 +159,13 @@ impl Data for KeyValueDatabase { // One time keys futures.push(self.userid_lastonetimekeyupdate.watch_prefix(&userid_bytes)); - futures.push(Box::pin(services().globals.rotate.watch())); + futures.push(Box::pin(async move { + let _result = services().server.signal.subscribe().recv().await; + })); + + if !services().server.running() { + return Ok(()); + } // Wait until one of them finds something trace!(futures = futures.len(), "watch started"); diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 7543741f..e806a6b0 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -8,7 +8,6 @@ pub(super) mod updates; use std::{ collections::{BTreeMap, HashMap}, fs, - future::Future, path::PathBuf, sync::Arc, time::Instant, @@ -29,7 +28,7 @@ use ruma::{ ServerName, UserId, }; use tokio::{ - sync::{broadcast, Mutex, RwLock}, + sync::{Mutex, RwLock}, task::JoinHandle, }; use tracing::{error, trace}; @@ -59,36 +58,6 @@ pub struct Service { pub roomid_federationhandletime: RwLock>, pub updates_handle: Mutex>>, pub stateres_mutex: Arc>, - pub rotate: RotationHandler, -} - -/// Handles "rotation" of long-polling requests. "Rotation" in this context is -/// similar to "rotation" of log files and the like. -/// -/// This is utilized to have sync workers return early and release read locks on -/// the database. -pub struct RotationHandler(broadcast::Sender<()>, ()); - -impl RotationHandler { - fn new() -> Self { - let (s, _r) = broadcast::channel(1); - Self(s, ()) - } - - pub fn watch(&self) -> impl Future { - let mut r = self.0.subscribe(); - #[allow(clippy::let_underscore_must_use)] - async move { - _ = r.recv().await; - } - } - - #[allow(clippy::let_underscore_must_use)] - pub fn fire(&self) { _ = self.0.send(()); } -} - -impl Default for RotationHandler { - fn default() -> Self { Self::new() } } impl Service { @@ -149,7 +118,6 @@ impl Service { roomid_federationhandletime: RwLock::new(HashMap::new()), updates_handle: Mutex::new(None), stateres_mutex: Arc::new(Mutex::new(())), - rotate: RotationHandler::new(), }; fs::create_dir_all(s.get_media_folder())?; diff --git a/src/service/services.rs b/src/service/services.rs index a62fa7e1..d9805010 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -303,7 +303,6 @@ bad_signature_ratelimiter: {bad_signature_ratelimiter} trace!("Interrupting services..."); self.server.stopping.store(true, atomic::Ordering::Release); - self.globals.rotate.fire(); self.sending.interrupt(); self.presence.interrupt(); self.admin.interrupt();