From 539aa278150ddeef2822d4d52ef74d1873c37db6 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 14 Jun 2024 21:39:37 +0000 Subject: [PATCH] reduce roomid_mutex_federation Signed-off-by: Jason Volk --- src/api/server/send.rs | 17 ++++++----------- src/api/server/send_join.rs | 17 ++++++----------- src/api/server/send_leave.rs | 17 ++++++----------- src/service/globals/mod.rs | 4 ++-- src/service/rooms/timeline/mod.rs | 15 +++++---------- 5 files changed, 25 insertions(+), 45 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index e2413a4a..6faa7c25 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, sync::Arc, time::Instant}; +use std::{collections::BTreeMap, time::Instant}; use axum_client_ip::InsecureClientIp; use conduit::debug_warn; @@ -107,16 +107,11 @@ pub(crate) async fn send_transaction_message_route( let mut resolved_map = BTreeMap::new(); for (event_id, value, room_id) in parsed_pdus { let pdu_start_time = Instant::now(); - let mutex = Arc::clone( - services() - .globals - .roomid_mutex_federation - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; + let mutex_lock = services() + .globals + .roomid_mutex_federation + .lock(&room_id) + .await; resolved_map.insert( event_id.clone(), services() diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 18d075b4..388280e9 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -1,6 +1,6 @@ #![allow(deprecated)] -use std::{collections::BTreeMap, sync::Arc}; +use std::collections::BTreeMap; use ruma::{ api::{client::error::ErrorKind, federation::membership::create_join_event}, @@ -148,16 +148,11 @@ async fn create_join_event( .fetch_required_signing_keys([&value], &pub_key_map) .await?; - let mutex = Arc::clone( - services() - .globals - .roomid_mutex_federation - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; + let mutex_lock = services() + .globals + .roomid_mutex_federation + .lock(room_id) + .await; let pdu_id: Vec = services() .rooms .event_handler diff --git a/src/api/server/send_leave.rs b/src/api/server/send_leave.rs index ec843588..2680378f 100644 --- a/src/api/server/send_leave.rs +++ b/src/api/server/send_leave.rs @@ -1,6 +1,6 @@ #![allow(deprecated)] -use std::{collections::BTreeMap, sync::Arc}; +use std::collections::BTreeMap; use ruma::{ api::{client::error::ErrorKind, federation::membership::create_leave_event}, @@ -154,16 +154,11 @@ async fn create_leave_event(origin: &ServerName, room_id: &RoomId, pdu: &RawJson .fetch_required_signing_keys([&value], &pub_key_map) .await?; - let mutex = Arc::clone( - services() - .globals - .roomid_mutex_federation - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; + let mutex_lock = services() + .globals + .roomid_mutex_federation + .lock(room_id) + .await; let pdu_id: Vec = services() .rooms .event_handler diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 494d1821..716735df 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -56,7 +56,7 @@ pub struct Service { pub bad_query_ratelimiter: Arc>>, pub roomid_mutex_insert: MutexMap, pub roomid_mutex_state: RwLock>>>, - pub roomid_mutex_federation: RwLock>>>, // this lock will be held longer + pub roomid_mutex_federation: MutexMap, pub roomid_federationhandletime: RwLock>, pub updates_handle: Mutex>>, pub stateres_mutex: Arc>, @@ -118,7 +118,7 @@ impl Service { bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())), roomid_mutex_state: RwLock::new(HashMap::new()), roomid_mutex_insert: MutexMap::::new(), - roomid_mutex_federation: RwLock::new(HashMap::new()), + roomid_mutex_federation: MutexMap::::new(), roomid_federationhandletime: RwLock::new(HashMap::new()), updates_handle: Mutex::new(None), stateres_mutex: Arc::new(Mutex::new(())), diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 4fbfb5df..35c1ffbb 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -1122,16 +1122,11 @@ impl Service { let (event_id, value, room_id) = parse_incoming_pdu(&pdu)?; // Lock so we cannot backfill the same pdu twice at the same time - let mutex = Arc::clone( - services() - .globals - .roomid_mutex_federation - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; + let mutex_lock = services() + .globals + .roomid_mutex_federation + .lock(&room_id) + .await; // Skip the PDU if we already have it as a timeline event if let Some(pdu_id) = self.get_pdu_id(&event_id)? {