fix: can't find count from event in db

This commit is contained in:
Timo Kösters 2020-09-16 21:11:38 +02:00
parent 4db6d7e430
commit 506c2a3146
No known key found for this signature in database
GPG Key ID: 24DA7517711A2BA4
2 changed files with 27 additions and 29 deletions

View File

@ -93,7 +93,7 @@ pub async fn sync_events_route(
let mut limited = false; let mut limited = false;
let mut state_pdus = Vec::new(); let mut state_pdus = Vec::new();
for pdu in non_timeline_pdus { for (_, pdu) in non_timeline_pdus {
if pdu.state_key.is_some() { if pdu.state_key.is_some() {
state_pdus.push(pdu); state_pdus.push(pdu);
} }
@ -113,7 +113,7 @@ pub async fn sync_events_route(
.rooms .rooms
.pdus_since(&sender_id, &room_id, since)? .pdus_since(&sender_id, &room_id, since)?
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.filter_map(|pdu| Some((pdu.state_key.clone()?, pdu))) .filter_map(|(_, pdu)| Some((pdu.state_key.clone()?, pdu)))
{ {
if pdu.kind == EventType::RoomMember { if pdu.kind == EventType::RoomMember {
send_member_count = true; send_member_count = true;
@ -188,8 +188,8 @@ pub async fn sync_events_route(
.rooms .rooms
.all_pdus(&sender_id, &room_id)? .all_pdus(&sender_id, &room_id)?
.filter_map(|pdu| pdu.ok()) // Ignore all broken pdus .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
.filter(|pdu| pdu.kind == EventType::RoomMember) .filter(|(_, pdu)| pdu.kind == EventType::RoomMember)
.map(|pdu| { .map(|(_, pdu)| {
let content = serde_json::from_value::< let content = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>, Raw<ruma::events::room::member::MemberEventContent>,
>(pdu.content.clone()) >(pdu.content.clone())
@ -244,7 +244,7 @@ pub async fn sync_events_route(
(db.rooms (db.rooms
.pdus_since(&sender_id, &room_id, last_read)? .pdus_since(&sender_id, &room_id, last_read)?
.filter_map(|pdu| pdu.ok()) // Filter out buggy events .filter_map(|pdu| pdu.ok()) // Filter out buggy events
.filter(|pdu| { .filter(|(_, pdu)| {
matches!( matches!(
pdu.kind.clone(), pdu.kind.clone(),
EventType::RoomMessage | EventType::RoomEncrypted EventType::RoomMessage | EventType::RoomEncrypted
@ -260,18 +260,15 @@ pub async fn sync_events_route(
None None
}; };
let prev_batch = timeline_pdus.first().map_or(Ok::<_, Error>(None), |e| { let prev_batch = timeline_pdus
Ok(Some( .first()
db.rooms .map_or(Ok::<_, Error>(None), |(pdu_id, _)| {
.get_pdu_count(&e.event_id)? Ok(Some(db.rooms.pdu_count(pdu_id)?.to_string()))
.ok_or_else(|| Error::bad_database("Can't find count from event in db."))? })?;
.to_string(),
))
})?;
let room_events = timeline_pdus let room_events = timeline_pdus
.into_iter() .into_iter()
.map(|pdu| pdu.to_sync_room_event()) .map(|(_, pdu)| pdu.to_sync_room_event())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut edus = db let mut edus = db
@ -380,7 +377,7 @@ pub async fn sync_events_route(
let pdus = db.rooms.pdus_since(&sender_id, &room_id, since)?; let pdus = db.rooms.pdus_since(&sender_id, &room_id, since)?;
let room_events = pdus let room_events = pdus
.filter_map(|pdu| pdu.ok()) // Filter out buggy events .filter_map(|pdu| pdu.ok()) // Filter out buggy events
.map(|pdu| pdu.to_sync_room_event()) .map(|(_, pdu)| pdu.to_sync_room_event())
.collect(); .collect();
let left_room = sync_events::LeftRoom { let left_room = sync_events::LeftRoom {
@ -395,7 +392,7 @@ pub async fn sync_events_route(
let mut left_since_last_sync = false; let mut left_since_last_sync = false;
for pdu in db.rooms.pdus_since(&sender_id, &room_id, since)? { for pdu in db.rooms.pdus_since(&sender_id, &room_id, since)? {
let pdu = pdu?; let (_, pdu) = pdu?;
if pdu.kind == EventType::RoomMember && pdu.state_key == Some(sender_id.to_string()) { if pdu.kind == EventType::RoomMember && pdu.state_key == Some(sender_id.to_string()) {
let content = serde_json::from_value::< let content = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>, Raw<ruma::events::room::member::MemberEventContent>,
@ -438,7 +435,7 @@ pub async fn sync_events_route(
let room_id = room_id?; let room_id = room_id?;
let mut invited_since_last_sync = false; let mut invited_since_last_sync = false;
for pdu in db.rooms.pdus_since(&sender_id, &room_id, since)? { for pdu in db.rooms.pdus_since(&sender_id, &room_id, since)? {
let pdu = pdu?; let (_, pdu) = pdu?;
if pdu.kind == EventType::RoomMember && pdu.state_key == Some(sender_id.to_string()) { if pdu.kind == EventType::RoomMember && pdu.state_key == Some(sender_id.to_string()) {
let content = serde_json::from_value::< let content = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>, Raw<ruma::events::room::member::MemberEventContent>,

View File

@ -355,18 +355,19 @@ impl Rooms {
} }
} }
/// Returns the `count` of this pdu's id.
pub fn pdu_count(&self, pdu_id: &[u8]) -> Result<u64> {
Ok(
utils::u64_from_bytes(&pdu_id[pdu_id.len() - mem::size_of::<u64>()..pdu_id.len()])
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))?,
)
}
/// Returns the `count` of this pdu's id. /// Returns the `count` of this pdu's id.
pub fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<u64>> { pub fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<u64>> {
self.eventid_pduid self.eventid_pduid
.get(event_id.as_bytes())? .get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| { .map_or(Ok(None), |pdu_id| self.pdu_count(&pdu_id).map(Some))
Ok(Some(
utils::u64_from_bytes(
&pdu_id[pdu_id.len() - mem::size_of::<u64>()..pdu_id.len()],
)
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))?,
))
})
} }
/// Returns the json of a pdu. /// Returns the json of a pdu.
@ -860,7 +861,7 @@ impl Rooms {
&self, &self,
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
) -> Result<impl Iterator<Item = Result<PduEvent>>> { ) -> Result<impl Iterator<Item = Result<(IVec, PduEvent)>>> {
self.pdus_since(user_id, room_id, 0) self.pdus_since(user_id, room_id, 0)
} }
@ -871,7 +872,7 @@ impl Rooms {
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
since: u64, since: u64,
) -> Result<impl DoubleEndedIterator<Item = Result<PduEvent>>> { ) -> Result<impl DoubleEndedIterator<Item = Result<(IVec, PduEvent)>>> {
let mut prefix = room_id.to_string().as_bytes().to_vec(); let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff); prefix.push(0xff);
@ -887,13 +888,13 @@ impl Rooms {
.pduid_pdu .pduid_pdu
.range(first_pdu_id..last_pdu_id) .range(first_pdu_id..last_pdu_id)
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.map(move |(_, v)| { .map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v) let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id { if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id"); pdu.unsigned.remove("transaction_id");
} }
Ok(pdu) Ok((pdu_id, pdu))
})) }))
} }