diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 8c813970..2323aa57 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -25,7 +25,7 @@ use ruma::{ events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, }; -use tokio::sync::{Mutex, Semaphore}; +use tokio::sync::{mpsc, Mutex, Semaphore}; use tracing::{error, warn}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; @@ -42,8 +42,8 @@ pub struct Service { /// The state for a given state hash. pub(super) maximum_requests: Arc, - pub sender: loole::Sender<(OutgoingKind, SendingEventType, Vec)>, - receiver: Mutex)>>, + pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec)>, + receiver: Mutex)>>, startup_netburst: bool, startup_netburst_keep: i64, timeout: u64, @@ -72,7 +72,7 @@ enum TransactionStatus { impl Service { pub fn build(db: &'static dyn Data, config: &Config) -> Arc { - let (sender, receiver) = loole::unbounded(); + let (sender, receiver) = mpsc::unbounded_channel(); Arc::new(Self { db, sender, @@ -274,7 +274,7 @@ impl Service { #[tracing::instrument(skip(self), name = "sender")] async fn handler(&self) -> Result<()> { - let receiver = self.receiver.lock().await; + let mut receiver = self.receiver.lock().await; let mut futures = FuturesUnordered::new(); let mut current_transaction_status = HashMap::::new(); @@ -342,16 +342,13 @@ impl Service { } }; }, - - event = receiver.recv_async() => { - if let Ok((outgoing_kind, event, key)) = event { - if let Ok(Some(events)) = self.select_events( - &outgoing_kind, - vec![(event, key)], - &mut current_transaction_status, - ) { - futures.push(handle_events(outgoing_kind, events)); - } + Some((outgoing_kind, event, key)) = receiver.recv() => { + if let Ok(Some(events)) = self.select_events( + &outgoing_kind, + vec![(event, key)], + &mut current_transaction_status, + ) { + futures.push(handle_events(outgoing_kind, events)); } } }