fix: pushers

This commit is contained in:
Timo Kösters 2021-02-11 13:16:14 +01:00
parent e4dc7ea8ac
commit 835cf80acd
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
4 changed files with 63 additions and 63 deletions

View file

@ -35,7 +35,10 @@ impl PushData {
} }
pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> { pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> {
println!("CCCCCCCCCCCCCCCCCCCCCc");
dbg!(&pusher);
let mut key = sender.as_bytes().to_vec(); let mut key = sender.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(pusher.pushkey.as_bytes()); key.extend_from_slice(pusher.pushkey.as_bytes());
// There are 2 kinds of pushers but the spec says: null deletes the pusher. // There are 2 kinds of pushers but the spec says: null deletes the pusher.
@ -48,7 +51,7 @@ impl PushData {
} }
self.senderkey_pusher.insert( self.senderkey_pusher.insert(
key, dbg!(key),
&*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"), &*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"),
)?; )?;
@ -56,11 +59,16 @@ impl PushData {
} }
pub fn get_pusher(&self, sender: &UserId) -> Result<Vec<Pusher>> { pub fn get_pusher(&self, sender: &UserId) -> Result<Vec<Pusher>> {
let mut prefix = sender.as_bytes().to_vec();
prefix.push(0xff);
self.senderkey_pusher self.senderkey_pusher
.scan_prefix(sender.as_bytes()) .scan_prefix(dbg!(prefix))
.values() .values()
.map(|push: std::result::Result<sled::IVec, _>| { .map(|push| {
let push = push.map_err(|_| Error::bad_database("Invalid push bytes in db."))?; println!("DDDDDDDDDDDDDDDDDDDDDDDDDD");
let push =
dbg!(push).map_err(|_| Error::bad_database("Invalid push bytes in db."))?;
Ok(serde_json::from_slice(&*push) Ok(serde_json::from_slice(&*push)
.map_err(|_| Error::bad_database("Invalid Pusher in db."))?) .map_err(|_| Error::bad_database("Invalid Pusher in db."))?)
}) })
@ -85,14 +93,17 @@ where
Error::BadServerResponse("Invalid destination") Error::BadServerResponse("Invalid destination")
})?; })?;
let mut reqwest_request = reqwest::Request::try_from(http_request) let reqwest_request = reqwest::Request::try_from(http_request)
.expect("all http requests are valid reqwest requests"); .expect("all http requests are valid reqwest requests");
// TODO: we could keep this very short and let expo backoff do it's thing... // TODO: we could keep this very short and let expo backoff do it's thing...
*reqwest_request.timeout_mut() = Some(Duration::from_secs(5)); //*reqwest_request.timeout_mut() = Some(Duration::from_secs(5));
let url = reqwest_request.url().clone(); let url = reqwest_request.url().clone();
let reqwest_response = globals.reqwest_client().execute(reqwest_request).await; let reqwest_response = globals
.reqwest_client()
.execute(dbg!(reqwest_request))
.await;
// Because reqwest::Response -> http::Response is complicated: // Because reqwest::Response -> http::Response is complicated:
match reqwest_response { match reqwest_response {
@ -154,6 +165,12 @@ pub async fn send_push_notice(
pdu: &PduEvent, pdu: &PduEvent,
db: &Database, db: &Database,
) -> Result<()> { ) -> Result<()> {
if let Some(msgtype) = pdu.content.get("msgtype").and_then(|b| b.as_str()) {
if msgtype == "m.notice" {
return Ok(());
}
}
for rule in ruleset.into_iter() { for rule in ruleset.into_iter() {
// TODO: can actions contain contradictory Actions // TODO: can actions contain contradictory Actions
if rule if rule
@ -165,7 +182,7 @@ pub async fn send_push_notice(
continue; continue;
} }
match rule.rule_id.as_str() { match dbg!(rule.rule_id.as_str()) {
".m.rule.master" => {} ".m.rule.master" => {}
".m.rule.suppress_notices" => { ".m.rule.suppress_notices" => {
if pdu.kind == EventType::RoomMessage if pdu.kind == EventType::RoomMessage
@ -437,7 +454,8 @@ async fn send_notice(
db: &Database, db: &Database,
name: &str, name: &str,
) -> Result<()> { ) -> Result<()> {
let (http, _emails): (Vec<&Pusher>, _) = pushers println!("BBBBBBBBBBBBBBBr");
let (http, _emails): (Vec<&Pusher>, _) = dbg!(pushers)
.iter() .iter()
.partition(|pusher| pusher.kind == Some(PusherKind::Http)); .partition(|pusher| pusher.kind == Some(PusherKind::Http));
@ -445,7 +463,7 @@ async fn send_notice(
// Two problems with this // Two problems with this
// 1. if "event_id_only" is the only format kind it seems we should never add more info // 1. if "event_id_only" is the only format kind it seems we should never add more info
// 2. can pusher/devices have conflicting formats // 2. can pusher/devices have conflicting formats
for pusher in http { for pusher in dbg!(http) {
let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly);
let url = if let Some(url) = pusher.data.url.as_ref() { let url = if let Some(url) = pusher.data.url.as_ref() {
url url
@ -484,12 +502,12 @@ async fn send_notice(
if event_id_only { if event_id_only {
error!("SEND PUSH NOTICE `{}`", name); error!("SEND PUSH NOTICE `{}`", name);
// send_request( send_request(
// &db.globals, &db.globals,
// &url, &url,
// send_event_notification::v1::Request::new(notifi), send_event_notification::v1::Request::new(notifi),
// ) )
// .await?; .await?;
} else { } else {
notifi.sender = Some(&event.sender); notifi.sender = Some(&event.sender);
notifi.event_type = Some(&event.kind); notifi.event_type = Some(&event.kind);
@ -512,12 +530,12 @@ async fn send_notice(
notifi.room_name = room_name.as_deref(); notifi.room_name = room_name.as_deref();
error!("SEND PUSH NOTICE Full `{}`", name); error!("SEND PUSH NOTICE Full `{}`", name);
// send_request( send_request(
// &db.globals, &db.globals,
// &url, &url,
// send_event_notification::v1::Request::new(notifi), send_event_notification::v1::Request::new(notifi),
// ) )
// .await?; .await?;
} }
} }

View file

@ -1358,6 +1358,7 @@ impl Rooms {
self.alias_roomid self.alias_roomid
.insert(alias.alias(), room_id.as_bytes())?; .insert(alias.alias(), room_id.as_bytes())?;
let mut aliasid = room_id.as_bytes().to_vec(); let mut aliasid = room_id.as_bytes().to_vec();
aliasid.push(0xff);
aliasid.extend_from_slice(&globals.next_count()?.to_be_bytes()); aliasid.extend_from_slice(&globals.next_count()?.to_be_bytes());
self.aliasid_alias.insert(aliasid, &*alias.alias())?; self.aliasid_alias.insert(aliasid, &*alias.alias())?;
} else { } else {
@ -1370,7 +1371,10 @@ impl Rooms {
"Alias does not exist.", "Alias does not exist.",
))?; ))?;
for key in self.aliasid_alias.scan_prefix(room_id).keys() { let mut prefix = room_id.to_vec();
prefix.push(0xff);
for key in self.aliasid_alias.scan_prefix(prefix).keys() {
self.aliasid_alias.remove(key?)?; self.aliasid_alias.remove(key?)?;
} }
} }

View file

@ -1,7 +1,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
convert::TryFrom, convert::TryFrom,
fmt::{Debug, Display, Formatter}, fmt::Debug,
sync::Arc, sync::Arc,
time::{Duration, Instant, SystemTime}, time::{Duration, Instant, SystemTime},
}; };
@ -25,16 +25,6 @@ pub enum OutgoingKind {
Normal(Box<ServerName>), Normal(Box<ServerName>),
} }
impl Display for OutgoingKind {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
OutgoingKind::Appservice(name) => f.write_str(name.as_str()),
OutgoingKind::Normal(name) => f.write_str(name.as_str()),
OutgoingKind::Push(_) => f.write_str("Push notification TODO"),
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Sending { pub struct Sending {
/// The state for a given state hash. /// The state for a given state hash.
@ -143,7 +133,7 @@ impl Sending {
} }
} }
Err((outgoing_kind, e)) => { Err((outgoing_kind, e)) => {
info!("Couldn't send transaction to {}\n{}", outgoing_kind, e); info!("Couldn't send transaction to {:?}\n{}", outgoing_kind, e);
let mut prefix = match &outgoing_kind { let mut prefix = match &outgoing_kind {
OutgoingKind::Appservice(serv) => { OutgoingKind::Appservice(serv) => {
let mut p = b"+".to_vec(); let mut p = b"+".to_vec();
@ -278,6 +268,8 @@ impl Sending {
key.extend_from_slice(pdu_id); key.extend_from_slice(pdu_id);
self.servernamepduids.insert(key, b"")?; self.servernamepduids.insert(key, b"")?;
println!("AAAA");
Ok(()) Ok(())
} }
@ -306,7 +298,7 @@ impl Sending {
pdu_ids: Vec<IVec>, pdu_ids: Vec<IVec>,
db: &Database, db: &Database,
) -> std::result::Result<OutgoingKind, (OutgoingKind, Error)> { ) -> std::result::Result<OutgoingKind, (OutgoingKind, Error)> {
match kind { match dbg!(kind) {
OutgoingKind::Appservice(server) => { OutgoingKind::Appservice(server) => {
let pdu_jsons = pdu_ids let pdu_jsons = pdu_ids
.iter() .iter()
@ -364,25 +356,12 @@ impl Sending {
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
for pdu in &pdus { for pdu in dbg!(&pdus) {
// Redacted events are not notification targets (we don't send push for them) // Redacted events are not notification targets (we don't send push for them)
if pdu.unsigned.get("redacted_because").is_some() { if pdu.unsigned.get("redacted_because").is_some() {
continue; continue;
} }
// Skip events that came from the admin room
if db
.rooms
.room_aliases(&pdu.room_id)
.any(|alias| match alias {
Ok(a) => a.as_str().starts_with("#admins:"),
_ => false,
})
|| pdu.sender.as_str().starts_with("@conduit:")
{
continue;
}
for user in db.rooms.room_members(&pdu.room_id) { for user in db.rooms.room_members(&pdu.room_id) {
let user = user.map_err(|e| (OutgoingKind::Push(id.clone()), e))?; let user = user.map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
@ -391,9 +370,7 @@ impl Sending {
continue; continue;
} }
let pushers = db let pushers = dbg!(db.pusher.get_pusher(&user))
.pusher
.get_pusher(&user)
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?; .map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
let rules_for_user = db let rules_for_user = db
@ -426,15 +403,17 @@ impl Sending {
uint!(0) uint!(0)
}; };
crate::database::pusher::send_push_notice( dbg!(
&user, crate::database::pusher::send_push_notice(
unread, &user,
&pushers, unread,
rules_for_user, &pushers,
pdu, rules_for_user,
db, pdu,
db,
)
.await
) )
.await
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?; .map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
} }
} }

View file

@ -1,6 +1,6 @@
use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma}; use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma};
use http::header::{HeaderValue, AUTHORIZATION, HOST}; use http::header::{HeaderValue, AUTHORIZATION, HOST};
use log::{info, warn}; use log::{error, info, warn};
use rocket::{get, post, put, response::content::Json, State}; use rocket::{get, post, put, response::content::Json, State};
use ruma::{ use ruma::{
api::{ api::{
@ -17,7 +17,6 @@ use ruma::{
OutgoingRequest, OutgoingRequest,
}, },
directory::{IncomingFilter, IncomingRoomNetwork}, directory::{IncomingFilter, IncomingRoomNetwork},
events::EventType,
serde::to_canonical_value, serde::to_canonical_value,
signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap}, signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap},
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,