split keys_changed for stronger-type overloads

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-10-22 04:03:07 +00:00 committed by strawberry
parent 0e0438e1f9
commit a74461fc9a
5 changed files with 27 additions and 10 deletions

View File

@ -232,7 +232,7 @@ pub(crate) async fn get_key_changes_route(
device_list_updates.extend( device_list_updates.extend(
services services
.users .users
.keys_changed(sender_user.as_str(), from, Some(to)) .keys_changed(sender_user, from, Some(to))
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await, .await,
@ -244,7 +244,8 @@ pub(crate) async fn get_key_changes_route(
device_list_updates.extend( device_list_updates.extend(
services services
.users .users
.keys_changed(room_id.as_str(), from, Some(to)) .room_keys_changed(room_id, from, Some(to))
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await, .await,

View File

@ -138,7 +138,7 @@ pub(crate) async fn sync_events_route(
device_list_updates.extend( device_list_updates.extend(
services services
.users .users
.keys_changed(sender_user.as_ref(), since, None) .keys_changed(&sender_user, since, None)
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await, .await,
@ -917,7 +917,8 @@ async fn load_joined_room(
device_list_updates.extend( device_list_updates.extend(
services services
.users .users
.keys_changed(room_id.as_ref(), since, None) .room_keys_changed(room_id, since, None)
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await, .await,

View File

@ -162,7 +162,7 @@ pub(crate) async fn sync_events_v4_route(
device_list_changes.extend( device_list_changes.extend(
services services
.users .users
.keys_changed(sender_user.as_ref(), globalsince, None) .keys_changed(sender_user, globalsince, None)
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await, .await,
@ -285,7 +285,8 @@ pub(crate) async fn sync_events_v4_route(
device_list_changes.extend( device_list_changes.extend(
services services
.users .users
.keys_changed(room_id.as_ref(), globalsince, None) .room_keys_changed(room_id, globalsince, None)
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await, .await,

View File

@ -6,7 +6,6 @@ extern crate conduit_core as conduit;
extern crate conduit_service as service; extern crate conduit_service as service;
pub(crate) use conduit::{debug_info, pdu::PduEvent, utils, Error, Result}; pub(crate) use conduit::{debug_info, pdu::PduEvent, utils, Error, Result};
pub(crate) use service::services;
pub(crate) use self::router::{Ruma, RumaResponse, State}; pub(crate) use self::router::{Ruma, RumaResponse, State};

View File

@ -13,7 +13,7 @@ use ruma::{
events::{ignored_user_list::IgnoredUserListEvent, AnyToDeviceEvent, GlobalAccountDataEventType}, events::{ignored_user_list::IgnoredUserListEvent, AnyToDeviceEvent, GlobalAccountDataEventType},
serde::Raw, serde::Raw,
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedDeviceKeyId, DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedDeviceKeyId,
OwnedMxcUri, OwnedUserId, UInt, UserId, OwnedMxcUri, OwnedUserId, RoomId, UInt, UserId,
}; };
use serde_json::json; use serde_json::json;
@ -585,9 +585,24 @@ impl Service {
Ok(()) Ok(())
} }
#[inline]
pub fn keys_changed<'a>( pub fn keys_changed<'a>(
&'a self, user_or_room_id: &'a str, from: u64, to: Option<u64>, &'a self, user_id: &'a UserId, from: u64, to: Option<u64>,
) -> impl Stream<Item = &UserId> + Send + 'a { ) -> impl Stream<Item = &UserId> + Send + 'a {
self.keys_changed_user_or_room(user_id.as_str(), from, to)
.map(|(user_id, ..)| user_id)
}
#[inline]
pub fn room_keys_changed<'a>(
&'a self, room_id: &'a RoomId, from: u64, to: Option<u64>,
) -> impl Stream<Item = (&UserId, u64)> + Send + 'a {
self.keys_changed_user_or_room(room_id.as_str(), from, to)
}
fn keys_changed_user_or_room<'a>(
&'a self, user_or_room_id: &'a str, from: u64, to: Option<u64>,
) -> impl Stream<Item = (&UserId, u64)> + Send + 'a {
type KeyVal<'a> = ((&'a str, u64), &'a UserId); type KeyVal<'a> = ((&'a str, u64), &'a UserId);
let to = to.unwrap_or(u64::MAX); let to = to.unwrap_or(u64::MAX);
@ -597,7 +612,7 @@ impl Service {
.stream_from(&start) .stream_from(&start)
.ignore_err() .ignore_err()
.ready_take_while(move |((prefix, count), _): &KeyVal<'_>| *prefix == user_or_room_id && *count <= to) .ready_take_while(move |((prefix, count), _): &KeyVal<'_>| *prefix == user_or_room_id && *count <= to)
.map(|((..), user_id): KeyVal<'_>| user_id) .map(|((_, count), user_id): KeyVal<'_>| (user_id, count))
} }
pub async fn mark_device_key_update(&self, user_id: &UserId) { pub async fn mark_device_key_update(&self, user_id: &UserId) {