diff --git a/Cargo.lock b/Cargo.lock index 9f1610b7..db185614 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -484,6 +484,7 @@ dependencies = [ "itertools 0.12.1", "jsonwebtoken", "log", + "loole", "lru-cache", "nix", "num_cpus", @@ -1479,6 +1480,12 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "loole" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6725f0feab07fcf90f6de5417c06d7fef976fa6e5912fa9e21cb5e4dc6ae5da" + [[package]] name = "lru-cache" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 40a67355..00cd6c66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,9 @@ cyborgtime = "2.1.1" bytes = "1.6.0" http = "0.2.12" +# used to replace the channels of the tokio runtime +loole = "0.3.0" + # standard date and time tools [dependencies.chrono] version = "0.4.37" diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 398e1d62..5754a435 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -23,7 +23,7 @@ use ruma::{ EventId, MxcUri, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::to_raw_value; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::Mutex; use tracing::{error, warn}; use super::pdu::PduBuilder; @@ -91,13 +91,13 @@ pub enum AdminRoomEvent { } pub struct Service { - pub sender: mpsc::UnboundedSender, - receiver: Mutex>, + pub sender: loole::Sender, + receiver: Mutex>, } impl Service { pub fn build() -> Arc { - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = loole::unbounded(); Arc::new(Self { sender, receiver: Mutex::new(receiver), @@ -115,7 +115,7 @@ impl Service { } async fn handler(&self) -> Result<()> { - let mut receiver = self.receiver.lock().await; + let receiver = self.receiver.lock().await; // TODO: Use futures when we have long admin commands //let mut futures = FuturesUnordered::new(); @@ -125,63 +125,72 @@ impl Service { if let Ok(Some(conduit_room)) = Self::get_admin_room() { loop { tokio::select! { - Some(event) = receiver.recv() => { - let (mut message_content, reply) = match event { - AdminRoomEvent::SendMessage(content) => (content, None), - AdminRoomEvent::ProcessMessage(room_message, reply_id) => { - (self.process_admin_message(room_message).await, Some(reply_id)) + event = receiver.recv_async() => { + match event { + Ok(event) => { + let (mut message_content, reply) = match event { + AdminRoomEvent::SendMessage(content) => (content, None), + AdminRoomEvent::ProcessMessage(room_message, reply_id) => { + (self.process_admin_message(room_message).await, Some(reply_id)) + } + }; + + let mutex_state = Arc::clone( + services().globals + .roomid_mutex_state + .write() + .await + .entry(conduit_room.clone()) + .or_default(), + ); + + let state_lock = mutex_state.lock().await; + + if let Some(reply) = reply { + message_content.relates_to = Some(Reply { in_reply_to: InReplyTo { event_id: reply.into() } }); + } + + if let Err(e) = services().rooms.timeline.build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMessage, + content: to_raw_value(&message_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &state_lock) + .await { + error!("Failed to build and append admin room response PDU: \"{e}\""); + + let error_room_message = RoomMessageEventContent::text_plain(format!("Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished successfully, but we could not return the output.")); + + services().rooms.timeline.build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMessage, + content: to_raw_value(&error_room_message) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &state_lock) + .await?; + } + drop(state_lock); } - }; + Err(_) => { + // TODO: Handle error, Im too unfamiliar with the codebase to know what to do here - let mutex_state = Arc::clone( - services().globals - .roomid_mutex_state - .write() - .await - .entry(conduit_room.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + // recv_async returns an error if all senders have been dropped. If the channel is empty, the returned future will yield to the async runtime. - if let Some(reply) = reply { - message_content.relates_to = Some(Reply { in_reply_to: InReplyTo { event_id: reply.into() } }); + } } - - if let Err(e) = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMessage, - content: to_raw_value(&message_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - &state_lock) - .await { - error!("Failed to build and append admin room response PDU: \"{e}\""); - - let error_room_message = RoomMessageEventContent::text_plain(format!("Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished successfully, but we could not return the output.")); - - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMessage, - content: to_raw_value(&error_room_message) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - &state_lock) - .await?; - } - - - drop(state_lock); } } } diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index 1b8672f7..5f64d30a 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -10,10 +10,7 @@ use ruma::{ OwnedUserId, UInt, UserId, }; use serde::{Deserialize, Serialize}; -use tokio::{ - sync::{mpsc, Mutex}, - time::sleep, -}; +use tokio::{sync::Mutex, time::sleep}; use tracing::{debug, error}; use crate::{services, utils, Config, Error, Result}; @@ -71,14 +68,14 @@ impl Presence { pub struct Service { pub db: &'static dyn Data, - pub timer_sender: mpsc::UnboundedSender<(OwnedUserId, Duration)>, - timer_receiver: Mutex>, + pub timer_sender: loole::Sender<(OwnedUserId, Duration)>, + timer_receiver: Mutex>, timeout_remote_users: bool, } impl Service { pub fn build(db: &'static dyn Data, config: &Config) -> Arc { - let (timer_sender, timer_receiver) = mpsc::unbounded_channel(); + let (timer_sender, timer_receiver) = loole::unbounded(); Arc::new(Self { db, @@ -173,12 +170,21 @@ impl Service { async fn handler(&self) -> Result<()> { let mut presence_timers = FuturesUnordered::new(); - let mut receiver = self.timer_receiver.lock().await; + let receiver = self.timer_receiver.lock().await; loop { tokio::select! { - Some((user_id, timeout)) = receiver.recv() => { - debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len()); - presence_timers.push(presence_timer(user_id, timeout)); + event = receiver.recv_async() => { + + match event { + Ok((user_id, timeout)) => { + debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len()); + presence_timers.push(presence_timer(user_id, timeout)); + } + Err(e) => { + // TODO: Handle error better? I have no idea what to do here. + error!("Failed to receive presence timer: {}", e); + } + } } Some(user_id) = presence_timers.next() => { diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index c802278b..74f1d436 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -25,10 +25,7 @@ use ruma::{ events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, }; -use tokio::{ - select, - sync::{mpsc, Mutex, Semaphore}, -}; +use tokio::sync::{Mutex, Semaphore}; use tracing::{error, warn}; use crate::{services, utils::calculate_hash, Config, Error, PduEvent, Result}; @@ -43,8 +40,8 @@ pub struct Service { /// The state for a given state hash. pub(super) maximum_requests: Arc, - pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec)>, - receiver: Mutex)>>, + pub sender: loole::Sender<(OutgoingKind, SendingEventType, Vec)>, + receiver: Mutex)>>, startup_netburst: bool, startup_netburst_keep: i64, timeout: u64, @@ -73,7 +70,7 @@ enum TransactionStatus { impl Service { pub fn build(db: &'static dyn Data, config: &Config) -> Arc { - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = loole::unbounded(); Arc::new(Self { db, sender, @@ -275,7 +272,7 @@ impl Service { #[tracing::instrument(skip(self), name = "sender")] async fn handler(&self) -> Result<()> { - let mut receiver = self.receiver.lock().await; + let receiver = self.receiver.lock().await; let mut futures = FuturesUnordered::new(); let mut current_transaction_status = HashMap::::new(); @@ -306,7 +303,7 @@ impl Service { } loop { - select! { + tokio::select! { Some(response) = futures.next() => { match response { Ok(outgoing_kind) => { @@ -343,13 +340,17 @@ impl Service { } }; }, - Some((outgoing_kind, event, key)) = receiver.recv() => { - if let Ok(Some(events)) = self.select_events( - &outgoing_kind, - vec![(event, key)], - &mut current_transaction_status, - ) { - futures.push(handle_events(outgoing_kind, events)); + + event = receiver.recv_async() => { + // TODO: Error handling for this + if let Ok((outgoing_kind, event, key)) = event { + if let Ok(Some(events)) = self.select_events( + &outgoing_kind, + vec![(event, key)], + &mut current_transaction_status, + ) { + futures.push(handle_events(outgoing_kind, events)); + } } } }