From 8b5b7a1f637322e651c84d11a7b1616c7e29952e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sat, 8 Oct 2022 13:57:01 +0200 Subject: [PATCH] fix: panic on launch Now we start the admin and sending threads at a later time. --- src/api/client_server/directory.rs | 11 ++++---- src/database/mod.rs | 9 +++--- src/main.rs | 2 +- src/service/account_data/mod.rs | 2 +- src/service/admin/mod.rs | 26 ++++++++++-------- src/service/appservice/mod.rs | 1 - src/service/key_backups/mod.rs | 2 +- src/service/rooms/alias/mod.rs | 1 - src/service/rooms/directory/mod.rs | 1 - src/service/rooms/edus/presence/mod.rs | 2 +- src/service/rooms/edus/read_receipt/mod.rs | 1 - src/service/rooms/edus/typing/mod.rs | 1 - src/service/rooms/lazy_loading/mod.rs | 2 +- src/service/rooms/metadata/mod.rs | 1 - src/service/rooms/outlier/mod.rs | 1 - src/service/rooms/search/mod.rs | 1 - src/service/rooms/user/mod.rs | 1 - src/service/sending/mod.rs | 32 +++++++++++----------- src/service/transaction_ids/mod.rs | 1 - src/service/uiaa/mod.rs | 1 - 20 files changed, 46 insertions(+), 53 deletions(-) diff --git a/src/api/client_server/directory.rs b/src/api/client_server/directory.rs index c1b0eda5..90f79a02 100644 --- a/src/api/client_server/directory.rs +++ b/src/api/client_server/directory.rs @@ -29,7 +29,7 @@ use ruma::{ }, ServerName, UInt, }; -use tracing::{info, warn}; +use tracing::{error, info, warn}; /// # `POST /_matrix/client/r0/publicRooms` /// @@ -279,15 +279,14 @@ pub(crate) async fn get_public_rooms_filtered_helper( JoinRule::Knock => Some(PublicRoomJoinRule::Knock), _ => None, }) - .map_err(|_| { - Error::bad_database("Invalid room join rule event in database.") + .map_err(|e| { + error!("Invalid room join rule event in database: {}", e); + Error::BadDatabase("Invalid room join rule event in database.") }) }) .transpose()? .flatten() - .ok_or(Error::bad_database( - "Invalid room join rule event in database.", - ))?, + .ok_or_else(|| Error::bad_database("Missing room join rule event for room."))?, room_id, }; Ok(chunk) diff --git a/src/database/mod.rs b/src/database/mod.rs index 882455f8..967ec885 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,10 +1,7 @@ pub mod abstraction; pub mod key_value; -use crate::{ - services, utils, Config, Error, - PduEvent, Result, Services, SERVICES, -}; +use crate::{services, utils, Config, Error, PduEvent, Result, Services, SERVICES}; use abstraction::KeyValueDatabaseEngine; use abstraction::KvTree; use directories::ProjectDirs; @@ -830,6 +827,8 @@ impl KeyValueDatabase { // This data is probably outdated db.presenceid_presence.clear()?; + services().admin.start_handler(); + // Set emergency access for the conduit user match set_emergency_access() { Ok(pwd_set) => { @@ -846,6 +845,8 @@ impl KeyValueDatabase { } }; + services().sending.start_handler(); + Self::start_cleanup_task().await; Ok(()) diff --git a/src/main.rs b/src/main.rs index 1aad62bd..ce7e5785 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,7 +28,7 @@ use http::{ }; use opentelemetry::trace::{FutureExt, Tracer}; use ruma::api::{client::error::ErrorKind, IncomingRequest}; -use tokio::{signal}; +use tokio::signal; use tower::ServiceBuilder; use tower_http::{ cors::{self, CorsLayer}, diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index 0387b139..f9c49b1a 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -8,7 +8,7 @@ use ruma::{ RoomId, UserId, }; -use std::{collections::HashMap}; +use std::collections::HashMap; use crate::Result; diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 8f33056f..218a4ea4 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -26,7 +26,7 @@ use ruma::{ EventId, RoomAliasId, RoomId, RoomName, RoomVersionId, ServerName, UserId, }; use serde_json::value::to_raw_value; -use tokio::sync::{mpsc, MutexGuard}; +use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::{ api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH}, @@ -164,25 +164,29 @@ pub enum AdminRoomEvent { SendMessage(RoomMessageEventContent), } -#[derive(Clone)] pub struct Service { pub sender: mpsc::UnboundedSender, + receiver: Mutex>, } impl Service { pub fn build() -> Arc { let (sender, receiver) = mpsc::unbounded_channel(); - let self1 = Arc::new(Self { sender }); - let self2 = Arc::clone(&self1); - - tokio::spawn(async move { - self2.start_handler(receiver).await; - }); - - self1 + Arc::new(Self { + sender, + receiver: Mutex::new(receiver), + }) } - async fn start_handler(&self, mut receiver: mpsc::UnboundedReceiver) { + pub fn start_handler(self: &Arc) { + let self2 = Arc::clone(&self); + tokio::spawn(async move { + self2.handler().await; + }); + } + + async fn handler(&self) { + let mut receiver = self.receiver.lock().await; // TODO: Use futures when we have long admin commands //let mut futures = FuturesUnordered::new(); diff --git a/src/service/appservice/mod.rs b/src/service/appservice/mod.rs index 17402f4c..3052964d 100644 --- a/src/service/appservice/mod.rs +++ b/src/service/appservice/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use crate::Result; diff --git a/src/service/key_backups/mod.rs b/src/service/key_backups/mod.rs index 51117cd5..fef46130 100644 --- a/src/service/key_backups/mod.rs +++ b/src/service/key_backups/mod.rs @@ -7,7 +7,7 @@ use ruma::{ serde::Raw, RoomId, UserId, }; -use std::{collections::BTreeMap}; +use std::collections::BTreeMap; pub struct Service { pub db: &'static dyn Data, diff --git a/src/service/rooms/alias/mod.rs b/src/service/rooms/alias/mod.rs index 600a1201..6b52549a 100644 --- a/src/service/rooms/alias/mod.rs +++ b/src/service/rooms/alias/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use crate::Result; diff --git a/src/service/rooms/directory/mod.rs b/src/service/rooms/directory/mod.rs index fcc0003d..0c1b2cd4 100644 --- a/src/service/rooms/directory/mod.rs +++ b/src/service/rooms/directory/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use ruma::RoomId; diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs index 0c3a3d6c..36814309 100644 --- a/src/service/rooms/edus/presence/mod.rs +++ b/src/service/rooms/edus/presence/mod.rs @@ -1,5 +1,5 @@ mod data; -use std::{collections::HashMap}; +use std::collections::HashMap; pub use data::Data; use ruma::{events::presence::PresenceEvent, RoomId, UserId}; diff --git a/src/service/rooms/edus/read_receipt/mod.rs b/src/service/rooms/edus/read_receipt/mod.rs index 3664fe9b..1b3ddb12 100644 --- a/src/service/rooms/edus/read_receipt/mod.rs +++ b/src/service/rooms/edus/read_receipt/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use crate::Result; diff --git a/src/service/rooms/edus/typing/mod.rs b/src/service/rooms/edus/typing/mod.rs index 3d8afe68..d05ec900 100644 --- a/src/service/rooms/edus/typing/mod.rs +++ b/src/service/rooms/edus/typing/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use ruma::{events::SyncEphemeralRoomEvent, RoomId, UserId}; diff --git a/src/service/rooms/lazy_loading/mod.rs b/src/service/rooms/lazy_loading/mod.rs index 4ef58fd0..b30bb9c1 100644 --- a/src/service/rooms/lazy_loading/mod.rs +++ b/src/service/rooms/lazy_loading/mod.rs @@ -1,7 +1,7 @@ mod data; use std::{ collections::{HashMap, HashSet}, - sync::{Mutex}, + sync::Mutex, }; pub use data::Data; diff --git a/src/service/rooms/metadata/mod.rs b/src/service/rooms/metadata/mod.rs index 1a36010f..c99ae4a2 100644 --- a/src/service/rooms/metadata/mod.rs +++ b/src/service/rooms/metadata/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use ruma::RoomId; diff --git a/src/service/rooms/outlier/mod.rs b/src/service/rooms/outlier/mod.rs index 2b5976ce..c84e975a 100644 --- a/src/service/rooms/outlier/mod.rs +++ b/src/service/rooms/outlier/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use ruma::{signatures::CanonicalJsonObject, EventId}; diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index d15e42ef..b6f35e79 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use crate::Result; diff --git a/src/service/rooms/user/mod.rs b/src/service/rooms/user/mod.rs index b7e2c00f..479e5568 100644 --- a/src/service/rooms/user/mod.rs +++ b/src/service/rooms/user/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use ruma::{RoomId, UserId}; diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index f9e81d63..60fc6f46 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -12,7 +12,7 @@ use std::{ use crate::{ api::{appservice_server, server_server}, services, - utils::{calculate_hash}, + utils::calculate_hash, Config, Error, PduEvent, Result, }; use federation::transactions::send_transaction_message; @@ -37,7 +37,7 @@ use ruma::{ }; use tokio::{ select, - sync::{mpsc, Semaphore}, + sync::{mpsc, Mutex, Semaphore}, }; use tracing::{error, warn}; @@ -88,6 +88,7 @@ pub struct Service { /// The state for a given state hash. pub(super) maximum_requests: Arc, pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec)>, + receiver: Mutex)>>, } enum TransactionStatus { @@ -99,25 +100,24 @@ enum TransactionStatus { impl Service { pub fn build(db: &'static dyn Data, config: &Config) -> Arc { let (sender, receiver) = mpsc::unbounded_channel(); - - let self1 = Arc::new(Self { + Arc::new(Self { db, sender, + receiver: Mutex::new(receiver), maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)), - }); - let self2 = Arc::clone(&self1); - - tokio::spawn(async move { - self2.start_handler(receiver).await.unwrap(); - }); - - self1 + }) } - async fn start_handler( - &self, - mut receiver: mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec)>, - ) -> Result<()> { + pub fn start_handler(self: &Arc) { + let self2 = Arc::clone(&self); + tokio::spawn(async move { + self2.handler().await.unwrap(); + }); + } + + async fn handler(&self) -> Result<()> { + let mut receiver = self.receiver.lock().await; + let mut futures = FuturesUnordered::new(); let mut current_transaction_status = HashMap::::new(); diff --git a/src/service/transaction_ids/mod.rs b/src/service/transaction_ids/mod.rs index 0cc30dbb..2fa3b02e 100644 --- a/src/service/transaction_ids/mod.rs +++ b/src/service/transaction_ids/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use crate::Result; diff --git a/src/service/uiaa/mod.rs b/src/service/uiaa/mod.rs index 1170193f..e827cc89 100644 --- a/src/service/uiaa/mod.rs +++ b/src/service/uiaa/mod.rs @@ -1,6 +1,5 @@ mod data; - pub use data::Data; use ruma::{