dont use loole for sending channel code

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-04-17 12:52:59 -04:00 committed by June
parent 11a2da3819
commit d0a9666a29

View file

@ -25,7 +25,7 @@ use ruma::{
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
}; };
use tokio::sync::{Mutex, Semaphore}; use tokio::sync::{mpsc, Mutex, Semaphore};
use tracing::{error, warn}; use tracing::{error, warn};
use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result};
@ -42,8 +42,8 @@ pub struct Service {
/// The state for a given state hash. /// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub sender: loole::Sender<(OutgoingKind, SendingEventType, Vec<u8>)>, pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>,
receiver: Mutex<loole::Receiver<(OutgoingKind, SendingEventType, Vec<u8>)>>, receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
startup_netburst: bool, startup_netburst: bool,
startup_netburst_keep: i64, startup_netburst_keep: i64,
timeout: u64, timeout: u64,
@ -72,7 +72,7 @@ enum TransactionStatus {
impl Service { impl Service {
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (sender, receiver) = loole::unbounded(); let (sender, receiver) = mpsc::unbounded_channel();
Arc::new(Self { Arc::new(Self {
db, db,
sender, sender,
@ -274,7 +274,7 @@ impl Service {
#[tracing::instrument(skip(self), name = "sender")] #[tracing::instrument(skip(self), name = "sender")]
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let receiver = self.receiver.lock().await; let mut receiver = self.receiver.lock().await;
let mut futures = FuturesUnordered::new(); let mut futures = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new(); let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();
@ -342,16 +342,13 @@ impl Service {
} }
}; };
}, },
Some((outgoing_kind, event, key)) = receiver.recv() => {
event = receiver.recv_async() => { if let Ok(Some(events)) = self.select_events(
if let Ok((outgoing_kind, event, key)) = event { &outgoing_kind,
if let Ok(Some(events)) = self.select_events( vec![(event, key)],
&outgoing_kind, &mut current_transaction_status,
vec![(event, key)], ) {
&mut current_transaction_status, futures.push(handle_events(outgoing_kind, events));
) {
futures.push(handle_events(outgoing_kind, events));
}
} }
} }
} }