slightly better sliding sync

This commit is contained in:
Timo Kösters 2023-07-23 21:57:11 +02:00
parent ad06d475de
commit caddc656fb
No known key found for this signature in database
GPG key ID: 0B25E636FBA7E4CB
4 changed files with 260 additions and 19 deletions

View file

@ -23,7 +23,7 @@ use ruma::{
uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId, uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
}; };
use std::{ use std::{
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
@ -1174,8 +1174,7 @@ pub async fn sync_events_v4_route(
) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> { ) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
let sender_user = body.sender_user.expect("user is authenticated"); let sender_user = body.sender_user.expect("user is authenticated");
let sender_device = body.sender_device.expect("user is authenticated"); let sender_device = body.sender_device.expect("user is authenticated");
let body = dbg!(body.body); let mut body = dbg!(body.body);
// Setup watchers, so if there's no response, we can wait for them // Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device); let watcher = services().globals.watch(&sender_user, &sender_device);
@ -1188,7 +1187,21 @@ pub async fn sync_events_v4_route(
.unwrap_or(0); .unwrap_or(0);
let sincecount = PduCount::Normal(since); let sincecount = PduCount::Normal(since);
let initial = since == 0; if since == 0 {
if let Some(conn_id) = &body.conn_id {
services().users.forget_sync_request_connection(
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
)
}
}
let known_rooms = services().users.update_sync_request_with_cache(
sender_user.clone(),
sender_device.clone(),
&mut body,
);
let all_joined_rooms = services() let all_joined_rooms = services()
.rooms .rooms
@ -1205,8 +1218,10 @@ pub async fn sync_events_v4_route(
continue; continue;
} }
let mut new_known_rooms = BTreeMap::new();
lists.insert( lists.insert(
list_id, list_id.clone(),
sync_events::v4::SyncList { sync_events::v4::SyncList {
ops: list ops: list
.ranges .ranges
@ -1219,14 +1234,27 @@ pub async fn sync_events_v4_route(
let room_ids = all_joined_rooms let room_ids = all_joined_rooms
[(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)] [(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)]
.to_vec(); .to_vec();
todo_rooms.extend(room_ids.iter().cloned().map(|r| { new_known_rooms.extend(room_ids.iter().cloned().map(|r| (r, true)));
for room_id in &room_ids {
let todo_room = todo_rooms.entry(room_id.clone()).or_insert((
BTreeSet::new(),
0,
true,
));
let limit = list let limit = list
.room_details .room_details
.timeline_limit .timeline_limit
.map_or(10, u64::from) .map_or(10, u64::from)
.min(100); .min(100);
(r, (list.room_details.required_state.clone(), limit)) todo_room
})); .0
.extend(list.room_details.required_state.iter().cloned());
todo_room.1 = todo_room.1.min(limit);
if known_rooms.get(&list_id).and_then(|k| k.get(room_id)) != Some(&true)
{
todo_room.2 = false;
}
}
sync_events::v4::SyncOp { sync_events::v4::SyncOp {
op: SlidingOp::Sync, op: SlidingOp::Sync,
range: Some(r.clone()), range: Some(r.clone()),
@ -1239,12 +1267,36 @@ pub async fn sync_events_v4_route(
count: UInt::from(all_joined_rooms.len() as u32), count: UInt::from(all_joined_rooms.len() as u32),
}, },
); );
if let Some(conn_id) = &body.conn_id {
services().users.update_sync_known_rooms(
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
list_id,
new_known_rooms,
);
}
}
for (room_id, room) in body.room_subscriptions {
let todo_room = todo_rooms
.entry(room_id.clone())
.or_insert((BTreeSet::new(), 0, true));
let limit = room.timeline_limit.map_or(10, u64::from).min(100);
todo_room.0.extend(room.required_state.iter().cloned());
todo_room.1 = todo_room.1.min(limit);
todo_room.2 = false;
} }
let mut rooms = BTreeMap::new(); let mut rooms = BTreeMap::new();
for (room_id, (required_state_request, timeline_limit)) in todo_rooms { for (room_id, (required_state_request, timeline_limit, known)) in &todo_rooms {
let (timeline_pdus, limited) = let (timeline_pdus, limited) =
load_timeline(&sender_user, &room_id, sincecount, timeline_limit)?; load_timeline(&sender_user, &room_id, sincecount, *timeline_limit)?;
if *known && timeline_pdus.is_empty() {
continue;
}
let prev_batch = timeline_pdus let prev_batch = timeline_pdus
.first() .first()
@ -1256,7 +1308,14 @@ pub async fn sync_events_v4_route(
} }
PduCount::Normal(c) => c.to_string(), PduCount::Normal(c) => c.to_string(),
})) }))
})?; })?
.or_else(|| {
if since != 0 {
Some(since.to_string())
} else {
None
}
});
let room_events: Vec<_> = timeline_pdus let room_events: Vec<_> = timeline_pdus
.iter() .iter()
@ -1279,8 +1338,41 @@ pub async fn sync_events_v4_route(
rooms.insert( rooms.insert(
room_id.clone(), room_id.clone(),
sync_events::v4::SlidingSyncRoom { sync_events::v4::SlidingSyncRoom {
name: services().rooms.state_accessor.get_name(&room_id)?, name: services()
initial: Some(initial), .rooms
.state_accessor
.get_name(&room_id)?
.or_else(|| {
// Heroes
let mut names = services()
.rooms
.state_cache
.room_members(&room_id)
.filter_map(|r| r.ok())
.filter(|member| member != &sender_user)
.map(|member| {
Ok::<_, Error>(
services()
.rooms
.state_accessor
.get_member(&room_id, &member)?
.and_then(|memberevent| memberevent.displayname)
.unwrap_or(member.to_string()),
)
})
.filter_map(|r| r.ok())
.take(5)
.collect::<Vec<_>>();
if names.len() > 1 {
let last = names.pop().unwrap();
Some(names.join(", ") + " and " + &last)
} else if names.len() == 1 {
Some(names.pop().unwrap())
} else {
None
}
}),
initial: Some(*known),
is_dm: None, is_dm: None,
invite_state: None, invite_state: None,
unread_notifications: UnreadNotificationsCount { unread_notifications: UnreadNotificationsCount {
@ -1326,7 +1418,7 @@ pub async fn sync_events_v4_route(
} }
Ok(dbg!(sync_events::v4::Response { Ok(dbg!(sync_events::v4::Response {
initial: initial, initial: since == 0,
txn_id: body.txn_id.clone(), txn_id: body.txn_id.clone(),
pos: next_batch.to_string(), pos: next_batch.to_string(),
lists, lists,

View file

@ -1,5 +1,5 @@
use std::{ use std::{
collections::HashMap, collections::{BTreeMap, HashMap},
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
@ -105,7 +105,10 @@ impl Services {
}, },
transaction_ids: transaction_ids::Service { db }, transaction_ids: transaction_ids::Service { db },
uiaa: uiaa::Service { db }, uiaa: uiaa::Service { db },
users: users::Service { db }, users: users::Service {
db,
connections: Mutex::new(BTreeMap::new()),
},
account_data: account_data::Service { db }, account_data: account_data::Service { db },
admin: admin::Service::build(), admin: admin::Service::build(),
key_backups: key_backups::Service { db }, key_backups: key_backups::Service { db },

View file

@ -282,4 +282,19 @@ impl Service {
.map_err(|_| Error::bad_database("Invalid room name event in database.")) .map_err(|_| Error::bad_database("Invalid room name event in database."))
}) })
} }
pub fn get_member(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<RoomMemberEventContent>> {
services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomMember, user_id.as_str())?
.map_or(Ok(None), |s| {
serde_json::from_str(s.content.get())
.map_err(|_| Error::bad_database("Invalid room member event in database."))
})
}
} }

View file

@ -1,20 +1,36 @@
mod data; mod data;
use std::{collections::BTreeMap, mem}; use std::{
collections::BTreeMap,
mem,
sync::{Arc, Mutex},
};
pub use data::Data; pub use data::Data;
use ruma::{ use ruma::{
api::client::{device::Device, error::ErrorKind, filter::FilterDefinition}, api::client::{
device::Device,
error::ErrorKind,
filter::FilterDefinition,
sync::sync_events::{self, v4::SyncRequestList},
},
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
events::AnyToDeviceEvent, events::AnyToDeviceEvent,
serde::Raw, serde::Raw,
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri, DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri,
OwnedUserId, RoomAliasId, UInt, UserId, OwnedRoomId, OwnedUserId, RoomAliasId, UInt, UserId,
}; };
use crate::{services, Error, Result}; use crate::{services, Error, Result};
pub struct SlidingSyncCache {
lists: BTreeMap<String, SyncRequestList>,
known_rooms: BTreeMap<String, BTreeMap<OwnedRoomId, bool>>,
}
pub struct Service { pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,
pub connections:
Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<Mutex<SlidingSyncCache>>>>,
} }
impl Service { impl Service {
@ -23,6 +39,121 @@ impl Service {
self.db.exists(user_id) self.db.exists(user_id)
} }
pub fn forget_sync_request_connection(
&self,
user_id: OwnedUserId,
device_id: OwnedDeviceId,
conn_id: String,
) {
self.connections
.lock()
.unwrap()
.remove(&(user_id, device_id, conn_id));
}
pub fn update_sync_request_with_cache(
&self,
user_id: OwnedUserId,
device_id: OwnedDeviceId,
request: &mut sync_events::v4::Request,
) -> BTreeMap<String, BTreeMap<OwnedRoomId, bool>> {
let Some(conn_id) = request.conn_id.clone() else { return BTreeMap::new(); };
let cache = &mut self.connections.lock().unwrap();
let cached = Arc::clone(
cache
.entry((user_id, device_id, conn_id))
.or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
known_rooms: BTreeMap::new(),
}))
}),
);
let cached = &mut cached.lock().unwrap();
drop(cache);
for (list_id, list) in &mut request.lists {
if let Some(cached_list) = cached.lists.remove(list_id) {
if list.sort.is_empty() {
list.sort = cached_list.sort;
};
if list.room_details.required_state.is_empty() {
list.room_details.required_state = cached_list.room_details.required_state;
};
list.room_details.timeline_limit = list
.room_details
.timeline_limit
.or(cached_list.room_details.timeline_limit);
list.include_old_rooms = list
.include_old_rooms
.clone()
.or(cached_list.include_old_rooms);
match (&mut list.filters, cached_list.filters) {
(Some(list_filters), Some(cached_filters)) => {
list_filters.is_dm = list_filters.is_dm.or(cached_filters.is_dm);
if list_filters.spaces.is_empty() {
list_filters.spaces = cached_filters.spaces;
}
list_filters.is_encrypted =
list_filters.is_encrypted.or(cached_filters.is_encrypted);
list_filters.is_invite =
list_filters.is_invite.or(cached_filters.is_invite);
if list_filters.room_types.is_empty() {
list_filters.room_types = cached_filters.room_types;
}
if list_filters.not_room_types.is_empty() {
list_filters.not_room_types = cached_filters.not_room_types;
}
list_filters.room_name_like = list_filters
.room_name_like
.clone()
.or(cached_filters.room_name_like);
if list_filters.tags.is_empty() {
list_filters.tags = cached_filters.tags;
}
if list_filters.not_tags.is_empty() {
list_filters.not_tags = cached_filters.not_tags;
}
}
(_, Some(cached_filters)) => list.filters = Some(cached_filters),
(_, _) => {}
}
if list.bump_event_types.is_empty() {
list.bump_event_types = cached_list.bump_event_types;
};
}
cached.lists.insert(list_id.clone(), list.clone());
}
cached.known_rooms.clone()
}
pub fn update_sync_known_rooms(
&self,
user_id: OwnedUserId,
device_id: OwnedDeviceId,
conn_id: String,
list_id: String,
new_cached_rooms: BTreeMap<OwnedRoomId, bool>,
) {
let cache = &mut self.connections.lock().unwrap();
let cached = Arc::clone(
cache
.entry((user_id, device_id, conn_id))
.or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
known_rooms: BTreeMap::new(),
}))
}),
);
let cached = &mut cached.lock().unwrap();
drop(cache);
cached.known_rooms.insert(list_id, new_cached_rooms);
}
/// Check if account is deactivated /// Check if account is deactivated
pub fn is_deactivated(&self, user_id: &UserId) -> Result<bool> { pub fn is_deactivated(&self, user_id: &UserId) -> Result<bool> {
self.db.is_deactivated(user_id) self.db.is_deactivated(user_id)