additional database stream deserializations for serde_json::from_ elim

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-10-03 10:02:24 +00:00
parent 187b9ca80c
commit e9e378a6ba
4 changed files with 57 additions and 82 deletions

View file

@ -1,9 +1,9 @@
use std::{collections::BTreeMap, sync::Arc};
use conduit::{
err, implement, utils,
err, implement,
utils::stream::{ReadyExt, TryIgnore},
Err, Error, Result,
Err, Result,
};
use database::{Deserialized, Ignore, Interfix, Map};
use futures::StreamExt;
@ -110,57 +110,35 @@ pub async fn update_backup(
#[implement(Service)]
pub async fn get_latest_backup_version(&self, user_id: &UserId) -> Result<String> {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xFF);
let mut last_possible_key = prefix.clone();
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes());
type Key<'a> = (&'a UserId, &'a str);
let last_possible_key = (user_id, u64::MAX);
self.db
.backupid_algorithm
.rev_raw_keys_from(&last_possible_key)
.rev_keys_from(&last_possible_key)
.ignore_err()
.ready_take_while(move |key| key.starts_with(&prefix))
.ready_take_while(|(user_id_, _): &Key<'_>| *user_id_ == user_id)
.map(|(_, version): Key<'_>| version.to_owned())
.next()
.await
.ok_or_else(|| err!(Request(NotFound("No backup versions found"))))
.and_then(|key| {
utils::string_from_bytes(
key.rsplit(|&b| b == 0xFF)
.next()
.expect("rsplit always returns an element"),
)
.map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))
})
}
#[implement(Service)]
pub async fn get_latest_backup(&self, user_id: &UserId) -> Result<(String, Raw<BackupAlgorithm>)> {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xFF);
let mut last_possible_key = prefix.clone();
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes());
type Key<'a> = (&'a UserId, &'a str);
type KeyVal<'a> = (Key<'a>, Raw<BackupAlgorithm>);
let last_possible_key = (user_id, u64::MAX);
self.db
.backupid_algorithm
.rev_raw_stream_from(&last_possible_key)
.rev_stream_from(&last_possible_key)
.ignore_err()
.ready_take_while(move |(key, _)| key.starts_with(&prefix))
.ready_take_while(|((user_id_, _), _): &KeyVal<'_>| *user_id_ == user_id)
.map(|((_, version), algorithm): KeyVal<'_>| (version.to_owned(), algorithm))
.next()
.await
.ok_or_else(|| err!(Request(NotFound("No backup found"))))
.and_then(|(key, val)| {
let version = utils::string_from_bytes(
key.rsplit(|&b| b == 0xFF)
.next()
.expect("rsplit always returns an element"),
)
.map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))?;
let algorithm = serde_json::from_slice(val)
.map_err(|_| Error::bad_database("Algorithm in backupid_algorithm is invalid."))?;
Ok((version, algorithm))
})
}
#[implement(Service)]
@ -223,7 +201,8 @@ pub async fn get_etag(&self, user_id: &UserId, version: &str) -> String {
#[implement(Service)]
pub async fn get_all(&self, user_id: &UserId, version: &str) -> BTreeMap<OwnedRoomId, RoomKeyBackup> {
type KeyVal<'a> = ((Ignore, Ignore, &'a RoomId, &'a str), &'a [u8]);
type Key<'a> = (Ignore, Ignore, &'a RoomId, &'a str);
type KeyVal<'a> = (Key<'a>, Raw<KeyBackupData>);
let mut rooms = BTreeMap::<OwnedRoomId, RoomKeyBackup>::new();
let default = || RoomKeyBackup {
@ -235,13 +214,12 @@ pub async fn get_all(&self, user_id: &UserId, version: &str) -> BTreeMap<OwnedRo
.backupkeyid_backup
.stream_prefix(&prefix)
.ignore_err()
.ready_for_each(|((_, _, room_id, session_id), value): KeyVal<'_>| {
let key_data = serde_json::from_slice(value).expect("Invalid KeyBackupData JSON");
.ready_for_each(|((_, _, room_id, session_id), key_backup_data): KeyVal<'_>| {
rooms
.entry(room_id.into())
.or_insert_with(default)
.sessions
.insert(session_id.into(), key_data);
.insert(session_id.into(), key_backup_data);
})
.await;
@ -252,18 +230,14 @@ pub async fn get_all(&self, user_id: &UserId, version: &str) -> BTreeMap<OwnedRo
pub async fn get_room(
&self, user_id: &UserId, version: &str, room_id: &RoomId,
) -> BTreeMap<String, Raw<KeyBackupData>> {
type KeyVal<'a> = ((Ignore, Ignore, Ignore, &'a str), &'a [u8]);
type KeyVal<'a> = ((Ignore, Ignore, Ignore, &'a str), Raw<KeyBackupData>);
let prefix = (user_id, version, room_id, Interfix);
self.db
.backupkeyid_backup
.stream_prefix(&prefix)
.ignore_err()
.map(|((.., session_id), value): KeyVal<'_>| {
let session_id = session_id.to_owned();
let key_backup_data = serde_json::from_slice(value).expect("Invalid KeyBackupData JSON");
(session_id, key_backup_data)
})
.map(|((.., session_id), key_backup_data): KeyVal<'_>| (session_id.to_owned(), key_backup_data))
.collect()
.await
}

View file

@ -99,7 +99,7 @@ impl Service {
.senderkey_pusher
.stream_prefix(&prefix)
.ignore_err()
.map(|(_, val): (Ignore, &[u8])| serde_json::from_slice(val).expect("Invalid Pusher in db."))
.map(|(_, pusher): (Ignore, Pusher)| pusher)
.collect()
.await
}

View file

@ -3,7 +3,7 @@ use std::{
sync::{Arc, RwLock},
};
use conduit::{utils, utils::stream::TryIgnore, Error, Result};
use conduit::{utils::stream::TryIgnore, Result};
use database::{Deserialized, Interfix, Map};
use futures::{Stream, StreamExt};
use ruma::{
@ -135,20 +135,31 @@ impl Data {
pub(super) fn rooms_invited<'a>(
&'a self, user_id: &'a UserId,
) -> impl Stream<Item = StrippedStateEventItem> + Send + 'a {
type Key<'a> = (&'a UserId, &'a RoomId);
type KeyVal<'a> = (Key<'a>, Raw<Vec<AnyStrippedStateEvent>>);
let prefix = (user_id, Interfix);
self.userroomid_invitestate
.stream_raw_prefix(&prefix)
.stream_prefix(&prefix)
.ignore_err()
.map(|(key, val)| {
let room_id = key.rsplit(|&b| b == 0xFF).next().unwrap();
let room_id = utils::string_from_bytes(room_id).unwrap();
let room_id = RoomId::parse(room_id).unwrap();
let state = serde_json::from_slice(val)
.map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))
.unwrap();
.map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
.map(|(room_id, state)| Ok((room_id, state.deserialize_as()?)))
.ignore_err()
}
(room_id, state)
})
/// Returns an iterator over all rooms a user left.
#[inline]
pub(super) fn rooms_left<'a>(&'a self, user_id: &'a UserId) -> impl Stream<Item = SyncStateEventItem> + Send + 'a {
type Key<'a> = (&'a UserId, &'a RoomId);
type KeyVal<'a> = (Key<'a>, Raw<Vec<Raw<AnySyncStateEvent>>>);
let prefix = (user_id, Interfix);
self.userroomid_leftstate
.stream_prefix(&prefix)
.ignore_err()
.map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
.map(|(room_id, state)| Ok((room_id, state.deserialize_as()?)))
.ignore_err()
}
#[tracing::instrument(skip(self), level = "debug")]
@ -156,7 +167,11 @@ impl Data {
&self, user_id: &UserId, room_id: &RoomId,
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
let key = (user_id, room_id);
self.userroomid_invitestate.qry(&key).await.deserialized()
self.userroomid_invitestate
.qry(&key)
.await
.deserialized()
.and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| val.deserialize_as().map_err(Into::into))
}
#[tracing::instrument(skip(self), level = "debug")]
@ -164,25 +179,10 @@ impl Data {
&self, user_id: &UserId, room_id: &RoomId,
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
let key = (user_id, room_id);
self.userroomid_leftstate.qry(&key).await.deserialized()
}
/// Returns an iterator over all rooms a user left.
#[inline]
pub(super) fn rooms_left<'a>(&'a self, user_id: &'a UserId) -> impl Stream<Item = SyncStateEventItem> + Send + 'a {
let prefix = (user_id, Interfix);
self.userroomid_leftstate
.stream_raw_prefix(&prefix)
.ignore_err()
.map(|(key, val)| {
let room_id = key.rsplit(|&b| b == 0xFF).next().unwrap();
let room_id = utils::string_from_bytes(room_id).unwrap();
let room_id = RoomId::parse(room_id).unwrap();
let state = serde_json::from_slice(val)
.map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))
.unwrap();
(room_id, state)
})
.qry(&key)
.await
.deserialized()
.and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| val.deserialize_as().map_err(Into::into))
}
}

View file

@ -2,7 +2,7 @@ use std::{collections::BTreeMap, mem, mem::size_of, sync::Arc};
use conduit::{
debug_warn, err, utils,
utils::{stream::TryIgnore, string::Unquoted, ReadyExt, TryReadyExt},
utils::{stream::TryIgnore, string::Unquoted, ReadyExt},
warn, Err, Error, Result, Server,
};
use database::{Deserialized, Ignore, Interfix, Map};
@ -749,9 +749,9 @@ impl Service {
let prefix = (user_id, device_id, Interfix);
self.db
.todeviceid_events
.stream_raw_prefix(&prefix)
.ready_and_then(|(_, val)| serde_json::from_slice(val).map_err(Into::into))
.stream_prefix(&prefix)
.ignore_err()
.map(|(_, val): (Ignore, Raw<AnyToDeviceEvent>)| val)
}
pub async fn remove_to_device_events(&self, user_id: &UserId, device_id: &DeviceId, until: u64) {
@ -812,11 +812,12 @@ impl Service {
}
pub fn all_devices_metadata<'a>(&'a self, user_id: &'a UserId) -> impl Stream<Item = Device> + Send + 'a {
let key = (user_id, Interfix);
self.db
.userdeviceid_metadata
.stream_raw_prefix(&(user_id, Interfix))
.ready_and_then(|(_, val)| serde_json::from_slice::<Device>(val).map_err(Into::into))
.stream_prefix(&key)
.ignore_err()
.map(|(_, val): (Ignore, Device)| val)
}
/// Creates a new sync filter. Returns the filter id.