From 26dcab272d04eff968997a94f90636df389ffda6 Mon Sep 17 00:00:00 2001
From: Jason Volk
Date: Tue, 1 Oct 2024 02:47:39 +0000
Subject: [PATCH] various cleanup tweaks/fixes

Signed-off-by: Jason Volk
---
 Cargo.lock                           |  1 +
 src/admin/query/room_alias.rs        | 10 ++++-
 src/admin/room/alias.rs              |  4 +-
 src/admin/room/directory.rs          |  8 ++--
 src/admin/room/info.rs               | 10 ++---
 src/admin/room/moderation.rs         |  4 +-
 src/api/client/keys.rs               |  2 +-
 src/api/client/membership.rs         | 15 ++++---
 src/api/client/search.rs             |  4 +-
 src/api/client/sync.rs               | 67 +++++++++++++---------------
 src/api/client/user_directory.rs     | 11 ++---
 src/core/Cargo.toml                  |  1 +
 src/core/error/mod.rs                |  2 +
 src/service/appservice/data.rs       |  4 +-
 src/service/globals/migrations.rs    | 11 ++---
 src/service/rooms/state_cache/mod.rs | 28 +++---------
 src/service/rooms/timeline/mod.rs    |  1 -
 src/service/users/mod.rs             |  2 +
 18 files changed, 86 insertions(+), 99 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 043d9704..065aa1e4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -709,6 +709,7 @@ dependencies = [
  "serde",
  "serde_json",
  "serde_regex",
+ "serde_yaml",
  "thiserror",
  "tikv-jemalloc-ctl",
  "tikv-jemalloc-sys",
diff --git a/src/admin/query/room_alias.rs b/src/admin/query/room_alias.rs
index 05fac42c..382e4a78 100644
--- a/src/admin/query/room_alias.rs
+++ b/src/admin/query/room_alias.rs
@@ -43,8 +43,13 @@ pub(super) async fn process(subcommand: RoomAliasCommand, context: &Command<'_>)
 			room_id,
 		} => {
 			let timer = tokio::time::Instant::now();
-			let results = services.rooms.alias.local_aliases_for_room(&room_id);
-			let aliases: Vec<_> = results.collect().await;
+			let aliases: Vec<_> = services
+				.rooms
+				.alias
+				.local_aliases_for_room(&room_id)
+				.map(ToOwned::to_owned)
+				.collect()
+				.await;
 			let query_time = timer.elapsed();
 
 			Ok(RoomMessageEventContent::notice_markdown(format!(
@@ -57,6 +62,7 @@ pub(super) async fn process(subcommand: RoomAliasCommand, context: &Command<'_>)
 				.rooms
 				.alias
 				.all_local_aliases()
+				.map(|(room_id, alias)| (room_id.to_owned(), alias.to_owned()))
 				.collect::<Vec<_>>()
 				.await;
 			let query_time = timer.elapsed();
diff --git a/src/admin/room/alias.rs b/src/admin/room/alias.rs
index 34b6c42e..1ccde47d 100644
--- a/src/admin/room/alias.rs
+++ b/src/admin/room/alias.rs
@@ -119,12 +119,12 @@ pub(super) async fn process(command: RoomAliasCommand, context: &Command<'_>) ->
 			room_id,
 		} => {
 			if let Some(room_id) = room_id {
-				let aliases = services
+				let aliases: Vec<OwnedRoomAliasId> = services
 					.rooms
 					.alias
 					.local_aliases_for_room(&room_id)
 					.map(Into::into)
-					.collect::<Vec<_>>()
+					.collect()
 					.await;
 
 				let plain_list = aliases.iter().fold(String::new(), |mut output, alias| {
diff --git a/src/admin/room/directory.rs b/src/admin/room/directory.rs
index 7ccdea6f..1080356a 100644
--- a/src/admin/room/directory.rs
+++ b/src/admin/room/directory.rs
@@ -47,22 +47,22 @@ pub(super) async fn process(command: RoomDirectoryCommand, context: &Command<'_>
 		} => {
 			// TODO: i know there's a way to do this with clap, but i can't seem to find it
 			let page = page.unwrap_or(1);
-			let mut rooms = services
+			let mut rooms: Vec<_> = services
 				.rooms
 				.directory
 				.public_rooms()
 				.then(|room_id| get_room_info(services, room_id))
-				.collect::<Vec<_>>()
+				.collect()
 				.await;
 
 			rooms.sort_by_key(|r| r.1);
 			rooms.reverse();
 
-			let rooms = rooms
+			let rooms: Vec<_> = rooms
 				.into_iter()
 				.skip(page.saturating_sub(1).saturating_mul(PAGE_SIZE))
 				.take(PAGE_SIZE)
-				.collect::<Vec<_>>();
+				.collect();
 
 			if rooms.is_empty() {
 				return Ok(RoomMessageEventContent::text_plain("No more rooms."));
diff --git a/src/admin/room/info.rs b/src/admin/room/info.rs
index fc0619e3..13a74a9d 100644
--- a/src/admin/room/info.rs
+++ b/src/admin/room/info.rs
@@ -42,14 +42,12 @@ async fn list_joined_members(&self, room_id: Box<RoomId>, local_only: bool) -> R
 		.state_cache
 		.room_members(&room_id)
 		.ready_filter(|user_id| {
-			if local_only {
-				self.services.globals.user_is_local(user_id)
-			} else {
-				true
-			}
+			local_only
+				.then(|| self.services.globals.user_is_local(user_id))
+				.unwrap_or(true)
 		})
+		.map(ToOwned::to_owned)
 		.filter_map(|user_id| async move {
-			let user_id = user_id.to_owned();
 			Some((
 				self.services
 					.users
diff --git a/src/admin/room/moderation.rs b/src/admin/room/moderation.rs
index 9a772da4..cfc048bd 100644
--- a/src/admin/room/moderation.rs
+++ b/src/admin/room/moderation.rs
@@ -555,13 +555,13 @@ async fn unban_room(&self, enable_federation: bool, room: Box<RoomOrAliasId>) ->
 
 #[admin_command]
 async fn list_banned_rooms(&self, no_details: bool) -> Result<RoomMessageEventContent> {
-	let room_ids = self
+	let room_ids: Vec<OwnedRoomId> = self
 		.services
 		.rooms
 		.metadata
 		.list_banned_rooms()
 		.map(Into::into)
-		.collect::<Vec<_>>()
+		.collect()
 		.await;
 
 	if room_ids.is_empty() {
diff --git a/src/api/client/keys.rs b/src/api/client/keys.rs
index abf2a22f..254d92cc 100644
--- a/src/api/client/keys.rs
+++ b/src/api/client/keys.rs
@@ -244,7 +244,7 @@ pub(crate) async fn get_key_changes_route(
 	device_list_updates.extend(
 		services
 			.users
-			.keys_changed(room_id.as_ref(), from, Some(to))
+			.keys_changed(room_id.as_str(), from, Some(to))
 			.map(ToOwned::to_owned)
 			.collect::<Vec<_>>()
 			.await,
diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs
index 5a5d436f..6e3bc894 100644
--- a/src/api/client/membership.rs
+++ b/src/api/client/membership.rs
@@ -167,12 +167,12 @@ pub(crate) async fn join_room_by_id_route(
 		.await?;
 
 	// There is no body.server_name for /roomId/join
-	let mut servers = services
+	let mut servers: Vec<_> = services
 		.rooms
 		.state_cache
 		.servers_invite_via(&body.room_id)
 		.map(ToOwned::to_owned)
-		.collect::<Vec<_>>()
+		.collect()
 		.await;
 
 	servers.extend(
@@ -641,12 +641,13 @@ pub(crate) async fn joined_members_route(
 		.rooms
 		.state_cache
 		.room_members(&body.room_id)
+		.map(ToOwned::to_owned)
 		.then(|user| async move {
 			(
-				user.to_owned(),
+				user.clone(),
 				RoomMember {
-					display_name: services.users.displayname(user).await.ok(),
-					avatar_url: services.users.avatar_url(user).await.ok(),
+					display_name: services.users.displayname(&user).await.ok(),
+					avatar_url: services.users.avatar_url(&user).await.ok(),
 				},
 			)
 		})
@@ -1575,7 +1576,7 @@ pub(crate) async fn invite_helper(
 // Make a user leave all their joined rooms, forgets all rooms, and ignores
 // errors
 pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
-	let all_rooms = services
+	let all_rooms: Vec<_> = services
 		.rooms
 		.state_cache
 		.rooms_joined(user_id)
@@ -1587,7 +1588,7 @@ pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
 				.rooms_invited(user_id)
 				.map(|(r, _)| r),
 		)
-		.collect::<Vec<_>>()
+		.collect()
 		.await;
 
 	for room_id in all_rooms {
diff --git a/src/api/client/search.rs b/src/api/client/search.rs
index 7a061d49..b073640e 100644
--- a/src/api/client/search.rs
+++ b/src/api/client/search.rs
@@ -77,14 +77,14 @@ pub(crate) async fn search_events_route(
 			.user_can_see_state_events(sender_user, room_id)
 			.await
 		{
-			let room_state = services
+			let room_state: Vec<_> = services
 				.rooms
 				.state_accessor
 				.room_state_full(room_id)
 				.await?
 				.values()
 				.map(|pdu| pdu.to_state_event())
-				.collect::<Vec<_>>();
+				.collect();
 
 			debug!("Room state: {:?}", room_state);
 
diff --git a/src/api/client/sync.rs b/src/api/client/sync.rs
index 53d4f3c3..adb4d8da 100644
--- a/src/api/client/sync.rs
+++ b/src/api/client/sync.rs
@@ -7,13 +7,14 @@ use std::{
 use axum::extract::State;
 use conduit::{
 	debug, err, error, is_equal_to,
+	result::IntoIsOk,
 	utils::{
 		math::{ruma_from_u64, ruma_from_usize, usize_from_ruma, usize_from_u64_truncated},
-		IterStream, ReadyExt,
+		BoolExt, IterStream, ReadyExt, TryFutureExtExt,
 	},
 	warn, PduCount,
 };
-use futures::{pin_mut, StreamExt};
+use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt};
 use ruma::{
 	api::client::{
 		error::ErrorKind,
@@ -172,12 +173,12 @@ pub(crate) async fn sync_events_route(
 		process_presence_updates(&services, &mut presence_updates, since, &sender_user).await?;
 	}
 
-	let all_joined_rooms = services
+	let all_joined_rooms: Vec<_> = services
 		.rooms
 		.state_cache
 		.rooms_joined(&sender_user)
 		.map(ToOwned::to_owned)
-		.collect::<Vec<_>>()
+		.collect()
 		.await;
 
 	// Coalesce database writes for the remainder of this scope.
@@ -869,15 +870,13 @@ async fn load_joined_room(
 				.rooms
 				.state_cache
 				.room_members(room_id)
-				.ready_filter(|user_id| {
-					// Don't send key updates from the sender to the sender
-					sender_user != *user_id
-				})
-				.filter_map(|user_id| async move {
-					// Only send keys if the sender doesn't share an encrypted room with the target
-					// already
-					(!share_encrypted_room(services, sender_user, user_id, Some(room_id)).await)
-						.then_some(user_id.to_owned())
+				// Don't send key updates from the sender to the sender
+				.ready_filter(|user_id| sender_user != *user_id)
+				// Only send keys if the sender doesn't share an encrypted room with the target
+				// already
+				.filter_map(|user_id| {
+					share_encrypted_room(services, sender_user, user_id, Some(room_id))
+						.map(|res| res.or_some(user_id.to_owned()))
 				})
 				.collect::<Vec<_>>()
 				.await,
@@ -1117,13 +1116,12 @@ async fn share_encrypted_room(
 		.user
 		.get_shared_rooms(sender_user, user_id)
 		.ready_filter(|&room_id| Some(room_id) != ignore_room)
-		.any(|other_room_id| async move {
+		.any(|other_room_id| {
 			services
 				.rooms
 				.state_accessor
 				.room_state_get(other_room_id, &StateEventType::RoomEncryption, "")
-				.await
-				.is_ok()
+				.map(Result::into_is_ok)
 		})
 		.await
 }
@@ -1178,20 +1176,20 @@ pub(crate) async fn sync_events_v4_route(
 		.sync
 		.update_sync_request_with_cache(sender_user.clone(), sender_device.clone(), &mut body);
 
-	let all_joined_rooms = services
+	let all_joined_rooms: Vec<_> = services
 		.rooms
 		.state_cache
 		.rooms_joined(&sender_user)
 		.map(ToOwned::to_owned)
-		.collect::<Vec<_>>()
+		.collect()
 		.await;
 
-	let all_invited_rooms = services
+	let all_invited_rooms: Vec<_> = services
 		.rooms
 		.state_cache
 		.rooms_invited(&sender_user)
 		.map(|r| r.0)
-		.collect::<Vec<_>>()
+		.collect()
 		.await;
 
 	let all_rooms = all_joined_rooms
@@ -1364,15 +1362,13 @@ pub(crate) async fn sync_events_v4_route(
 					.rooms
 					.state_cache
 					.room_members(room_id)
-					.ready_filter(|user_id| {
-						// Don't send key updates from the sender to the sender
-						sender_user != user_id
-					})
-					.filter_map(|user_id| async move {
-						// Only send keys if the sender doesn't share an encrypted room with the target
-						// already
-						(!share_encrypted_room(&services, sender_user, user_id, Some(room_id)).await)
-							.then_some(user_id.to_owned())
+					// Don't send key updates from the sender to the sender
+					.ready_filter(|user_id| sender_user != user_id)
+					// Only send keys if the sender doesn't share an encrypted room with the target
+					// already
+					.filter_map(|user_id| {
+						share_encrypted_room(&services, sender_user, user_id, Some(room_id))
+							.map(|res| res.or_some(user_id.to_owned()))
 					})
 					.collect::<Vec<_>>()
 					.await,
@@ -1650,26 +1646,25 @@ pub(crate) async fn sync_events_v4_route(
 					.await;
 
 				// Heroes
-				let heroes = services
+				let heroes: Vec<_> = services
 					.rooms
 					.state_cache
 					.room_members(room_id)
 					.ready_filter(|member| member != &sender_user)
-					.filter_map(|member| async move {
+					.filter_map(|user_id| {
 						services
 							.rooms
 							.state_accessor
-							.get_member(room_id, member)
-							.await
-							.map(|memberevent| SlidingSyncRoomHero {
-								user_id: member.to_owned(),
+							.get_member(room_id, user_id)
+							.map_ok(|memberevent| SlidingSyncRoomHero {
+								user_id: user_id.into(),
 								name: memberevent.displayname,
 								avatar: memberevent.avatar_url,
 							})
 							.ok()
 					})
 					.take(5)
-					.collect::<Vec<_>>()
+					.collect()
 					.await;
 
 				let name = match heroes.len().cmp(&(1_usize)) {
diff --git a/src/api/client/user_directory.rs b/src/api/client/user_directory.rs
index 8ea7f1b8..868811a3 100644
--- a/src/api/client/user_directory.rs
+++ b/src/api/client/user_directory.rs
@@ -1,4 +1,5 @@
 use axum::extract::State;
+use conduit::utils::TryFutureExtExt;
 use futures::{pin_mut, StreamExt};
 use ruma::{
 	api::client::user_directory::search_users,
@@ -56,16 +57,12 @@ pub(crate) async fn search_users_route(
 			.rooms
 			.state_cache
 			.rooms_joined(&user.user_id)
-			.any(|room| async move {
+			.any(|room| {
 				services
 					.rooms
 					.state_accessor
-					.room_state_get(room, &StateEventType::RoomJoinRules, "")
-					.await
-					.map_or(false, |event| {
-						serde_json::from_str(event.content.get())
-							.map_or(false, |r: RoomJoinRulesEventContent| r.join_rule == JoinRule::Public)
-					})
+					.room_state_get_content::<RoomJoinRulesEventContent>(room, &StateEventType::RoomJoinRules, "")
+					.map_ok_or(false, |content| content.join_rule == JoinRule::Public)
 			})
 			.await;
 
diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml
index cb957bc9..4fe413e9 100644
--- a/src/core/Cargo.toml
+++ b/src/core/Cargo.toml
@@ -83,6 +83,7 @@ ruma.workspace = true
 sanitize-filename.workspace = true
 serde_json.workspace = true
 serde_regex.workspace = true
+serde_yaml.workspace = true
 serde.workspace = true
 thiserror.workspace = true
 tikv-jemallocator.optional = true
diff --git a/src/core/error/mod.rs b/src/core/error/mod.rs
index 79e3d5b4..ad7f9f3c 100644
--- a/src/core/error/mod.rs
+++ b/src/core/error/mod.rs
@@ -75,6 +75,8 @@ pub enum Error {
 	TracingFilter(#[from] tracing_subscriber::filter::ParseError),
 	#[error("Tracing reload error: {0}")]
 	TracingReload(#[from] tracing_subscriber::reload::Error),
+	#[error(transparent)]
+	Yaml(#[from] serde_yaml::Error),
 
 	// ruma/conduwuit
 	#[error("Arithmetic operation failed: {0}")]
diff --git a/src/service/appservice/data.rs b/src/service/appservice/data.rs
index 4eb9d09e..8fb7d958 100644
--- a/src/service/appservice/data.rs
+++ b/src/service/appservice/data.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use conduit::{err, utils::stream::TryIgnore, Result};
-use database::{Database, Deserialized, Map};
+use database::{Database, Map};
 use futures::Stream;
 use ruma::api::appservice::Registration;
 
@@ -40,7 +40,7 @@ impl Data {
 		self.id_appserviceregistrations
 			.get(id)
 			.await
-			.deserialized()
+			.and_then(|ref bytes| serde_yaml::from_slice(bytes).map_err(Into::into))
 			.map_err(|e| err!(Database("Invalid appservice {id:?} registration: {e:?}")))
 	}
 
diff --git a/src/service/globals/migrations.rs b/src/service/globals/migrations.rs
index 469159fc..fc6e477b 100644
--- a/src/service/globals/migrations.rs
+++ b/src/service/globals/migrations.rs
@@ -9,7 +9,7 @@ use itertools::Itertools;
 use ruma::{
 	events::{push_rules::PushRulesEvent, room::member::MembershipState, GlobalAccountDataEventType},
 	push::Ruleset,
-	UserId,
+	OwnedUserId, UserId,
 };
 
 use crate::{media, Services};
@@ -385,11 +385,12 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services)
 	for room_id in &room_ids {
 		debug_info!("Fixing room {room_id}");
 
-		let users_in_room = services
+		let users_in_room: Vec<OwnedUserId> = services
 			.rooms
 			.state_cache
 			.room_members(room_id)
-			.collect::<Vec<_>>()
+			.map(ToOwned::to_owned)
+			.collect()
 			.await;
 
 		let joined_members = users_in_room
@@ -418,12 +419,12 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services)
 			.collect::<Vec<_>>()
 			.await;
 
-		for user_id in joined_members {
+		for user_id in &joined_members {
 			debug_info!("User is joined, marking as joined");
 			services.rooms.state_cache.mark_as_joined(user_id, room_id);
 		}
 
-		for user_id in non_joined_members {
+		for user_id in &non_joined_members {
 			debug_info!("User is left or banned, marking as left");
 			services.rooms.state_cache.mark_as_left(user_id, room_id);
 		}
diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs
index 25388084..dbe38561 100644
--- a/src/service/rooms/state_cache/mod.rs
+++ b/src/service/rooms/state_cache/mod.rs
@@ -648,35 +648,19 @@ impl Service {
 		self.db.userroomid_leftstate.remove(&userroom_id);
 		self.db.roomuserid_leftcount.remove(&roomuser_id);
 
-		if let Some(servers) = invite_via {
-			let mut prev_servers = self
-				.servers_invite_via(room_id)
-				.map(ToOwned::to_owned)
-				.collect::<Vec<_>>()
-				.await;
-			#[allow(clippy::redundant_clone)] // this is a necessary clone?
-			prev_servers.append(servers.clone().as_mut());
-			let servers = prev_servers.iter().rev().unique().rev().collect_vec();
-
-			let servers = servers
-				.iter()
-				.map(|server| server.as_bytes())
-				.collect_vec()
-				.join(&[0xFF][..]);
-
-			self.db
-				.roomid_inviteviaservers
-				.insert(room_id.as_bytes(), &servers);
+		if let Some(servers) = invite_via.as_deref() {
+			self.add_servers_invite_via(room_id, servers).await;
 		}
 	}
 
-	#[tracing::instrument(skip(self), level = "debug")]
+	#[tracing::instrument(skip(self, servers), level = "debug")]
 	pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: &[OwnedServerName]) {
-		let mut prev_servers = self
+		let mut prev_servers: Vec<_> = self
 			.servers_invite_via(room_id)
 			.map(ToOwned::to_owned)
-			.collect::<Vec<_>>()
+			.collect()
 			.await;
+
 		prev_servers.extend(servers.to_owned());
 		prev_servers.sort_unstable();
 		prev_servers.dedup();
diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs
index 5360d2c9..6a26a1d5 100644
--- a/src/service/rooms/timeline/mod.rs
+++ b/src/service/rooms/timeline/mod.rs
@@ -408,7 +408,6 @@ impl Service {
 			.get(None, user, GlobalAccountDataEventType::PushRules.to_string().into())
 			.await
 			.and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).map_err(Into::into))
-			.map_err(|e| err!(Database(warn!(?user, ?e, "Invalid push rules event in db for user"))))
 			.map_or_else(|_| Ruleset::server_default(user), |ev: PushRulesEvent| ev.content.global);
 
 		let mut highlight = false;
diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs
index eb77ef35..438c220b 100644
--- a/src/service/users/mod.rs
+++ b/src/service/users/mod.rs
@@ -623,7 +623,9 @@ impl Service {
 	pub async fn mark_device_key_update(&self, user_id: &UserId) {
 		let count = self.services.globals.next_count().unwrap().to_be_bytes();
+
 		let rooms_joined = self.services.state_cache.rooms_joined(user_id);
+
 		pin_mut!(rooms_joined);
 		while let Some(room_id) = rooms_joined.next().await {
 			// Don't send key updates to unencrypted rooms