typename additional shortids

cleanup/split state_compressor load

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-11-15 22:23:42 +00:00
parent 90106c4c33
commit 8fedc358e0
2 changed files with 58 additions and 44 deletions

View File

@ -23,8 +23,14 @@ use ruma::{
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
};
use super::state_compressor::CompressedStateEvent;
use crate::{globals, rooms, Dep};
use crate::{
globals, rooms,
rooms::{
short::{ShortEventId, ShortStateHash},
state_compressor::CompressedStateEvent,
},
Dep,
};
pub struct Service {
pub mutex: RoomMutexMap,
@ -146,8 +152,9 @@ impl Service {
#[tracing::instrument(skip(self, state_ids_compressed), level = "debug")]
pub async fn set_event_state(
&self, event_id: &EventId, room_id: &RoomId, state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
) -> Result<u64> {
const BUFSIZE: usize = size_of::<u64>();
) -> Result<ShortStateHash> {
const KEY_LEN: usize = size_of::<ShortEventId>();
const VAL_LEN: usize = size_of::<ShortStateHash>();
let shorteventid = self
.services
@ -202,7 +209,7 @@ impl Service {
self.db
.shorteventid_shortstatehash
.aput::<BUFSIZE, BUFSIZE, _, _>(shorteventid, shortstatehash);
.aput::<KEY_LEN, VAL_LEN, _, _>(shorteventid, shortstatehash);
Ok(shortstatehash)
}
@ -343,7 +350,7 @@ impl Service {
.map_err(|e| err!(Request(NotFound("No create event found: {e:?}"))))
}
pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<u64> {
pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<ShortStateHash> {
self.db
.roomid_shortstatehash
.get(room_id)

View File

@ -89,9 +89,10 @@ impl crate::Service for Service {
.map(at!(1))
.flat_map(|vec| vec.iter())
.fold(HashMap::new(), |mut ents, ssi| {
ents.insert(Arc::as_ptr(&ssi.added), compressed_state_size(&ssi.added));
ents.insert(Arc::as_ptr(&ssi.removed), compressed_state_size(&ssi.removed));
ents.insert(Arc::as_ptr(&ssi.full_state), compressed_state_size(&ssi.full_state));
for cs in &[&ssi.added, &ssi.removed, &ssi.full_state] {
ents.insert(Arc::as_ptr(cs), compressed_state_size(cs));
}
ents
});
@ -125,51 +126,57 @@ impl Service {
return Ok(r.clone());
}
let StateDiff {
parent,
added,
removed,
} = self.get_statediff(shortstatehash).await?;
let response = if let Some(parent) = parent {
let mut response = Box::pin(self.load_shortstatehash_info(parent)).await?;
let mut state = (*response.last().expect("at least one response").full_state).clone();
state.extend(added.iter().copied());
let removed = (*removed).clone();
for r in &removed {
state.remove(r);
}
response.push(ShortStateInfo {
shortstatehash,
full_state: Arc::new(state),
added,
removed: Arc::new(removed),
});
response
} else {
vec![ShortStateInfo {
shortstatehash,
full_state: added.clone(),
added,
removed,
}]
};
let stack = self.new_shortstatehash_info(shortstatehash).await?;
debug!(
?parent,
?shortstatehash,
vec_len = %response.len(),
len = %stack.len(),
"cache update"
);
self.stateinfo_cache
.lock()
.expect("locked")
.insert(shortstatehash, response.clone());
.insert(shortstatehash, stack.clone());
Ok(response)
Ok(stack)
}
async fn new_shortstatehash_info(&self, shortstatehash: ShortStateHash) -> Result<ShortStateInfoVec> {
let StateDiff {
parent,
added,
removed,
} = self.get_statediff(shortstatehash).await?;
let Some(parent) = parent else {
return Ok(vec![ShortStateInfo {
shortstatehash,
full_state: added.clone(),
added,
removed,
}]);
};
let mut stack = Box::pin(self.load_shortstatehash_info(parent)).await?;
let top = stack.last().expect("at least one frame");
let mut full_state = (*top.full_state).clone();
full_state.extend(added.iter().copied());
let removed = (*removed).clone();
for r in &removed {
full_state.remove(r);
}
stack.push(ShortStateInfo {
shortstatehash,
added,
removed: Arc::new(removed),
full_state: Arc::new(full_state),
});
Ok(stack)
}
pub async fn compress_state_event(&self, shortstatekey: ShortStateKey, event_id: &EventId) -> CompressedStateEvent {