mirror of
https://github.com/girlbossceo/conduwuit.git
synced 2024-12-02 10:18:40 +00:00
Merge branch 'threads' into 'next'
fix: threads get updated properly See merge request famedly/conduit!524
This commit is contained in:
commit
10da9485a5
|
@ -674,7 +674,7 @@ async fn join_room_by_id_helper(
|
||||||
};
|
};
|
||||||
|
|
||||||
let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
|
let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
|
||||||
warn!("{:?}: {}", value, e);
|
warn!("Invalid PDU in send_join response: {} {:?}", e, value);
|
||||||
Error::BadServerResponse("Invalid PDU in send_join response.")
|
Error::BadServerResponse("Invalid PDU in send_join response.")
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
|
|
@ -445,6 +445,9 @@ pub async fn get_room_event_route(
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut event = (*event).clone();
|
||||||
|
event.add_age()?;
|
||||||
|
|
||||||
Ok(get_room_event::v3::Response {
|
Ok(get_room_event::v3::Response {
|
||||||
event: event.to_room_event(),
|
event: event.to_room_event(),
|
||||||
})
|
})
|
||||||
|
|
|
@ -20,8 +20,9 @@ use ruma::{
|
||||||
StateEventType, TimelineEventType,
|
StateEventType, TimelineEventType,
|
||||||
},
|
},
|
||||||
serde::Raw,
|
serde::Raw,
|
||||||
uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
|
uint, DeviceId, OwnedDeviceId, OwnedEventId, OwnedUserId, RoomId, UInt, UserId,
|
||||||
};
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
|
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
|
|
|
@ -813,7 +813,7 @@ pub async fn send_transaction_message_route(
|
||||||
.readreceipt_update(&user_id, &room_id, event)?;
|
.readreceipt_update(&user_id, &room_id, event)?;
|
||||||
} else {
|
} else {
|
||||||
// TODO fetch missing events
|
// TODO fetch missing events
|
||||||
info!("No known event ids in read receipt: {:?}", user_updates);
|
debug!("No known event ids in read receipt: {:?}", user_updates);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1011,7 +1011,7 @@ pub async fn get_backfill_route(
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("server is authenticated");
|
.expect("server is authenticated");
|
||||||
|
|
||||||
info!("Got backfill request from: {}", sender_servername);
|
debug!("Got backfill request from: {}", sender_servername);
|
||||||
|
|
||||||
if !services()
|
if !services()
|
||||||
.rooms
|
.rooms
|
||||||
|
|
|
@ -246,6 +246,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
if pdu.sender != user_id {
|
if pdu.sender != user_id {
|
||||||
pdu.remove_transaction_id()?;
|
pdu.remove_transaction_id()?;
|
||||||
}
|
}
|
||||||
|
pdu.add_age()?;
|
||||||
let count = pdu_count(&pdu_id)?;
|
let count = pdu_count(&pdu_id)?;
|
||||||
Ok((count, pdu))
|
Ok((count, pdu))
|
||||||
}),
|
}),
|
||||||
|
@ -272,6 +273,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
if pdu.sender != user_id {
|
if pdu.sender != user_id {
|
||||||
pdu.remove_transaction_id()?;
|
pdu.remove_transaction_id()?;
|
||||||
}
|
}
|
||||||
|
pdu.add_age()?;
|
||||||
let count = pdu_count(&pdu_id)?;
|
let count = pdu_count(&pdu_id)?;
|
||||||
Ok((count, pdu))
|
Ok((count, pdu))
|
||||||
}),
|
}),
|
||||||
|
|
|
@ -85,6 +85,8 @@ async fn main() {
|
||||||
|
|
||||||
config.warn_deprecated();
|
config.warn_deprecated();
|
||||||
|
|
||||||
|
let log = format!("{},ruma_state_res=error,_=off,sled=off", config.log);
|
||||||
|
|
||||||
if config.allow_jaeger {
|
if config.allow_jaeger {
|
||||||
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
||||||
let tracer = opentelemetry_jaeger::new_agent_pipeline()
|
let tracer = opentelemetry_jaeger::new_agent_pipeline()
|
||||||
|
@ -94,7 +96,7 @@ async fn main() {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
|
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
|
||||||
|
|
||||||
let filter_layer = match EnvFilter::try_new(&config.log) {
|
let filter_layer = match EnvFilter::try_new(&log) {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
|
@ -121,7 +123,7 @@ async fn main() {
|
||||||
} else {
|
} else {
|
||||||
let registry = tracing_subscriber::Registry::default();
|
let registry = tracing_subscriber::Registry::default();
|
||||||
let fmt_layer = tracing_subscriber::fmt::Layer::new();
|
let fmt_layer = tracing_subscriber::fmt::Layer::new();
|
||||||
let filter_layer = match EnvFilter::try_new(&config.log) {
|
let filter_layer = match EnvFilter::try_new(&log) {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
eprintln!("It looks like your config is invalid. The following error occured while parsing it: {e}");
|
eprintln!("It looks like your config is invalid. The following error occured while parsing it: {e}");
|
||||||
|
|
|
@ -103,6 +103,19 @@ impl PduEvent {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn add_age(&mut self) -> crate::Result<()> {
|
||||||
|
let mut unsigned: BTreeMap<String, Box<RawJsonValue>> = self
|
||||||
|
.unsigned
|
||||||
|
.as_ref()
|
||||||
|
.map_or_else(|| Ok(BTreeMap::new()), |u| serde_json::from_str(u.get()))
|
||||||
|
.map_err(|_| Error::bad_database("Invalid unsigned in pdu event"))?;
|
||||||
|
|
||||||
|
unsigned.insert("age".to_owned(), to_raw_value(&1).unwrap());
|
||||||
|
self.unsigned = Some(to_raw_value(&unsigned).expect("unsigned is valid"));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> {
|
pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> {
|
||||||
let mut json = json!({
|
let mut json = json!({
|
||||||
|
|
|
@ -357,7 +357,7 @@ impl Service {
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
|
// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
|
||||||
info!(
|
debug!(
|
||||||
"Auth check for {} based on auth events",
|
"Auth check for {} based on auth events",
|
||||||
incoming_pdu.event_id
|
incoming_pdu.event_id
|
||||||
);
|
);
|
||||||
|
@ -419,7 +419,7 @@ impl Service {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Validation successful.");
|
debug!("Validation successful.");
|
||||||
|
|
||||||
// 7. Persist the event as an outlier.
|
// 7. Persist the event as an outlier.
|
||||||
services()
|
services()
|
||||||
|
@ -427,7 +427,7 @@ impl Service {
|
||||||
.outlier
|
.outlier
|
||||||
.add_pdu_outlier(&incoming_pdu.event_id, &val)?;
|
.add_pdu_outlier(&incoming_pdu.event_id, &val)?;
|
||||||
|
|
||||||
info!("Added pdu as outlier.");
|
debug!("Added pdu as outlier.");
|
||||||
|
|
||||||
Ok((Arc::new(incoming_pdu), val))
|
Ok((Arc::new(incoming_pdu), val))
|
||||||
})
|
})
|
||||||
|
@ -476,7 +476,7 @@ impl Service {
|
||||||
// TODO: if we know the prev_events of the incoming event we can avoid the request and build
|
// TODO: if we know the prev_events of the incoming event we can avoid the request and build
|
||||||
// the state from a known point and resolve if > 1 prev_event
|
// the state from a known point and resolve if > 1 prev_event
|
||||||
|
|
||||||
info!("Requesting state at event");
|
debug!("Requesting state at event");
|
||||||
let mut state_at_incoming_event = None;
|
let mut state_at_incoming_event = None;
|
||||||
|
|
||||||
if incoming_pdu.prev_events.len() == 1 {
|
if incoming_pdu.prev_events.len() == 1 {
|
||||||
|
@ -499,7 +499,7 @@ impl Service {
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(Ok(mut state)) = state {
|
if let Some(Ok(mut state)) = state {
|
||||||
info!("Using cached state");
|
debug!("Using cached state");
|
||||||
let prev_pdu = services()
|
let prev_pdu = services()
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
|
@ -523,7 +523,7 @@ impl Service {
|
||||||
state_at_incoming_event = Some(state);
|
state_at_incoming_event = Some(state);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
info!("Calculating state at event using state res");
|
debug!("Calculating state at event using state res");
|
||||||
let mut extremity_sstatehashes = HashMap::new();
|
let mut extremity_sstatehashes = HashMap::new();
|
||||||
|
|
||||||
let mut okay = true;
|
let mut okay = true;
|
||||||
|
@ -632,7 +632,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
if state_at_incoming_event.is_none() {
|
if state_at_incoming_event.is_none() {
|
||||||
info!("Calling /state_ids");
|
debug!("Calling /state_ids");
|
||||||
// Call /state_ids to find out what the state at this pdu is. We trust the server's
|
// Call /state_ids to find out what the state at this pdu is. We trust the server's
|
||||||
// response to some extend, but we still do a lot of checks on the events
|
// response to some extend, but we still do a lot of checks on the events
|
||||||
match services()
|
match services()
|
||||||
|
@ -647,7 +647,7 @@ impl Service {
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(res) => {
|
Ok(res) => {
|
||||||
info!("Fetching state events at event.");
|
debug!("Fetching state events at event.");
|
||||||
let state_vec = self
|
let state_vec = self
|
||||||
.fetch_and_handle_outliers(
|
.fetch_and_handle_outliers(
|
||||||
origin,
|
origin,
|
||||||
|
@ -710,7 +710,7 @@ impl Service {
|
||||||
let state_at_incoming_event =
|
let state_at_incoming_event =
|
||||||
state_at_incoming_event.expect("we always set this to some above");
|
state_at_incoming_event.expect("we always set this to some above");
|
||||||
|
|
||||||
info!("Starting auth check");
|
debug!("Starting auth check");
|
||||||
// 11. Check the auth of the event passes based on the state of the event
|
// 11. Check the auth of the event passes based on the state of the event
|
||||||
let check_result = state_res::event_auth::auth_check(
|
let check_result = state_res::event_auth::auth_check(
|
||||||
&room_version,
|
&room_version,
|
||||||
|
@ -734,7 +734,7 @@ impl Service {
|
||||||
"Event has failed auth check with state at the event.",
|
"Event has failed auth check with state at the event.",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
info!("Auth check succeeded");
|
debug!("Auth check succeeded");
|
||||||
|
|
||||||
// Soft fail check before doing state res
|
// Soft fail check before doing state res
|
||||||
let auth_events = services().rooms.state.get_auth_events(
|
let auth_events = services().rooms.state.get_auth_events(
|
||||||
|
@ -769,7 +769,7 @@ impl Service {
|
||||||
|
|
||||||
// Now we calculate the set of extremities this room has after the incoming event has been
|
// Now we calculate the set of extremities this room has after the incoming event has been
|
||||||
// applied. We start with the previous extremities (aka leaves)
|
// applied. We start with the previous extremities (aka leaves)
|
||||||
info!("Calculating extremities");
|
debug!("Calculating extremities");
|
||||||
let mut extremities = services().rooms.state.get_forward_extremities(room_id)?;
|
let mut extremities = services().rooms.state.get_forward_extremities(room_id)?;
|
||||||
|
|
||||||
// Remove any forward extremities that are referenced by this incoming event's prev_events
|
// Remove any forward extremities that are referenced by this incoming event's prev_events
|
||||||
|
@ -790,7 +790,7 @@ impl Service {
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
info!("Compressing state at event");
|
debug!("Compressing state at event");
|
||||||
let state_ids_compressed = Arc::new(
|
let state_ids_compressed = Arc::new(
|
||||||
state_at_incoming_event
|
state_at_incoming_event
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -804,7 +804,7 @@ impl Service {
|
||||||
);
|
);
|
||||||
|
|
||||||
if incoming_pdu.state_key.is_some() {
|
if incoming_pdu.state_key.is_some() {
|
||||||
info!("Preparing for stateres to derive new room state");
|
debug!("Preparing for stateres to derive new room state");
|
||||||
|
|
||||||
// We also add state after incoming event to the fork states
|
// We also add state after incoming event to the fork states
|
||||||
let mut state_after = state_at_incoming_event.clone();
|
let mut state_after = state_at_incoming_event.clone();
|
||||||
|
@ -822,7 +822,7 @@ impl Service {
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Set the new room state to the resolved state
|
// Set the new room state to the resolved state
|
||||||
info!("Forcing new room state");
|
debug!("Forcing new room state");
|
||||||
|
|
||||||
let (sstatehash, new, removed) = services()
|
let (sstatehash, new, removed) = services()
|
||||||
.rooms
|
.rooms
|
||||||
|
@ -837,7 +837,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it
|
// 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it
|
||||||
info!("Starting soft fail auth check");
|
debug!("Starting soft fail auth check");
|
||||||
|
|
||||||
if soft_fail {
|
if soft_fail {
|
||||||
services().rooms.timeline.append_incoming_pdu(
|
services().rooms.timeline.append_incoming_pdu(
|
||||||
|
@ -861,7 +861,7 @@ impl Service {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Appending pdu to timeline");
|
debug!("Appending pdu to timeline");
|
||||||
extremities.insert(incoming_pdu.event_id.clone());
|
extremities.insert(incoming_pdu.event_id.clone());
|
||||||
|
|
||||||
// Now that the event has passed all auth it is added into the timeline.
|
// Now that the event has passed all auth it is added into the timeline.
|
||||||
|
@ -877,7 +877,7 @@ impl Service {
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
info!("Appended incoming pdu");
|
debug!("Appended incoming pdu");
|
||||||
|
|
||||||
// Event has passed all auth/stateres checks
|
// Event has passed all auth/stateres checks
|
||||||
drop(state_lock);
|
drop(state_lock);
|
||||||
|
@ -890,7 +890,7 @@ impl Service {
|
||||||
room_version_id: &RoomVersionId,
|
room_version_id: &RoomVersionId,
|
||||||
incoming_state: HashMap<u64, Arc<EventId>>,
|
incoming_state: HashMap<u64, Arc<EventId>>,
|
||||||
) -> Result<Arc<HashSet<CompressedStateEvent>>> {
|
) -> Result<Arc<HashSet<CompressedStateEvent>>> {
|
||||||
info!("Loading current room state ids");
|
debug!("Loading current room state ids");
|
||||||
let current_sstatehash = services()
|
let current_sstatehash = services()
|
||||||
.rooms
|
.rooms
|
||||||
.state
|
.state
|
||||||
|
@ -917,7 +917,7 @@ impl Service {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Loading fork states");
|
debug!("Loading fork states");
|
||||||
|
|
||||||
let fork_states: Vec<_> = fork_states
|
let fork_states: Vec<_> = fork_states
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -935,7 +935,7 @@ impl Service {
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
info!("Resolving state");
|
debug!("Resolving state");
|
||||||
|
|
||||||
let lock = services().globals.stateres_mutex.lock();
|
let lock = services().globals.stateres_mutex.lock();
|
||||||
let state = match state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| {
|
let state = match state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| {
|
||||||
|
@ -953,7 +953,7 @@ impl Service {
|
||||||
|
|
||||||
drop(lock);
|
drop(lock);
|
||||||
|
|
||||||
info!("State resolution done. Compressing state");
|
debug!("State resolution done. Compressing state");
|
||||||
|
|
||||||
let new_room_state = state
|
let new_room_state = state
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
|
|
@ -9,7 +9,7 @@ use ruma::{
|
||||||
OwnedServerName,
|
OwnedServerName,
|
||||||
};
|
};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tracing::{error, warn};
|
use tracing::{error, info};
|
||||||
|
|
||||||
#[cfg(feature = "persy")]
|
#[cfg(feature = "persy")]
|
||||||
use persy::PersyError;
|
use persy::PersyError;
|
||||||
|
@ -131,7 +131,7 @@ impl Error {
|
||||||
_ => (Unknown, StatusCode::INTERNAL_SERVER_ERROR),
|
_ => (Unknown, StatusCode::INTERNAL_SERVER_ERROR),
|
||||||
};
|
};
|
||||||
|
|
||||||
warn!("{}: {}", status_code, message);
|
info!("Returning an error: {}: {}", status_code, message);
|
||||||
|
|
||||||
RumaResponse(UiaaResponse::MatrixError(RumaError {
|
RumaResponse(UiaaResponse::MatrixError(RumaError {
|
||||||
body: ErrorBody::Standard { kind, message },
|
body: ErrorBody::Standard { kind, message },
|
||||||
|
|
Loading…
Reference in a new issue