diff --git a/src/api/client/sync.rs b/src/api/client/sync.rs index 5d881b45..84cf11b6 100644 --- a/src/api/client/sync.rs +++ b/src/api/client/sync.rs @@ -1,7 +1,6 @@ use std::{ cmp::Ordering, collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet}, - sync::Arc, time::Duration, }; @@ -197,20 +196,9 @@ pub(crate) async fn sync_events_route( for result in all_invited_rooms { let (room_id, invite_state_events) = result?; - { - // Get and drop the lock to wait for remaining operations to finish - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; - drop(insert_lock); - }; + // Get and drop the lock to wait for remaining operations to finish + let insert_lock = services().globals.roomid_mutex_insert.lock(&room_id).await; + drop(insert_lock); let invite_count = services() .rooms @@ -332,20 +320,9 @@ async fn handle_left_room( since: u64, room_id: &RoomId, sender_user: &UserId, left_rooms: &mut BTreeMap, next_batch_string: &str, full_state: bool, lazy_load_enabled: bool, ) -> Result<()> { - { - // Get and drop the lock to wait for remaining operations to finish - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; - drop(insert_lock); - }; + // Get and drop the lock to wait for remaining operations to finish + let insert_lock = services().globals.roomid_mutex_insert.lock(room_id).await; + drop(insert_lock); let left_count = services() .rooms @@ -544,21 +521,10 @@ async fn load_joined_room( next_batch: u64, next_batchcount: PduCount, lazy_load_enabled: bool, lazy_load_send_redundant: bool, full_state: bool, device_list_updates: &mut HashSet, left_encrypted_users: &mut HashSet, ) -> Result { - { - // Get and drop the lock to wait for remaining operations to finish - // This will make sure the we have all events until next_batch - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; - drop(insert_lock); - }; + // Get and drop the lock to wait for remaining operations to finish + // This will make sure the we have all events until next_batch + let insert_lock = services().globals.roomid_mutex_insert.lock(room_id).await; + drop(insert_lock); let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10)?; diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 317a0d56..494d1821 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -14,6 +14,7 @@ use std::{ }; use base64::{engine::general_purpose, Engine as _}; +use conduit::utils; use data::Data; use hickory_resolver::TokioAsyncResolver; use ipaddress::IPAddress; @@ -33,6 +34,7 @@ use tokio::{ }; use tracing::{error, trace}; use url::Url; +use utils::MutexMap; use crate::{services, Config, Result}; @@ -52,7 +54,7 @@ pub struct Service { pub bad_event_ratelimiter: Arc>>, pub bad_signature_ratelimiter: Arc, RateLimitState>>>, pub bad_query_ratelimiter: Arc>>, - pub roomid_mutex_insert: RwLock>>>, + pub roomid_mutex_insert: MutexMap, pub roomid_mutex_state: RwLock>>>, pub roomid_mutex_federation: RwLock>>>, // this lock will be held longer pub roomid_federationhandletime: RwLock>, @@ -115,7 +117,7 @@ impl Service { bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())), bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())), roomid_mutex_state: RwLock::new(HashMap::new()), - roomid_mutex_insert: RwLock::new(HashMap::new()), + roomid_mutex_insert: MutexMap::::new(), roomid_mutex_federation: RwLock::new(HashMap::new()), roomid_federationhandletime: RwLock::new(HashMap::new()), updates_handle: Mutex::new(None), diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 7deba4de..4fbfb5df 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -271,16 +271,11 @@ impl Service { .state .set_forward_extremities(&pdu.room_id, leaves, state_lock)?; - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(pdu.room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; + let insert_lock = services() + .globals + .roomid_mutex_insert + .lock(&pdu.room_id) + .await; let count1 = services().globals.next_count()?; // Mark as read first so the sending client doesn't get a notification even if @@ -1165,16 +1160,7 @@ impl Service { .get_shortroomid(&room_id)? .expect("room exists"); - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; + let insert_lock = services().globals.roomid_mutex_insert.lock(&room_id).await; let count = services().globals.next_count()?; let mut pdu_id = shortroomid.to_be_bytes().to_vec();