remove queued push keys on pusher deletion, use more refs

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-11-23 13:45:27 -05:00
parent 9d23a2b6f5
commit 3fe98f35f2
No known key found for this signature in database
4 changed files with 48 additions and 18 deletions

View file

@ -441,7 +441,10 @@ pub(crate) async fn set_pushers_route(
) -> Result<set_pusher::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
services.pusher.set_pusher(sender_user, &body.action)?;
services
.pusher
.set_pusher(sender_user, &body.action)
.await?;
Ok(set_pusher::v3::Response::new())
}

View file

@ -79,26 +79,24 @@ impl Service {
///
/// # Arguments
///
/// * `service_name` - the name you send to register the service previously
pub async fn unregister_appservice(&self, service_name: &str) -> Result<()> {
/// * `service_name` - the registration ID of the appservice
pub async fn unregister_appservice(&self, appservice_id: &str) -> Result<()> {
// removes the appservice registration info
self.registration_info
.write()
.await
.remove(service_name)
.remove(appservice_id)
.ok_or(err!("Appservice not found"))?;
// remove the appservice from the database
self.db.id_appserviceregistrations.remove(service_name);
self.db.id_appserviceregistrations.del(appservice_id);
// deletes all active requests for the appservice if there are any so we stop
// sending to the URL
self.services
.sending
.cleanup_events(service_name.to_owned())
.await;
Ok(())
.cleanup_events(Some(appservice_id), None, None)
.await
}
pub async fn get_registration(&self, id: &str) -> Option<Registration> {

View file

@ -26,7 +26,7 @@ use ruma::{
uint, RoomId, UInt, UserId,
};
use crate::{client, globals, rooms, users, Dep};
use crate::{client, globals, rooms, sending, users, Dep};
pub struct Service {
db: Data,
@ -39,6 +39,7 @@ struct Services {
state_accessor: Dep<rooms::state_accessor::Service>,
state_cache: Dep<rooms::state_cache::Service>,
users: Dep<users::Service>,
sending: Dep<sending::Service>,
}
struct Data {
@ -57,6 +58,7 @@ impl crate::Service for Service {
state_accessor: args.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
users: args.depend::<users::Service>("users"),
sending: args.depend::<sending::Service>("sending"),
},
}))
}
@ -65,7 +67,7 @@ impl crate::Service for Service {
}
impl Service {
pub fn set_pusher(&self, sender: &UserId, pusher: &set_pusher::v3::PusherAction) -> Result {
pub async fn set_pusher(&self, sender: &UserId, pusher: &set_pusher::v3::PusherAction) -> Result {
match pusher {
set_pusher::v3::PusherAction::Post(data) => {
let pushkey = data.pusher.ids.pushkey.as_str();
@ -84,6 +86,12 @@ impl Service {
set_pusher::v3::PusherAction::Delete(ids) => {
let key = (sender, ids.pushkey.as_str());
self.db.senderkey_pusher.del(key);
self.services
.sending
.cleanup_events(None, Some(sender), Some(ids.pushkey.as_str()))
.await
.ok();
},
}

View file

@ -8,7 +8,7 @@ use std::{fmt::Debug, iter::once, sync::Arc};
use async_trait::async_trait;
use conduit::{
err,
debug_warn, err,
utils::{ReadyExt, TryReadyExt},
warn, Result, Server,
};
@ -285,13 +285,34 @@ impl Service {
appservice::send_request(client, registration, request).await
}
/// Cleanup event data
/// Used for instance after we remove an appservice registration
/// Clean up queued sending event data
///
/// Used after we remove an appservice registration or a user deletes a push
/// key
#[tracing::instrument(skip(self), level = "debug")]
pub async fn cleanup_events(&self, appservice_id: String) {
self.db
.delete_all_requests_for(&Destination::Appservice(appservice_id))
.await;
pub async fn cleanup_events(
&self, appservice_id: Option<&str>, user_id: Option<&UserId>, push_key: Option<&str>,
) -> Result {
match (appservice_id, user_id, push_key) {
(None, Some(user_id), Some(push_key)) => {
self.db
.delete_all_requests_for(&Destination::Push(user_id.to_owned(), push_key.to_owned()))
.await;
Ok(())
},
(Some(appservice_id), None, None) => {
self.db
.delete_all_requests_for(&Destination::Appservice(appservice_id.to_owned()))
.await;
Ok(())
},
_ => {
debug_warn!("cleanup_events called with too many or too few arguments");
Ok(())
},
}
}
fn dispatch(&self, msg: Msg) -> Result<()> {