various cleanup tweaks/fixes
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
96fcf7f94d
commit
26dcab272d
|
@ -709,6 +709,7 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_regex",
|
"serde_regex",
|
||||||
|
"serde_yaml",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tikv-jemalloc-ctl",
|
"tikv-jemalloc-ctl",
|
||||||
"tikv-jemalloc-sys",
|
"tikv-jemalloc-sys",
|
||||||
|
|
|
@ -43,8 +43,13 @@ pub(super) async fn process(subcommand: RoomAliasCommand, context: &Command<'_>)
|
||||||
room_id,
|
room_id,
|
||||||
} => {
|
} => {
|
||||||
let timer = tokio::time::Instant::now();
|
let timer = tokio::time::Instant::now();
|
||||||
let results = services.rooms.alias.local_aliases_for_room(&room_id);
|
let aliases: Vec<_> = services
|
||||||
let aliases: Vec<_> = results.collect().await;
|
.rooms
|
||||||
|
.alias
|
||||||
|
.local_aliases_for_room(&room_id)
|
||||||
|
.map(ToOwned::to_owned)
|
||||||
|
.collect()
|
||||||
|
.await;
|
||||||
let query_time = timer.elapsed();
|
let query_time = timer.elapsed();
|
||||||
|
|
||||||
Ok(RoomMessageEventContent::notice_markdown(format!(
|
Ok(RoomMessageEventContent::notice_markdown(format!(
|
||||||
|
@ -57,6 +62,7 @@ pub(super) async fn process(subcommand: RoomAliasCommand, context: &Command<'_>)
|
||||||
.rooms
|
.rooms
|
||||||
.alias
|
.alias
|
||||||
.all_local_aliases()
|
.all_local_aliases()
|
||||||
|
.map(|(room_id, alias)| (room_id.to_owned(), alias.to_owned()))
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.await;
|
.await;
|
||||||
let query_time = timer.elapsed();
|
let query_time = timer.elapsed();
|
||||||
|
|
|
@ -119,12 +119,12 @@ pub(super) async fn process(command: RoomAliasCommand, context: &Command<'_>) ->
|
||||||
room_id,
|
room_id,
|
||||||
} => {
|
} => {
|
||||||
if let Some(room_id) = room_id {
|
if let Some(room_id) = room_id {
|
||||||
let aliases = services
|
let aliases: Vec<OwnedRoomAliasId> = services
|
||||||
.rooms
|
.rooms
|
||||||
.alias
|
.alias
|
||||||
.local_aliases_for_room(&room_id)
|
.local_aliases_for_room(&room_id)
|
||||||
.map(Into::into)
|
.map(Into::into)
|
||||||
.collect::<Vec<OwnedRoomAliasId>>()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let plain_list = aliases.iter().fold(String::new(), |mut output, alias| {
|
let plain_list = aliases.iter().fold(String::new(), |mut output, alias| {
|
||||||
|
|
|
@ -47,22 +47,22 @@ pub(super) async fn process(command: RoomDirectoryCommand, context: &Command<'_>
|
||||||
} => {
|
} => {
|
||||||
// TODO: i know there's a way to do this with clap, but i can't seem to find it
|
// TODO: i know there's a way to do this with clap, but i can't seem to find it
|
||||||
let page = page.unwrap_or(1);
|
let page = page.unwrap_or(1);
|
||||||
let mut rooms = services
|
let mut rooms: Vec<_> = services
|
||||||
.rooms
|
.rooms
|
||||||
.directory
|
.directory
|
||||||
.public_rooms()
|
.public_rooms()
|
||||||
.then(|room_id| get_room_info(services, room_id))
|
.then(|room_id| get_room_info(services, room_id))
|
||||||
.collect::<Vec<_>>()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
rooms.sort_by_key(|r| r.1);
|
rooms.sort_by_key(|r| r.1);
|
||||||
rooms.reverse();
|
rooms.reverse();
|
||||||
|
|
||||||
let rooms = rooms
|
let rooms: Vec<_> = rooms
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.skip(page.saturating_sub(1).saturating_mul(PAGE_SIZE))
|
.skip(page.saturating_sub(1).saturating_mul(PAGE_SIZE))
|
||||||
.take(PAGE_SIZE)
|
.take(PAGE_SIZE)
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
if rooms.is_empty() {
|
if rooms.is_empty() {
|
||||||
return Ok(RoomMessageEventContent::text_plain("No more rooms."));
|
return Ok(RoomMessageEventContent::text_plain("No more rooms."));
|
||||||
|
|
|
@ -42,14 +42,12 @@ async fn list_joined_members(&self, room_id: Box<RoomId>, local_only: bool) -> R
|
||||||
.state_cache
|
.state_cache
|
||||||
.room_members(&room_id)
|
.room_members(&room_id)
|
||||||
.ready_filter(|user_id| {
|
.ready_filter(|user_id| {
|
||||||
if local_only {
|
local_only
|
||||||
self.services.globals.user_is_local(user_id)
|
.then(|| self.services.globals.user_is_local(user_id))
|
||||||
} else {
|
.unwrap_or(true)
|
||||||
true
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
|
.map(ToOwned::to_owned)
|
||||||
.filter_map(|user_id| async move {
|
.filter_map(|user_id| async move {
|
||||||
let user_id = user_id.to_owned();
|
|
||||||
Some((
|
Some((
|
||||||
self.services
|
self.services
|
||||||
.users
|
.users
|
||||||
|
|
|
@ -555,13 +555,13 @@ async fn unban_room(&self, enable_federation: bool, room: Box<RoomOrAliasId>) ->
|
||||||
|
|
||||||
#[admin_command]
|
#[admin_command]
|
||||||
async fn list_banned_rooms(&self, no_details: bool) -> Result<RoomMessageEventContent> {
|
async fn list_banned_rooms(&self, no_details: bool) -> Result<RoomMessageEventContent> {
|
||||||
let room_ids = self
|
let room_ids: Vec<OwnedRoomId> = self
|
||||||
.services
|
.services
|
||||||
.rooms
|
.rooms
|
||||||
.metadata
|
.metadata
|
||||||
.list_banned_rooms()
|
.list_banned_rooms()
|
||||||
.map(Into::into)
|
.map(Into::into)
|
||||||
.collect::<Vec<OwnedRoomId>>()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
if room_ids.is_empty() {
|
if room_ids.is_empty() {
|
||||||
|
|
|
@ -244,7 +244,7 @@ pub(crate) async fn get_key_changes_route(
|
||||||
device_list_updates.extend(
|
device_list_updates.extend(
|
||||||
services
|
services
|
||||||
.users
|
.users
|
||||||
.keys_changed(room_id.as_ref(), from, Some(to))
|
.keys_changed(room_id.as_str(), from, Some(to))
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.await,
|
.await,
|
||||||
|
|
|
@ -167,12 +167,12 @@ pub(crate) async fn join_room_by_id_route(
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// There is no body.server_name for /roomId/join
|
// There is no body.server_name for /roomId/join
|
||||||
let mut servers = services
|
let mut servers: Vec<_> = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.servers_invite_via(&body.room_id)
|
.servers_invite_via(&body.room_id)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.collect::<Vec<_>>()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
servers.extend(
|
servers.extend(
|
||||||
|
@ -641,12 +641,13 @@ pub(crate) async fn joined_members_route(
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.room_members(&body.room_id)
|
.room_members(&body.room_id)
|
||||||
|
.map(ToOwned::to_owned)
|
||||||
.then(|user| async move {
|
.then(|user| async move {
|
||||||
(
|
(
|
||||||
user.to_owned(),
|
user.clone(),
|
||||||
RoomMember {
|
RoomMember {
|
||||||
display_name: services.users.displayname(user).await.ok(),
|
display_name: services.users.displayname(&user).await.ok(),
|
||||||
avatar_url: services.users.avatar_url(user).await.ok(),
|
avatar_url: services.users.avatar_url(&user).await.ok(),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
@ -1575,7 +1576,7 @@ pub(crate) async fn invite_helper(
|
||||||
// Make a user leave all their joined rooms, forgets all rooms, and ignores
|
// Make a user leave all their joined rooms, forgets all rooms, and ignores
|
||||||
// errors
|
// errors
|
||||||
pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
|
pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
|
||||||
let all_rooms = services
|
let all_rooms: Vec<_> = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.rooms_joined(user_id)
|
.rooms_joined(user_id)
|
||||||
|
@ -1587,7 +1588,7 @@ pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
|
||||||
.rooms_invited(user_id)
|
.rooms_invited(user_id)
|
||||||
.map(|(r, _)| r),
|
.map(|(r, _)| r),
|
||||||
)
|
)
|
||||||
.collect::<Vec<_>>()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
for room_id in all_rooms {
|
for room_id in all_rooms {
|
||||||
|
|
|
@ -77,14 +77,14 @@ pub(crate) async fn search_events_route(
|
||||||
.user_can_see_state_events(sender_user, room_id)
|
.user_can_see_state_events(sender_user, room_id)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
let room_state = services
|
let room_state: Vec<_> = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.room_state_full(room_id)
|
.room_state_full(room_id)
|
||||||
.await?
|
.await?
|
||||||
.values()
|
.values()
|
||||||
.map(|pdu| pdu.to_state_event())
|
.map(|pdu| pdu.to_state_event())
|
||||||
.collect::<Vec<_>>();
|
.collect();
|
||||||
|
|
||||||
debug!("Room state: {:?}", room_state);
|
debug!("Room state: {:?}", room_state);
|
||||||
|
|
||||||
|
|
|
@ -7,13 +7,14 @@ use std::{
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduit::{
|
use conduit::{
|
||||||
debug, err, error, is_equal_to,
|
debug, err, error, is_equal_to,
|
||||||
|
result::IntoIsOk,
|
||||||
utils::{
|
utils::{
|
||||||
math::{ruma_from_u64, ruma_from_usize, usize_from_ruma, usize_from_u64_truncated},
|
math::{ruma_from_u64, ruma_from_usize, usize_from_ruma, usize_from_u64_truncated},
|
||||||
IterStream, ReadyExt,
|
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
||||||
},
|
},
|
||||||
warn, PduCount,
|
warn, PduCount,
|
||||||
};
|
};
|
||||||
use futures::{pin_mut, StreamExt};
|
use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::client::{
|
api::client::{
|
||||||
error::ErrorKind,
|
error::ErrorKind,
|
||||||
|
@ -172,12 +173,12 @@ pub(crate) async fn sync_events_route(
|
||||||
process_presence_updates(&services, &mut presence_updates, since, &sender_user).await?;
|
process_presence_updates(&services, &mut presence_updates, since, &sender_user).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let all_joined_rooms = services
|
let all_joined_rooms: Vec<_> = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.rooms_joined(&sender_user)
|
.rooms_joined(&sender_user)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.collect::<Vec<_>>()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// Coalesce database writes for the remainder of this scope.
|
// Coalesce database writes for the remainder of this scope.
|
||||||
|
@ -869,15 +870,13 @@ async fn load_joined_room(
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.room_members(room_id)
|
.room_members(room_id)
|
||||||
.ready_filter(|user_id| {
|
// Don't send key updates from the sender to the sender
|
||||||
// Don't send key updates from the sender to the sender
|
.ready_filter(|user_id| sender_user != *user_id)
|
||||||
sender_user != *user_id
|
// Only send keys if the sender doesn't share an encrypted room with the target
|
||||||
})
|
// already
|
||||||
.filter_map(|user_id| async move {
|
.filter_map(|user_id| {
|
||||||
// Only send keys if the sender doesn't share an encrypted room with the target
|
share_encrypted_room(services, sender_user, user_id, Some(room_id))
|
||||||
// already
|
.map(|res| res.or_some(user_id.to_owned()))
|
||||||
(!share_encrypted_room(services, sender_user, user_id, Some(room_id)).await)
|
|
||||||
.then_some(user_id.to_owned())
|
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.await,
|
.await,
|
||||||
|
@ -1117,13 +1116,12 @@ async fn share_encrypted_room(
|
||||||
.user
|
.user
|
||||||
.get_shared_rooms(sender_user, user_id)
|
.get_shared_rooms(sender_user, user_id)
|
||||||
.ready_filter(|&room_id| Some(room_id) != ignore_room)
|
.ready_filter(|&room_id| Some(room_id) != ignore_room)
|
||||||
.any(|other_room_id| async move {
|
.any(|other_room_id| {
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.room_state_get(other_room_id, &StateEventType::RoomEncryption, "")
|
.room_state_get(other_room_id, &StateEventType::RoomEncryption, "")
|
||||||
.await
|
.map(Result::into_is_ok)
|
||||||
.is_ok()
|
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -1178,20 +1176,20 @@ pub(crate) async fn sync_events_v4_route(
|
||||||
.sync
|
.sync
|
||||||
.update_sync_request_with_cache(sender_user.clone(), sender_device.clone(), &mut body);
|
.update_sync_request_with_cache(sender_user.clone(), sender_device.clone(), &mut body);
|
||||||
|
|
||||||
let all_joined_rooms = services
|
let all_joined_rooms: Vec<_> = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.rooms_joined(&sender_user)
|
.rooms_joined(&sender_user)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.collect::<Vec<_>>()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let all_invited_rooms = services
|
let all_invited_rooms: Vec<_> = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.rooms_invited(&sender_user)
|
.rooms_invited(&sender_user)
|
||||||
.map(|r| r.0)
|
.map(|r| r.0)
|
||||||
.collect::<Vec<_>>()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let all_rooms = all_joined_rooms
|
let all_rooms = all_joined_rooms
|
||||||
|
@ -1364,15 +1362,13 @@ pub(crate) async fn sync_events_v4_route(
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.room_members(room_id)
|
.room_members(room_id)
|
||||||
.ready_filter(|user_id| {
|
// Don't send key updates from the sender to the sender
|
||||||
// Don't send key updates from the sender to the sender
|
.ready_filter(|user_id| sender_user != user_id)
|
||||||
sender_user != user_id
|
// Only send keys if the sender doesn't share an encrypted room with the target
|
||||||
})
|
// already
|
||||||
.filter_map(|user_id| async move {
|
.filter_map(|user_id| {
|
||||||
// Only send keys if the sender doesn't share an encrypted room with the target
|
share_encrypted_room(&services, sender_user, user_id, Some(room_id))
|
||||||
// already
|
.map(|res| res.or_some(user_id.to_owned()))
|
||||||
(!share_encrypted_room(&services, sender_user, user_id, Some(room_id)).await)
|
|
||||||
.then_some(user_id.to_owned())
|
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.await,
|
.await,
|
||||||
|
@ -1650,26 +1646,25 @@ pub(crate) async fn sync_events_v4_route(
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// Heroes
|
// Heroes
|
||||||
let heroes = services
|
let heroes: Vec<_> = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.room_members(room_id)
|
.room_members(room_id)
|
||||||
.ready_filter(|member| member != &sender_user)
|
.ready_filter(|member| member != &sender_user)
|
||||||
.filter_map(|member| async move {
|
.filter_map(|user_id| {
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.get_member(room_id, member)
|
.get_member(room_id, user_id)
|
||||||
.await
|
.map_ok(|memberevent| SlidingSyncRoomHero {
|
||||||
.map(|memberevent| SlidingSyncRoomHero {
|
user_id: user_id.into(),
|
||||||
user_id: member.to_owned(),
|
|
||||||
name: memberevent.displayname,
|
name: memberevent.displayname,
|
||||||
avatar: memberevent.avatar_url,
|
avatar: memberevent.avatar_url,
|
||||||
})
|
})
|
||||||
.ok()
|
.ok()
|
||||||
})
|
})
|
||||||
.take(5)
|
.take(5)
|
||||||
.collect::<Vec<_>>()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let name = match heroes.len().cmp(&(1_usize)) {
|
let name = match heroes.len().cmp(&(1_usize)) {
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use conduit::utils::TryFutureExtExt;
|
||||||
use futures::{pin_mut, StreamExt};
|
use futures::{pin_mut, StreamExt};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::client::user_directory::search_users,
|
api::client::user_directory::search_users,
|
||||||
|
@ -56,16 +57,12 @@ pub(crate) async fn search_users_route(
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.rooms_joined(&user.user_id)
|
.rooms_joined(&user.user_id)
|
||||||
.any(|room| async move {
|
.any(|room| {
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.room_state_get(room, &StateEventType::RoomJoinRules, "")
|
.room_state_get_content::<RoomJoinRulesEventContent>(room, &StateEventType::RoomJoinRules, "")
|
||||||
.await
|
.map_ok_or(false, |content| content.join_rule == JoinRule::Public)
|
||||||
.map_or(false, |event| {
|
|
||||||
serde_json::from_str(event.content.get())
|
|
||||||
.map_or(false, |r: RoomJoinRulesEventContent| r.join_rule == JoinRule::Public)
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|
|
@ -83,6 +83,7 @@ ruma.workspace = true
|
||||||
sanitize-filename.workspace = true
|
sanitize-filename.workspace = true
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
serde_regex.workspace = true
|
serde_regex.workspace = true
|
||||||
|
serde_yaml.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
thiserror.workspace = true
|
thiserror.workspace = true
|
||||||
tikv-jemallocator.optional = true
|
tikv-jemallocator.optional = true
|
||||||
|
|
|
@ -75,6 +75,8 @@ pub enum Error {
|
||||||
TracingFilter(#[from] tracing_subscriber::filter::ParseError),
|
TracingFilter(#[from] tracing_subscriber::filter::ParseError),
|
||||||
#[error("Tracing reload error: {0}")]
|
#[error("Tracing reload error: {0}")]
|
||||||
TracingReload(#[from] tracing_subscriber::reload::Error),
|
TracingReload(#[from] tracing_subscriber::reload::Error),
|
||||||
|
#[error(transparent)]
|
||||||
|
Yaml(#[from] serde_yaml::Error),
|
||||||
|
|
||||||
// ruma/conduwuit
|
// ruma/conduwuit
|
||||||
#[error("Arithmetic operation failed: {0}")]
|
#[error("Arithmetic operation failed: {0}")]
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use conduit::{err, utils::stream::TryIgnore, Result};
|
use conduit::{err, utils::stream::TryIgnore, Result};
|
||||||
use database::{Database, Deserialized, Map};
|
use database::{Database, Map};
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
use ruma::api::appservice::Registration;
|
use ruma::api::appservice::Registration;
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ impl Data {
|
||||||
self.id_appserviceregistrations
|
self.id_appserviceregistrations
|
||||||
.get(id)
|
.get(id)
|
||||||
.await
|
.await
|
||||||
.deserialized()
|
.and_then(|ref bytes| serde_yaml::from_slice(bytes).map_err(Into::into))
|
||||||
.map_err(|e| err!(Database("Invalid appservice {id:?} registration: {e:?}")))
|
.map_err(|e| err!(Database("Invalid appservice {id:?} registration: {e:?}")))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,7 @@ use itertools::Itertools;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
events::{push_rules::PushRulesEvent, room::member::MembershipState, GlobalAccountDataEventType},
|
events::{push_rules::PushRulesEvent, room::member::MembershipState, GlobalAccountDataEventType},
|
||||||
push::Ruleset,
|
push::Ruleset,
|
||||||
UserId,
|
OwnedUserId, UserId,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{media, Services};
|
use crate::{media, Services};
|
||||||
|
@ -385,11 +385,12 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services)
|
||||||
for room_id in &room_ids {
|
for room_id in &room_ids {
|
||||||
debug_info!("Fixing room {room_id}");
|
debug_info!("Fixing room {room_id}");
|
||||||
|
|
||||||
let users_in_room = services
|
let users_in_room: Vec<OwnedUserId> = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.room_members(room_id)
|
.room_members(room_id)
|
||||||
.collect::<Vec<_>>()
|
.map(ToOwned::to_owned)
|
||||||
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let joined_members = users_in_room
|
let joined_members = users_in_room
|
||||||
|
@ -418,12 +419,12 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services)
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
for user_id in joined_members {
|
for user_id in &joined_members {
|
||||||
debug_info!("User is joined, marking as joined");
|
debug_info!("User is joined, marking as joined");
|
||||||
services.rooms.state_cache.mark_as_joined(user_id, room_id);
|
services.rooms.state_cache.mark_as_joined(user_id, room_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
for user_id in non_joined_members {
|
for user_id in &non_joined_members {
|
||||||
debug_info!("User is left or banned, marking as left");
|
debug_info!("User is left or banned, marking as left");
|
||||||
services.rooms.state_cache.mark_as_left(user_id, room_id);
|
services.rooms.state_cache.mark_as_left(user_id, room_id);
|
||||||
}
|
}
|
||||||
|
|
|
@ -648,35 +648,19 @@ impl Service {
|
||||||
self.db.userroomid_leftstate.remove(&userroom_id);
|
self.db.userroomid_leftstate.remove(&userroom_id);
|
||||||
self.db.roomuserid_leftcount.remove(&roomuser_id);
|
self.db.roomuserid_leftcount.remove(&roomuser_id);
|
||||||
|
|
||||||
if let Some(servers) = invite_via {
|
if let Some(servers) = invite_via.as_deref() {
|
||||||
let mut prev_servers = self
|
self.add_servers_invite_via(room_id, servers).await;
|
||||||
.servers_invite_via(room_id)
|
|
||||||
.map(ToOwned::to_owned)
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.await;
|
|
||||||
#[allow(clippy::redundant_clone)] // this is a necessary clone?
|
|
||||||
prev_servers.append(servers.clone().as_mut());
|
|
||||||
let servers = prev_servers.iter().rev().unique().rev().collect_vec();
|
|
||||||
|
|
||||||
let servers = servers
|
|
||||||
.iter()
|
|
||||||
.map(|server| server.as_bytes())
|
|
||||||
.collect_vec()
|
|
||||||
.join(&[0xFF][..]);
|
|
||||||
|
|
||||||
self.db
|
|
||||||
.roomid_inviteviaservers
|
|
||||||
.insert(room_id.as_bytes(), &servers);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(skip(self, servers), level = "debug")]
|
||||||
pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: &[OwnedServerName]) {
|
pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: &[OwnedServerName]) {
|
||||||
let mut prev_servers = self
|
let mut prev_servers: Vec<_> = self
|
||||||
.servers_invite_via(room_id)
|
.servers_invite_via(room_id)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.collect::<Vec<_>>()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
prev_servers.extend(servers.to_owned());
|
prev_servers.extend(servers.to_owned());
|
||||||
prev_servers.sort_unstable();
|
prev_servers.sort_unstable();
|
||||||
prev_servers.dedup();
|
prev_servers.dedup();
|
||||||
|
|
|
@ -408,7 +408,6 @@ impl Service {
|
||||||
.get(None, user, GlobalAccountDataEventType::PushRules.to_string().into())
|
.get(None, user, GlobalAccountDataEventType::PushRules.to_string().into())
|
||||||
.await
|
.await
|
||||||
.and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).map_err(Into::into))
|
.and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).map_err(Into::into))
|
||||||
.map_err(|e| err!(Database(warn!(?user, ?e, "Invalid push rules event in db for user"))))
|
|
||||||
.map_or_else(|_| Ruleset::server_default(user), |ev: PushRulesEvent| ev.content.global);
|
.map_or_else(|_| Ruleset::server_default(user), |ev: PushRulesEvent| ev.content.global);
|
||||||
|
|
||||||
let mut highlight = false;
|
let mut highlight = false;
|
||||||
|
|
|
@ -623,7 +623,9 @@ impl Service {
|
||||||
|
|
||||||
pub async fn mark_device_key_update(&self, user_id: &UserId) {
|
pub async fn mark_device_key_update(&self, user_id: &UserId) {
|
||||||
let count = self.services.globals.next_count().unwrap().to_be_bytes();
|
let count = self.services.globals.next_count().unwrap().to_be_bytes();
|
||||||
|
|
||||||
let rooms_joined = self.services.state_cache.rooms_joined(user_id);
|
let rooms_joined = self.services.state_cache.rooms_joined(user_id);
|
||||||
|
|
||||||
pin_mut!(rooms_joined);
|
pin_mut!(rooms_joined);
|
||||||
while let Some(room_id) = rooms_joined.next().await {
|
while let Some(room_id) = rooms_joined.next().await {
|
||||||
// Don't send key updates to unencrypted rooms
|
// Don't send key updates to unencrypted rooms
|
||||||
|
|
Loading…
Reference in New Issue