From 7822a385bbbdc608501a93499b83a3ca89fafb0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sat, 8 Oct 2022 13:03:07 +0200 Subject: [PATCH] cargo fmt --- src/api/client_server/account.rs | 9 +-- src/api/client_server/membership.rs | 7 +- src/api/server_server.rs | 15 ++-- src/database/key_value/account_data.rs | 6 +- src/database/key_value/pusher.rs | 20 ++++-- .../key_value/rooms/edus/read_receipt.rs | 12 ++-- src/database/key_value/rooms/metadata.rs | 3 +- src/database/key_value/rooms/search.rs | 2 +- src/database/key_value/users.rs | 4 +- src/database/mod.rs | 10 ++- src/lib.rs | 6 +- src/main.rs | 2 +- src/service/account_data/mod.rs | 6 +- src/service/admin/mod.rs | 16 ++--- src/service/globals/mod.rs | 4 +- src/service/key_backups/mod.rs | 6 +- src/service/media/mod.rs | 2 +- src/service/mod.rs | 4 +- src/service/pusher/data.rs | 6 +- src/service/pusher/mod.rs | 11 +-- src/service/rooms/edus/read_receipt/data.rs | 12 ++-- src/service/rooms/event_handler/mod.rs | 2 +- src/service/rooms/search/data.rs | 2 +- src/service/rooms/search/mod.rs | 7 +- src/service/rooms/short/mod.rs | 2 +- src/service/rooms/state/mod.rs | 3 +- src/service/rooms/state_cache/mod.rs | 5 +- src/service/rooms/timeline/mod.rs | 6 +- src/service/sending/mod.rs | 72 ++++++++++++++----- 29 files changed, 143 insertions(+), 119 deletions(-) diff --git a/src/api/client_server/account.rs b/src/api/client_server/account.rs index e27d295e..58624a28 100644 --- a/src/api/client_server/account.rs +++ b/src/api/client_server/account.rs @@ -1,5 +1,3 @@ - - use super::{DEVICE_ID_LENGTH, SESSION_ID_LENGTH, TOKEN_LENGTH}; use crate::{api::client_server, services, utils, Error, Result, Ruma}; use ruma::{ @@ -11,12 +9,7 @@ use ruma::{ error::ErrorKind, uiaa::{AuthFlow, AuthType, UiaaInfo}, }, - events::{ - room::{ - message::RoomMessageEventContent, - }, - GlobalAccountDataEventType, - }, + events::{room::message::RoomMessageEventContent, GlobalAccountDataEventType}, push, UserId, }; diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 0aae9959..d971e6b7 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -11,12 +11,11 @@ use ruma::{ federation::{self, membership::create_invite}, }, events::{ - room::{ - member::{MembershipState, RoomMemberEventContent}, - }, + room::member::{MembershipState, RoomMemberEventContent}, RoomEventType, StateEventType, }, - serde::{to_canonical_value, Base64, CanonicalJsonObject, CanonicalJsonValue}, EventId, RoomId, RoomVersionId, ServerName, UserId, + serde::{to_canonical_value, Base64, CanonicalJsonObject, CanonicalJsonValue}, + EventId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ diff --git a/src/api/server_server.rs b/src/api/server_server.rs index bcf893c6..66aac9e9 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -4,7 +4,7 @@ use crate::{ services, utils, Error, PduEvent, Result, Ruma, }; use axum::{response::IntoResponse, Json}; -use futures_util::{StreamExt}; +use futures_util::StreamExt; use get_profile_information::v1::ProfileField; use http::header::{HeaderValue, AUTHORIZATION}; @@ -15,10 +15,7 @@ use ruma::{ authorization::get_event_authorization, device::get_devices::{self, v1::UserDevice}, directory::{get_public_rooms, get_public_rooms_filtered}, - discovery::{ - get_server_keys, - get_server_version, ServerSigningKeys, VerifyKey, - }, + discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey}, event::{get_event, get_missing_events, get_room_state, get_room_state_ids}, keys::{claim_keys, get_keys}, membership::{ @@ -46,13 +43,13 @@ use ruma::{ }, receipt::ReceiptType, serde::{Base64, JsonObject, Raw}, - signatures::{CanonicalJsonValue}, - to_device::DeviceIdOrAllDevices, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName, - ServerSigningKeyId, + signatures::CanonicalJsonValue, + to_device::DeviceIdOrAllDevices, + EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName, ServerSigningKeyId, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ - collections::{BTreeMap}, + collections::BTreeMap, fmt::Debug, mem, net::{IpAddr, SocketAddr}, diff --git a/src/database/key_value/account_data.rs b/src/database/key_value/account_data.rs index 0e8029ff..e1eef966 100644 --- a/src/database/key_value/account_data.rs +++ b/src/database/key_value/account_data.rs @@ -1,12 +1,12 @@ use std::collections::HashMap; use ruma::{ - api::client::{error::ErrorKind}, + api::client::error::ErrorKind, events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, - serde::Raw, RoomId, UserId, + serde::Raw, + RoomId, UserId, }; - use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::account_data::Data for KeyValueDatabase { diff --git a/src/database/key_value/pusher.rs b/src/database/key_value/pusher.rs index 1468a553..42d4030b 100644 --- a/src/database/key_value/pusher.rs +++ b/src/database/key_value/pusher.rs @@ -3,7 +3,7 @@ use ruma::{ UserId, }; -use crate::{database::KeyValueDatabase, service, Error, Result, utils}; +use crate::{database::KeyValueDatabase, service, utils, Error, Result}; impl service::pusher::Data for KeyValueDatabase { fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()> { @@ -28,7 +28,11 @@ impl service::pusher::Data for KeyValueDatabase { Ok(()) } - fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result> { + fn get_pusher( + &self, + sender: &UserId, + pushkey: &str, + ) -> Result> { let mut senderkey = sender.as_bytes().to_vec(); senderkey.push(0xff); senderkey.extend_from_slice(pushkey.as_bytes()); @@ -55,15 +59,21 @@ impl service::pusher::Data for KeyValueDatabase { .collect() } - fn get_pushkeys<'a>(&'a self, sender: &UserId) -> Box> + 'a> { + fn get_pushkeys<'a>( + &'a self, + sender: &UserId, + ) -> Box> + 'a> { let mut prefix = sender.as_bytes().to_vec(); prefix.push(0xff); Box::new(self.senderkey_pusher.scan_prefix(prefix).map(|(k, _)| { let mut parts = k.splitn(2, |&b| b == 0xff); let _senderkey = parts.next(); - let push_key = parts.next().ok_or_else(|| Error::bad_database("Invalid senderkey_pusher in db"))?; - let push_key_string = utils::string_from_bytes(push_key).map_err(|_| Error::bad_database("Invalid pusher bytes in senderkey_pusher"))?; + let push_key = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid senderkey_pusher in db"))?; + let push_key_string = utils::string_from_bytes(push_key) + .map_err(|_| Error::bad_database("Invalid pusher bytes in senderkey_pusher"))?; Ok(push_key_string) })) diff --git a/src/database/key_value/rooms/edus/read_receipt.rs b/src/database/key_value/rooms/edus/read_receipt.rs index 19c1ced7..a8349f6e 100644 --- a/src/database/key_value/rooms/edus/read_receipt.rs +++ b/src/database/key_value/rooms/edus/read_receipt.rs @@ -54,12 +54,12 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase { since: u64, ) -> Box< dyn Iterator< - Item = Result<( - Box, - u64, - Raw, - )>, - > + 'a, + Item = Result<( + Box, + u64, + Raw, + )>, + > + 'a, > { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); diff --git a/src/database/key_value/rooms/metadata.rs b/src/database/key_value/rooms/metadata.rs index 2ec18bed..0f61dbb4 100644 --- a/src/database/key_value/rooms/metadata.rs +++ b/src/database/key_value/rooms/metadata.rs @@ -1,6 +1,6 @@ use ruma::RoomId; -use crate::{database::KeyValueDatabase, service, services, Result, utils, Error}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::metadata::Data for KeyValueDatabase { fn exists(&self, room_id: &RoomId) -> Result { @@ -27,7 +27,6 @@ impl service::rooms::metadata::Data for KeyValueDatabase { ) .map_err(|_| Error::bad_database("Room ID in roomid_shortroomid is invalid.")) })) - } fn is_disabled(&self, room_id: &RoomId) -> Result { diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index 8aa7a639..788c2965 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -26,7 +26,7 @@ impl service::rooms::search::Data for KeyValueDatabase { &'a self, room_id: &RoomId, search_string: &str, - ) -> Result>+ 'a>, Vec)>> { + ) -> Result> + 'a>, Vec)>> { let prefix = services() .rooms .short diff --git a/src/database/key_value/users.rs b/src/database/key_value/users.rs index 86689f85..8213c5d7 100644 --- a/src/database/key_value/users.rs +++ b/src/database/key_value/users.rs @@ -5,8 +5,7 @@ use ruma::{ encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::{AnyToDeviceEvent, StateEventType}, serde::Raw, - DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, MxcUri, - UInt, UserId, + DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, MxcUri, UInt, UserId, }; use tracing::warn; @@ -956,4 +955,3 @@ fn get_username_with_valid_password(username: &[u8], password: &[u8]) -> Option< } } } - diff --git a/src/database/mod.rs b/src/database/mod.rs index 191cd62f..977daf35 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -2,15 +2,13 @@ pub mod abstraction; pub mod key_value; use crate::{ - service::{ - rooms::{state_compressor::CompressedStateEvent}, - }, - services, utils, Config, Error, PduEvent, Result, Services, SERVICES, + service::rooms::state_compressor::CompressedStateEvent, services, utils, Config, Error, + PduEvent, Result, Services, SERVICES, }; use abstraction::KeyValueDatabaseEngine; use abstraction::KvTree; use directories::ProjectDirs; -use futures_util::{StreamExt}; +use futures_util::StreamExt; use lru_cache::LruCache; use ruma::{ events::{ @@ -29,7 +27,7 @@ use std::{ path::Path, sync::{Arc, Mutex, RwLock}, }; -use tokio::sync::{mpsc}; +use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; pub struct KeyValueDatabase { diff --git a/src/lib.rs b/src/lib.rs index 9c397c08..541b8c8d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,15 +13,13 @@ mod database; mod service; mod utils; -use std::{ - sync::{RwLock}, -}; +use std::sync::RwLock; pub use api::ruma_wrapper::{Ruma, RumaResponse}; pub use config::Config; +pub use database::KeyValueDatabase; pub use service::{pdu::PduEvent, Services}; pub use utils::error::{Error, Result}; -pub use database::KeyValueDatabase; pub static SERVICES: RwLock> = RwLock::new(None); diff --git a/src/main.rs b/src/main.rs index 71eaa660..c7ef62d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,7 +35,7 @@ use tower_http::{ trace::TraceLayer, ServiceBuilderExt as _, }; -use tracing::{warn, info}; +use tracing::{info, warn}; use tracing_subscriber::{prelude::*, EnvFilter}; pub use conduit::*; // Re-export everything from the library crate diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index 975c8203..a4a678d6 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -4,13 +4,13 @@ pub use data::Data; use ruma::{ events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, - serde::Raw, RoomId, UserId, + serde::Raw, + RoomId, UserId, }; use std::{collections::HashMap, sync::Arc}; - -use crate::{Result}; +use crate::Result; pub struct Service { pub db: &'static dyn Data, diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 2c776611..8f33056f 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -29,9 +29,7 @@ use serde_json::value::to_raw_value; use tokio::sync::{mpsc, MutexGuard}; use crate::{ - api::{ - client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH}, - }, + api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH}, services, utils::{self, HtmlEscape}, Error, PduEvent, Result, @@ -177,7 +175,9 @@ impl Service { let self1 = Arc::new(Self { sender }); let self2 = Arc::clone(&self1); - tokio::spawn(async move { self2.start_handler(receiver).await; }); + tokio::spawn(async move { + self2.start_handler(receiver).await; + }); self1 } @@ -186,9 +186,8 @@ impl Service { // TODO: Use futures when we have long admin commands //let mut futures = FuturesUnordered::new(); - let conduit_user = - UserId::parse(format!("@conduit:{}", services().globals.server_name())) - .expect("@conduit:server_name is valid"); + let conduit_user = UserId::parse(format!("@conduit:{}", services().globals.server_name())) + .expect("@conduit:server_name is valid"); let conduit_room = services() .rooms @@ -202,8 +201,7 @@ impl Service { .expect("Database data for admin room alias must be valid") .expect("Admin room must exist"); - let send_message = |message: RoomMessageEventContent, - mutex_lock: &MutexGuard<'_, ()>| { + let send_message = |message: RoomMessageEventContent, mutex_lock: &MutexGuard<'_, ()>| { services() .rooms .timeline diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 477b269d..054df095 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -3,15 +3,13 @@ pub use data::Data; use crate::api::server_server::FedDest; - use crate::{Config, Error, Result}; use ruma::{ api::{ client::sync::sync_events, federation::discovery::{ServerSigningKeys, VerifyKey}, }, - DeviceId, EventId, RoomId, RoomVersionId, ServerName, - ServerSigningKeyId, UserId, + DeviceId, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, }; use std::{ collections::{BTreeMap, HashMap}, diff --git a/src/service/key_backups/mod.rs b/src/service/key_backups/mod.rs index 5d0ad599..c8df0afc 100644 --- a/src/service/key_backups/mod.rs +++ b/src/service/key_backups/mod.rs @@ -1,11 +1,9 @@ mod data; pub use data::Data; -use crate::{Result}; +use crate::Result; use ruma::{ - api::client::{ - backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, - }, + api::client::backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, serde::Raw, RoomId, UserId, }; diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 29648577..96e9aa34 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -3,7 +3,7 @@ pub use data::Data; use crate::{services, Result}; use image::{imageops::FilterType, GenericImageView}; -use std::{sync::Arc}; +use std::sync::Arc; use tokio::{ fs::File, io::{AsyncReadExt, AsyncWriteExt}, diff --git a/src/service/mod.rs b/src/service/mod.rs index e8696e79..385dcc69 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap}, + collections::HashMap, sync::{Arc, Mutex}, }; @@ -49,7 +49,7 @@ impl Services { + key_backups::Data + media::Data + sending::Data - + 'static + + 'static, >( db: &'static D, config: Config, diff --git a/src/service/pusher/data.rs b/src/service/pusher/data.rs index cb8768d8..e3171210 100644 --- a/src/service/pusher/data.rs +++ b/src/service/pusher/data.rs @@ -7,9 +7,11 @@ use ruma::{ pub trait Data: Send + Sync { fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()>; - fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result>; + fn get_pusher(&self, sender: &UserId, pushkey: &str) + -> Result>; fn get_pushers(&self, sender: &UserId) -> Result>; - fn get_pushkeys<'a>(&'a self, sender: &UserId) -> Box> + 'a>; + fn get_pushkeys<'a>(&'a self, sender: &UserId) + -> Box> + 'a>; } diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index 3b12f38b..f8e5bca6 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -34,7 +34,11 @@ impl Service { self.db.set_pusher(sender, pusher) } - pub fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result> { + pub fn get_pusher( + &self, + sender: &UserId, + pushkey: &str, + ) -> Result> { self.db.get_pusher(sender, pushkey) } @@ -42,10 +46,7 @@ impl Service { self.db.get_pushers(sender) } - pub fn get_pushkeys<'a>( - &'a self, - sender: &UserId, - ) -> Box>> { + pub fn get_pushkeys<'a>(&'a self, sender: &UserId) -> Box>> { self.db.get_pushkeys(sender) } diff --git a/src/service/rooms/edus/read_receipt/data.rs b/src/service/rooms/edus/read_receipt/data.rs index 9a02ee40..800c035f 100644 --- a/src/service/rooms/edus/read_receipt/data.rs +++ b/src/service/rooms/edus/read_receipt/data.rs @@ -17,12 +17,12 @@ pub trait Data: Send + Sync { since: u64, ) -> Box< dyn Iterator< - Item = Result<( - Box, - u64, - Raw, - )>, - > + 'a, + Item = Result<( + Box, + u64, + Raw, + )>, + > + 'a, >; /// Sets a private read marker at `count`. diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 0c0bd2ce..e5f8424b 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -34,7 +34,7 @@ use ruma::{ state_res::{self, RoomVersion, StateMap}, uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName, ServerSigningKeyId, }; -use serde_json::value::{RawValue as RawJsonValue}; +use serde_json::value::RawValue as RawJsonValue; use tracing::{debug, error, info, trace, warn}; use crate::{service::*, services, Error, PduEvent, Result}; diff --git a/src/service/rooms/search/data.rs b/src/service/rooms/search/data.rs index bd7d61bb..82c08004 100644 --- a/src/service/rooms/search/data.rs +++ b/src/service/rooms/search/data.rs @@ -8,5 +8,5 @@ pub trait Data: Send + Sync { &'a self, room_id: &RoomId, search_string: &str, - ) -> Result>+ 'a>, Vec)>>; + ) -> Result> + 'a>, Vec)>>; } diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index 1d8d01e1..80356303 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -12,12 +12,7 @@ pub struct Service { impl Service { #[tracing::instrument(skip(self))] - pub fn index_pdu<'a>( - &self, - shortroomid: u64, - pdu_id: &[u8], - message_body: &str, - ) -> Result<()> { + pub fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()> { self.db.index_pdu(shortroomid, pdu_id, message_body) } diff --git a/src/service/rooms/short/mod.rs b/src/service/rooms/short/mod.rs index d847dea2..45fadd74 100644 --- a/src/service/rooms/short/mod.rs +++ b/src/service/rooms/short/mod.rs @@ -4,7 +4,7 @@ use std::sync::Arc; pub use data::Data; use ruma::{events::StateEventType, EventId, RoomId}; -use crate::{Result}; +use crate::Result; pub struct Service { pub db: &'static dyn Data, diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 614236ca..7b8b0fde 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -102,7 +102,8 @@ impl Service { services().rooms.state_cache.update_joined_count(room_id)?; - self.db.set_room_state(room_id, shortstatehash, &state_lock)?; + self.db + .set_room_state(room_id, shortstatehash, &state_lock)?; drop(state_lock); diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index cf4c6655..2b4762ae 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -5,10 +5,11 @@ pub use data::Data; use ruma::{ events::{ - direct::{DirectEvent}, + direct::DirectEvent, ignored_user_list::IgnoredUserListEvent, room::{create::RoomCreateEventContent, member::MembershipState}, - AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, + AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType, + RoomAccountDataEventType, StateEventType, }, serde::Raw, RoomId, ServerName, UserId, diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 01c54a3a..16f50d23 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -2,8 +2,8 @@ mod data; use std::collections::HashMap; +use std::collections::HashSet; use std::sync::{Arc, Mutex}; -use std::{collections::HashSet}; pub use data::Data; use regex::Regex; @@ -305,7 +305,9 @@ impl Service { } for push_key in services().pusher.get_pushkeys(&user) { - services().sending.send_push_pdu(&*pdu_id, &user, push_key?)?; + services() + .sending + .send_push_pdu(&*pdu_id, &user, push_key?)?; } } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index cb16e70d..b67f1e28 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -5,15 +5,16 @@ pub use data::Data; use std::{ collections::{BTreeMap, HashMap, HashSet}, fmt::Debug, + iter, sync::Arc, - time::{Duration, Instant}, iter, + time::{Duration, Instant}, }; use crate::{ api::{appservice_server, server_server}, services, utils::{self, calculate_hash}, - Error, PduEvent, Result, Config, + Config, Error, PduEvent, Result, }; use federation::transactions::send_transaction_message; use futures_util::{stream::FuturesUnordered, StreamExt}; @@ -100,7 +101,11 @@ impl Service { pub fn build(db: &'static dyn Data, config: &Config) -> Arc { let (sender, receiver) = mpsc::unbounded_channel(); - let self1 = Arc::new(Self { db, sender, maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)) }); + let self1 = Arc::new(Self { + db, + sender, + maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)), + }); let self2 = Arc::clone(&self1); tokio::spawn(async move { @@ -110,7 +115,10 @@ impl Service { self1 } - async fn start_handler(&self, mut receiver: mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec)>) -> Result<()> { + async fn start_handler( + &self, + mut receiver: mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec)>, + ) -> Result<()> { let mut futures = FuturesUnordered::new(); let mut current_transaction_status = HashMap::::new(); @@ -118,8 +126,7 @@ impl Service { // Retry requests we could not finish yet let mut initial_transactions = HashMap::>::new(); - for (key, outgoing_kind, event) in self.db.active_requests().filter_map(|r| r.ok()) - { + for (key, outgoing_kind, event) in self.db.active_requests().filter_map(|r| r.ok()) { let entry = initial_transactions .entry(outgoing_kind.clone()) .or_insert_with(Vec::new); @@ -137,8 +144,7 @@ impl Service { } for (outgoing_kind, events) in initial_transactions { - current_transaction_status - .insert(outgoing_kind.clone(), TransactionStatus::Running); + current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); futures.push(Self::handle_events(outgoing_kind.clone(), events)); } @@ -235,7 +241,11 @@ impl Service { if retry { // We retry the previous transaction - for (_, e) in self.db.active_requests_for(outgoing_kind).filter_map(|r| r.ok()) { + for (_, e) in self + .db + .active_requests_for(outgoing_kind) + .filter_map(|r| r.ok()) + { events.push(e); } } else { @@ -276,7 +286,12 @@ impl Service { ); // Look for read receipts in this room - for r in services().rooms.edus.read_receipt.readreceipts_since(&room_id, since) { + for r in services() + .rooms + .edus + .read_receipt + .readreceipts_since(&room_id, since) + { let (user_id, count, read_receipt) = r?; if count > max_edu_count { @@ -359,7 +374,9 @@ impl Service { let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); let event = SendingEventType::Pdu(pdu_id.to_owned()); let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; - self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap(); + self.sender + .send((outgoing_kind, event, keys.into_iter().next().unwrap())) + .unwrap(); Ok(()) } @@ -370,10 +387,25 @@ impl Service { servers: I, pdu_id: &[u8], ) -> Result<()> { - let requests = servers.into_iter().map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()))).collect::>(); - let keys = self.db.queue_requests(&requests.iter().map(|(o, e)| (o, e.clone())).collect::>())?; + let requests = servers + .into_iter() + .map(|server| { + ( + OutgoingKind::Normal(server), + SendingEventType::Pdu(pdu_id.to_owned()), + ) + }) + .collect::>(); + let keys = self.db.queue_requests( + &requests + .iter() + .map(|(o, e)| (o, e.clone())) + .collect::>(), + )?; for ((outgoing_kind, event), key) in requests.into_iter().zip(keys) { - self.sender.send((outgoing_kind.to_owned(), event, key)).unwrap(); + self.sender + .send((outgoing_kind.to_owned(), event, key)) + .unwrap(); } Ok(()) @@ -389,7 +421,9 @@ impl Service { let outgoing_kind = OutgoingKind::Normal(server.to_owned()); let event = SendingEventType::Edu(serialized); let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; - self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap(); + self.sender + .send((outgoing_kind, event, keys.into_iter().next().unwrap())) + .unwrap(); Ok(()) } @@ -399,7 +433,9 @@ impl Service { let outgoing_kind = OutgoingKind::Appservice(appservice_id); let event = SendingEventType::Pdu(pdu_id); let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; - self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap(); + self.sender + .send((outgoing_kind, event, keys.into_iter().next().unwrap())) + .unwrap(); Ok(()) } @@ -409,7 +445,8 @@ impl Service { /// #[tracing::instrument(skip(self))] pub fn cleanup_events(&self, appservice_id: String) -> Result<()> { - self.db.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?; + self.db + .delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?; Ok(()) } @@ -638,7 +675,6 @@ impl Service { } } - #[tracing::instrument(skip(self, destination, request))] pub async fn send_federation_request( &self,