cleanup on drop for utils::mutex_map.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
01b2928d55
commit
2d251eb19c
|
@ -7,9 +7,8 @@ use std::{
|
||||||
|
|
||||||
use axum_client_ip::InsecureClientIp;
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduit::{
|
use conduit::{
|
||||||
debug, debug_warn, error, info, trace, utils,
|
debug, debug_warn, error, info, trace, utils, utils::math::continue_exponential_backoff_secs, warn, Error,
|
||||||
utils::{math::continue_exponential_backoff_secs, mutex_map},
|
PduEvent, Result,
|
||||||
warn, Error, PduEvent, Result,
|
|
||||||
};
|
};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::{
|
api::{
|
||||||
|
@ -36,13 +35,14 @@ use ruma::{
|
||||||
OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
|
OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
|
||||||
};
|
};
|
||||||
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
||||||
use service::sending::convert_to_outgoing_federation_event;
|
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
client::{update_avatar_url, update_displayname},
|
client::{update_avatar_url, update_displayname},
|
||||||
service::{
|
service::{
|
||||||
|
globals::RoomMutexGuard,
|
||||||
pdu::{gen_event_id_canonical_json, PduBuilder},
|
pdu::{gen_event_id_canonical_json, PduBuilder},
|
||||||
|
sending::convert_to_outgoing_federation_event,
|
||||||
server_is_ours, user_is_local,
|
server_is_ours, user_is_local,
|
||||||
},
|
},
|
||||||
services, Ruma,
|
services, Ruma,
|
||||||
|
@ -682,7 +682,7 @@ pub async fn join_room_by_id_helper(
|
||||||
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote")]
|
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote")]
|
||||||
async fn join_room_by_id_helper_remote(
|
async fn join_room_by_id_helper_remote(
|
||||||
sender_user: &UserId, room_id: &RoomId, reason: Option<String>, servers: &[OwnedServerName],
|
sender_user: &UserId, room_id: &RoomId, reason: Option<String>, servers: &[OwnedServerName],
|
||||||
_third_party_signed: Option<&ThirdPartySigned>, state_lock: mutex_map::Guard<()>,
|
_third_party_signed: Option<&ThirdPartySigned>, state_lock: RoomMutexGuard,
|
||||||
) -> Result<join_room_by_id::v3::Response> {
|
) -> Result<join_room_by_id::v3::Response> {
|
||||||
info!("Joining {room_id} over federation.");
|
info!("Joining {room_id} over federation.");
|
||||||
|
|
||||||
|
@ -1018,7 +1018,7 @@ async fn join_room_by_id_helper_remote(
|
||||||
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local")]
|
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local")]
|
||||||
async fn join_room_by_id_helper_local(
|
async fn join_room_by_id_helper_local(
|
||||||
sender_user: &UserId, room_id: &RoomId, reason: Option<String>, servers: &[OwnedServerName],
|
sender_user: &UserId, room_id: &RoomId, reason: Option<String>, servers: &[OwnedServerName],
|
||||||
_third_party_signed: Option<&ThirdPartySigned>, state_lock: mutex_map::Guard<()>,
|
_third_party_signed: Option<&ThirdPartySigned>, state_lock: RoomMutexGuard,
|
||||||
) -> Result<join_room_by_id::v3::Response> {
|
) -> Result<join_room_by_id::v3::Response> {
|
||||||
debug!("We can join locally");
|
debug!("We can join locally");
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ pub use debug::slice_truncated as debug_slice_truncated;
|
||||||
pub use hash::calculate_hash;
|
pub use hash::calculate_hash;
|
||||||
pub use html::Escape as HtmlEscape;
|
pub use html::Escape as HtmlEscape;
|
||||||
pub use json::{deserialize_from_str, to_canonical_object};
|
pub use json::{deserialize_from_str, to_canonical_object};
|
||||||
pub use mutex_map::MutexMap;
|
pub use mutex_map::{Guard as MutexMapGuard, MutexMap};
|
||||||
pub use rand::string as random_string;
|
pub use rand::string as random_string;
|
||||||
pub use string::{str_from_bytes, string_from_bytes};
|
pub use string::{str_from_bytes, string_from_bytes};
|
||||||
pub use sys::available_parallelism;
|
pub use sys::available_parallelism;
|
||||||
|
|
|
@ -1,20 +1,22 @@
|
||||||
use std::{hash::Hash, sync::Arc};
|
use std::{fmt::Debug, hash::Hash, sync::Arc};
|
||||||
|
|
||||||
type Value<Val> = tokio::sync::Mutex<Val>;
|
use tokio::sync::OwnedMutexGuard as Omg;
|
||||||
type ArcMutex<Val> = Arc<Value<Val>>;
|
|
||||||
type HashMap<Key, Val> = std::collections::HashMap<Key, ArcMutex<Val>>;
|
|
||||||
type MapMutex<Key, Val> = std::sync::Mutex<HashMap<Key, Val>>;
|
|
||||||
type Map<Key, Val> = MapMutex<Key, Val>;
|
|
||||||
|
|
||||||
/// Map of Mutexes
|
/// Map of Mutexes
|
||||||
pub struct MutexMap<Key, Val> {
|
pub struct MutexMap<Key, Val> {
|
||||||
map: Map<Key, Val>,
|
map: Map<Key, Val>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Guard<Val> {
|
pub struct Guard<Key, Val> {
|
||||||
_guard: tokio::sync::OwnedMutexGuard<Val>,
|
map: Map<Key, Val>,
|
||||||
|
val: Omg<Val>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Map<Key, Val> = Arc<MapMutex<Key, Val>>;
|
||||||
|
type MapMutex<Key, Val> = std::sync::Mutex<HashMap<Key, Val>>;
|
||||||
|
type HashMap<Key, Val> = std::collections::HashMap<Key, Value<Val>>;
|
||||||
|
type Value<Val> = Arc<tokio::sync::Mutex<Val>>;
|
||||||
|
|
||||||
impl<Key, Val> MutexMap<Key, Val>
|
impl<Key, Val> MutexMap<Key, Val>
|
||||||
where
|
where
|
||||||
Key: Send + Hash + Eq + Clone,
|
Key: Send + Hash + Eq + Clone,
|
||||||
|
@ -23,28 +25,38 @@ where
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
map: Map::<Key, Val>::new(HashMap::<Key, Val>::new()),
|
map: Map::new(MapMutex::new(HashMap::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn lock<K>(&self, k: &K) -> Guard<Val>
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
|
pub async fn lock<K>(&self, k: &K) -> Guard<Key, Val>
|
||||||
where
|
where
|
||||||
K: ?Sized + Send + Sync,
|
K: ?Sized + Send + Sync + Debug,
|
||||||
Key: for<'a> From<&'a K>,
|
Key: for<'a> From<&'a K>,
|
||||||
{
|
{
|
||||||
let val = self
|
let val = self
|
||||||
.map
|
.map
|
||||||
.lock()
|
.lock()
|
||||||
.expect("map mutex locked")
|
.expect("locked")
|
||||||
.entry(k.into())
|
.entry(k.into())
|
||||||
.or_default()
|
.or_default()
|
||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
let guard = val.lock_owned().await;
|
Guard::<Key, Val> {
|
||||||
Guard::<Val> {
|
map: Arc::clone(&self.map),
|
||||||
_guard: guard,
|
val: val.lock_owned().await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn contains(&self, k: &Key) -> bool { self.map.lock().expect("locked").contains_key(k) }
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn is_empty(&self) -> bool { self.map.lock().expect("locked").is_empty() }
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn len(&self) -> usize { self.map.lock().expect("locked").len() }
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Key, Val> Default for MutexMap<Key, Val>
|
impl<Key, Val> Default for MutexMap<Key, Val>
|
||||||
|
@ -54,3 +66,14 @@ where
|
||||||
{
|
{
|
||||||
fn default() -> Self { Self::new() }
|
fn default() -> Self { Self::new() }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<Key, Val> Drop for Guard<Key, Val> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if Arc::strong_count(Omg::mutex(&self.val)) <= 2 {
|
||||||
|
self.map
|
||||||
|
.lock()
|
||||||
|
.expect("locked")
|
||||||
|
.retain(|_, val| !Arc::ptr_eq(val, Omg::mutex(&self.val)) || Arc::strong_count(val) > 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -81,3 +81,56 @@ fn checked_add_overflow() {
|
||||||
let res = checked!(a + 1).expect("overflow");
|
let res = checked!(a + 1).expect("overflow");
|
||||||
assert_eq!(res, 0);
|
assert_eq!(res, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn mutex_map_cleanup() {
|
||||||
|
use crate::utils::MutexMap;
|
||||||
|
|
||||||
|
let map = MutexMap::<String, ()>::new();
|
||||||
|
|
||||||
|
let lock = map.lock("foo").await;
|
||||||
|
assert!(!map.is_empty(), "map must not be empty");
|
||||||
|
|
||||||
|
drop(lock);
|
||||||
|
assert!(map.is_empty(), "map must be empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn mutex_map_contend() {
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::Barrier;
|
||||||
|
|
||||||
|
use crate::utils::MutexMap;
|
||||||
|
|
||||||
|
let map = Arc::new(MutexMap::<String, ()>::new());
|
||||||
|
let seq = Arc::new([Barrier::new(2), Barrier::new(2)]);
|
||||||
|
let str = "foo".to_owned();
|
||||||
|
|
||||||
|
let seq_ = seq.clone();
|
||||||
|
let map_ = map.clone();
|
||||||
|
let str_ = str.clone();
|
||||||
|
let join_a = tokio::spawn(async move {
|
||||||
|
let _lock = map_.lock(&str_).await;
|
||||||
|
assert!(!map_.is_empty(), "A0 must not be empty");
|
||||||
|
seq_[0].wait().await;
|
||||||
|
assert!(map_.contains(&str_), "A1 must contain key");
|
||||||
|
});
|
||||||
|
|
||||||
|
let seq_ = seq.clone();
|
||||||
|
let map_ = map.clone();
|
||||||
|
let str_ = str.clone();
|
||||||
|
let join_b = tokio::spawn(async move {
|
||||||
|
let _lock = map_.lock(&str_).await;
|
||||||
|
assert!(!map_.is_empty(), "B0 must not be empty");
|
||||||
|
seq_[1].wait().await;
|
||||||
|
assert!(map_.contains(&str_), "B1 must contain key");
|
||||||
|
});
|
||||||
|
|
||||||
|
seq[0].wait().await;
|
||||||
|
assert!(map.contains(&str), "Must contain key");
|
||||||
|
seq[1].wait().await;
|
||||||
|
|
||||||
|
tokio::try_join!(join_b, join_a).expect("joined");
|
||||||
|
assert!(map.is_empty(), "Must be empty");
|
||||||
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use conduit::{error, utils::mutex_map, Error, Result};
|
use conduit::{error, Error, Result};
|
||||||
pub use create::create_admin_room;
|
pub use create::create_admin_room;
|
||||||
pub use grant::make_user_admin;
|
pub use grant::make_user_admin;
|
||||||
use loole::{Receiver, Sender};
|
use loole::{Receiver, Sender};
|
||||||
|
@ -26,7 +26,7 @@ use tokio::{
|
||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{pdu::PduBuilder, services, user_is_local, PduEvent};
|
use crate::{globals::RoomMutexGuard, pdu::PduBuilder, services, user_is_local, PduEvent};
|
||||||
|
|
||||||
const COMMAND_QUEUE_LIMIT: usize = 512;
|
const COMMAND_QUEUE_LIMIT: usize = 512;
|
||||||
|
|
||||||
|
@ -270,7 +270,7 @@ async fn respond_to_room(content: RoomMessageEventContent, room_id: &RoomId, use
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_response_error(
|
async fn handle_response_error(
|
||||||
e: &Error, room_id: &RoomId, user_id: &UserId, state_lock: &mutex_map::Guard<()>,
|
e: &Error, room_id: &RoomId, user_id: &UserId, state_lock: &RoomMutexGuard,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
error!("Failed to build and append admin room response PDU: \"{e}\"");
|
error!("Failed to build and append admin room response PDU: \"{e}\"");
|
||||||
let error_room_message = RoomMessageEventContent::text_plain(format!(
|
let error_room_message = RoomMessageEventContent::text_plain(format!(
|
||||||
|
|
|
@ -12,7 +12,11 @@ use std::{
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{error, trace, utils::MutexMap, Config, Result};
|
use conduit::{
|
||||||
|
error, trace,
|
||||||
|
utils::{MutexMap, MutexMapGuard},
|
||||||
|
Config, Result,
|
||||||
|
};
|
||||||
use data::Data;
|
use data::Data;
|
||||||
use ipaddress::IPAddress;
|
use ipaddress::IPAddress;
|
||||||
use regex::RegexSet;
|
use regex::RegexSet;
|
||||||
|
@ -27,8 +31,6 @@ use url::Url;
|
||||||
|
|
||||||
use crate::services;
|
use crate::services;
|
||||||
|
|
||||||
type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries
|
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
pub db: Data,
|
pub db: Data,
|
||||||
|
|
||||||
|
@ -43,9 +45,9 @@ pub struct Service {
|
||||||
pub bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
|
pub bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
|
||||||
pub bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
|
pub bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
|
||||||
pub bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
|
pub bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
|
||||||
pub roomid_mutex_insert: MutexMap<OwnedRoomId, ()>,
|
pub roomid_mutex_insert: RoomMutexMap,
|
||||||
pub roomid_mutex_state: MutexMap<OwnedRoomId, ()>,
|
pub roomid_mutex_state: RoomMutexMap,
|
||||||
pub roomid_mutex_federation: MutexMap<OwnedRoomId, ()>,
|
pub roomid_mutex_federation: RoomMutexMap,
|
||||||
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
|
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
|
||||||
pub updates_handle: Mutex<Option<JoinHandle<()>>>,
|
pub updates_handle: Mutex<Option<JoinHandle<()>>>,
|
||||||
pub stateres_mutex: Arc<Mutex<()>>,
|
pub stateres_mutex: Arc<Mutex<()>>,
|
||||||
|
@ -53,6 +55,10 @@ pub struct Service {
|
||||||
pub admin_alias: OwnedRoomAliasId,
|
pub admin_alias: OwnedRoomAliasId,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
|
||||||
|
pub type RoomMutexGuard = MutexMapGuard<OwnedRoomId, ()>;
|
||||||
|
type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries
|
||||||
|
|
||||||
impl crate::Service for Service {
|
impl crate::Service for Service {
|
||||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||||
let config = &args.server.config;
|
let config = &args.server.config;
|
||||||
|
|
|
@ -3,7 +3,8 @@ use std::{collections::HashSet, sync::Arc};
|
||||||
use conduit::{utils, Error, Result};
|
use conduit::{utils, Error, Result};
|
||||||
use database::{Database, Map};
|
use database::{Database, Map};
|
||||||
use ruma::{EventId, OwnedEventId, RoomId};
|
use ruma::{EventId, OwnedEventId, RoomId};
|
||||||
use utils::mutex_map;
|
|
||||||
|
use crate::globals::RoomMutexGuard;
|
||||||
|
|
||||||
pub(super) struct Data {
|
pub(super) struct Data {
|
||||||
shorteventid_shortstatehash: Arc<Map>,
|
shorteventid_shortstatehash: Arc<Map>,
|
||||||
|
@ -35,7 +36,7 @@ impl Data {
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
new_shortstatehash: u64,
|
new_shortstatehash: u64,
|
||||||
_mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
_mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
self.roomid_shortstatehash
|
self.roomid_shortstatehash
|
||||||
.insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?;
|
.insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?;
|
||||||
|
@ -68,7 +69,7 @@ impl Data {
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
event_ids: Vec<OwnedEventId>,
|
event_ids: Vec<OwnedEventId>,
|
||||||
_mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
_mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut prefix = room_id.as_bytes().to_vec();
|
let mut prefix = room_id.as_bytes().to_vec();
|
||||||
prefix.push(0xFF);
|
prefix.push(0xFF);
|
||||||
|
|
|
@ -5,10 +5,7 @@ use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{
|
use conduit::{utils::calculate_hash, warn, Error, Result};
|
||||||
utils::{calculate_hash, mutex_map},
|
|
||||||
warn, Error, Result,
|
|
||||||
};
|
|
||||||
use data::Data;
|
use data::Data;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::client::error::ErrorKind,
|
api::client::error::ErrorKind,
|
||||||
|
@ -22,7 +19,7 @@ use ruma::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::state_compressor::CompressedStateEvent;
|
use super::state_compressor::CompressedStateEvent;
|
||||||
use crate::{services, PduEvent};
|
use crate::{globals::RoomMutexGuard, services, PduEvent};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
db: Data,
|
db: Data,
|
||||||
|
@ -46,7 +43,7 @@ impl Service {
|
||||||
shortstatehash: u64,
|
shortstatehash: u64,
|
||||||
statediffnew: Arc<HashSet<CompressedStateEvent>>,
|
statediffnew: Arc<HashSet<CompressedStateEvent>>,
|
||||||
_statediffremoved: Arc<HashSet<CompressedStateEvent>>,
|
_statediffremoved: Arc<HashSet<CompressedStateEvent>>,
|
||||||
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
for event_id in statediffnew.iter().filter_map(|new| {
|
for event_id in statediffnew.iter().filter_map(|new| {
|
||||||
services()
|
services()
|
||||||
|
@ -318,7 +315,7 @@ impl Service {
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
shortstatehash: u64,
|
shortstatehash: u64,
|
||||||
mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
self.db.set_room_state(room_id, shortstatehash, mutex_lock)
|
self.db.set_room_state(room_id, shortstatehash, mutex_lock)
|
||||||
}
|
}
|
||||||
|
@ -358,7 +355,7 @@ impl Service {
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
event_ids: Vec<OwnedEventId>,
|
event_ids: Vec<OwnedEventId>,
|
||||||
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
self.db
|
self.db
|
||||||
.set_forward_extremities(room_id, event_ids, state_lock)
|
.set_forward_extremities(room_id, event_ids, state_lock)
|
||||||
|
|
|
@ -6,11 +6,7 @@ use std::{
|
||||||
sync::{Arc, Mutex as StdMutex, Mutex},
|
sync::{Arc, Mutex as StdMutex, Mutex},
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{
|
use conduit::{error, utils::math::usize_from_f64, warn, Error, Result};
|
||||||
error,
|
|
||||||
utils::{math::usize_from_f64, mutex_map},
|
|
||||||
warn, Error, Result,
|
|
||||||
};
|
|
||||||
use data::Data;
|
use data::Data;
|
||||||
use lru_cache::LruCache;
|
use lru_cache::LruCache;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
|
@ -37,7 +33,7 @@ use ruma::{
|
||||||
};
|
};
|
||||||
use serde_json::value::to_raw_value;
|
use serde_json::value::to_raw_value;
|
||||||
|
|
||||||
use crate::{pdu::PduBuilder, services, PduEvent};
|
use crate::{globals::RoomMutexGuard, pdu::PduBuilder, services, PduEvent};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
db: Data,
|
db: Data,
|
||||||
|
@ -333,7 +329,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn user_can_invite(
|
pub fn user_can_invite(
|
||||||
&self, room_id: &RoomId, sender: &UserId, target_user: &UserId, state_lock: &mutex_map::Guard<()>,
|
&self, room_id: &RoomId, sender: &UserId, target_user: &UserId, state_lock: &RoomMutexGuard,
|
||||||
) -> Result<bool> {
|
) -> Result<bool> {
|
||||||
let content = to_raw_value(&RoomMemberEventContent::new(MembershipState::Invite))
|
let content = to_raw_value(&RoomMemberEventContent::new(MembershipState::Invite))
|
||||||
.expect("Event content always serializes");
|
.expect("Event content always serializes");
|
||||||
|
|
|
@ -6,7 +6,7 @@ use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{debug, error, info, utils, utils::mutex_map, validated, warn, Error, Result};
|
use conduit::{debug, error, info, utils, validated, warn, Error, Result};
|
||||||
use data::Data;
|
use data::Data;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
|
@ -36,6 +36,7 @@ use tokio::sync::RwLock;
|
||||||
use crate::{
|
use crate::{
|
||||||
admin,
|
admin,
|
||||||
appservice::NamespaceRegex,
|
appservice::NamespaceRegex,
|
||||||
|
globals::RoomMutexGuard,
|
||||||
pdu::{EventHash, PduBuilder},
|
pdu::{EventHash, PduBuilder},
|
||||||
rooms::{event_handler::parse_incoming_pdu, state_compressor::CompressedStateEvent},
|
rooms::{event_handler::parse_incoming_pdu, state_compressor::CompressedStateEvent},
|
||||||
server_is_ours, services, PduCount, PduEvent,
|
server_is_ours, services, PduCount, PduEvent,
|
||||||
|
@ -203,7 +204,7 @@ impl Service {
|
||||||
pdu: &PduEvent,
|
pdu: &PduEvent,
|
||||||
mut pdu_json: CanonicalJsonObject,
|
mut pdu_json: CanonicalJsonObject,
|
||||||
leaves: Vec<OwnedEventId>,
|
leaves: Vec<OwnedEventId>,
|
||||||
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<Vec<u8>> {
|
) -> Result<Vec<u8>> {
|
||||||
// Coalesce database writes for the remainder of this scope.
|
// Coalesce database writes for the remainder of this scope.
|
||||||
let _cork = services().db.cork_and_flush();
|
let _cork = services().db.cork_and_flush();
|
||||||
|
@ -593,7 +594,7 @@ impl Service {
|
||||||
pdu_builder: PduBuilder,
|
pdu_builder: PduBuilder,
|
||||||
sender: &UserId,
|
sender: &UserId,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
_mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
_mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<(PduEvent, CanonicalJsonObject)> {
|
) -> Result<(PduEvent, CanonicalJsonObject)> {
|
||||||
let PduBuilder {
|
let PduBuilder {
|
||||||
event_type,
|
event_type,
|
||||||
|
@ -780,7 +781,7 @@ impl Service {
|
||||||
pdu_builder: PduBuilder,
|
pdu_builder: PduBuilder,
|
||||||
sender: &UserId,
|
sender: &UserId,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<Arc<EventId>> {
|
) -> Result<Arc<EventId>> {
|
||||||
let (pdu, pdu_json) = self.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)?;
|
let (pdu, pdu_json) = self.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)?;
|
||||||
if let Some(admin_room) = admin::Service::get_admin_room()? {
|
if let Some(admin_room) = admin::Service::get_admin_room()? {
|
||||||
|
@ -963,7 +964,7 @@ impl Service {
|
||||||
new_room_leaves: Vec<OwnedEventId>,
|
new_room_leaves: Vec<OwnedEventId>,
|
||||||
state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
|
state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
|
||||||
soft_fail: bool,
|
soft_fail: bool,
|
||||||
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
|
state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<Option<Vec<u8>>> {
|
) -> Result<Option<Vec<u8>>> {
|
||||||
// We append to state before appending the pdu, so we don't have a moment in
|
// We append to state before appending the pdu, so we don't have a moment in
|
||||||
// time with the pdu without it's state. This is okay because append_pdu can't
|
// time with the pdu without it's state. This is okay because append_pdu can't
|
||||||
|
|
Loading…
Reference in New Issue