Merge branch 'make-join' into 'master'
feat: make_join, send_join and /directory See merge request famedly/conduit!59
This commit is contained in:
commit
0b89ffe566
|
@ -564,7 +564,7 @@ async fn join_room_by_id_helper(
|
||||||
pdu_id.extend_from_slice(&count.to_be_bytes());
|
pdu_id.extend_from_slice(&count.to_be_bytes());
|
||||||
|
|
||||||
let pdu = PduEvent::from_id_val(&event_id, join_event.clone())
|
let pdu = PduEvent::from_id_val(&event_id, join_event.clone())
|
||||||
.map_err(|_| Error::BadServerResponse("Invalid PDU in send_join response."))?;
|
.map_err(|_| Error::BadServerResponse("Invalid join event PDU."))?;
|
||||||
|
|
||||||
let mut state = BTreeMap::new();
|
let mut state = BTreeMap::new();
|
||||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||||
|
@ -588,7 +588,7 @@ async fn join_room_by_id_helper(
|
||||||
Error::BadServerResponse("Invalid PDU in send_join response.")
|
Error::BadServerResponse("Invalid PDU in send_join response.")
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
db.rooms.add_pdu_outlier(&pdu)?;
|
db.rooms.add_pdu_outlier(&event_id, &value)?;
|
||||||
if let Some(state_key) = &pdu.state_key {
|
if let Some(state_key) = &pdu.state_key {
|
||||||
if pdu.kind == EventType::RoomMember {
|
if pdu.kind == EventType::RoomMember {
|
||||||
let target_user_id = UserId::try_from(state_key.clone()).map_err(|e| {
|
let target_user_id = UserId::try_from(state_key.clone()).map_err(|e| {
|
||||||
|
@ -632,7 +632,11 @@ async fn join_room_by_id_helper(
|
||||||
pdu.event_id.clone(),
|
pdu.event_id.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
db.rooms.force_state(room_id, state, &db.globals)?;
|
if state.get(&(EventType::RoomCreate, "".to_owned())).is_none() {
|
||||||
|
return Err(Error::BadServerResponse("State contained no create event."));
|
||||||
|
}
|
||||||
|
|
||||||
|
db.rooms.force_state(room_id, state, &db)?;
|
||||||
|
|
||||||
for result in futures::future::join_all(
|
for result in futures::future::join_all(
|
||||||
send_join_response
|
send_join_response
|
||||||
|
@ -648,11 +652,7 @@ async fn join_room_by_id_helper(
|
||||||
Err(_) => continue,
|
Err(_) => continue,
|
||||||
};
|
};
|
||||||
|
|
||||||
let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
|
db.rooms.add_pdu_outlier(&event_id, &value)?;
|
||||||
warn!("{:?}: {}", value, e);
|
|
||||||
Error::BadServerResponse("Invalid PDU in send_join response.")
|
|
||||||
})?;
|
|
||||||
db.rooms.add_pdu_outlier(&pdu)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We append to state before appending the pdu, so we don't have a moment in time with the
|
// We append to state before appending the pdu, so we don't have a moment in time with the
|
||||||
|
|
|
@ -241,7 +241,7 @@ impl Rooms {
|
||||||
kind: &EventType,
|
kind: &EventType,
|
||||||
sender: &UserId,
|
sender: &UserId,
|
||||||
state_key: Option<&str>,
|
state_key: Option<&str>,
|
||||||
content: serde_json::Value,
|
content: &serde_json::Value,
|
||||||
) -> Result<StateMap<Arc<PduEvent>>> {
|
) -> Result<StateMap<Arc<PduEvent>>> {
|
||||||
let auth_events = state_res::auth_types_for_event(
|
let auth_events = state_res::auth_types_for_event(
|
||||||
kind,
|
kind,
|
||||||
|
@ -295,7 +295,7 @@ impl Rooms {
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
state: BTreeMap<(EventType, String), EventId>,
|
state: BTreeMap<(EventType, String), EventId>,
|
||||||
globals: &super::globals::Globals,
|
db: &Database,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let state_hash = self.calculate_hash(
|
let state_hash = self.calculate_hash(
|
||||||
&state
|
&state
|
||||||
|
@ -304,57 +304,109 @@ impl Rooms {
|
||||||
.collect::<Vec<_>>(),
|
.collect::<Vec<_>>(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? {
|
let (shortstatehash, already_existed) =
|
||||||
Some(shortstatehash) => {
|
match self.statehash_shortstatehash.get(&state_hash)? {
|
||||||
// State already existed in db
|
Some(shortstatehash) => (
|
||||||
self.roomid_shortstatehash
|
utils::u64_from_bytes(&shortstatehash)
|
||||||
.insert(room_id.as_bytes(), &*shortstatehash)?;
|
.map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?,
|
||||||
return Ok(());
|
true,
|
||||||
}
|
),
|
||||||
None => {
|
None => {
|
||||||
let shortstatehash = globals.next_count()?;
|
let shortstatehash = db.globals.next_count()?;
|
||||||
self.statehash_shortstatehash
|
self.statehash_shortstatehash
|
||||||
.insert(&state_hash, &shortstatehash.to_be_bytes())?;
|
.insert(&state_hash, &shortstatehash.to_be_bytes())?;
|
||||||
shortstatehash.to_be_bytes().to_vec()
|
(shortstatehash, false)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_state = if !already_existed {
|
||||||
|
let mut new_state = HashSet::new();
|
||||||
|
|
||||||
|
for ((event_type, state_key), eventid) in state {
|
||||||
|
new_state.insert(eventid.clone());
|
||||||
|
|
||||||
|
let mut statekey = event_type.as_ref().as_bytes().to_vec();
|
||||||
|
statekey.push(0xff);
|
||||||
|
statekey.extend_from_slice(&state_key.as_bytes());
|
||||||
|
|
||||||
|
let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? {
|
||||||
|
Some(shortstatekey) => shortstatekey.to_vec(),
|
||||||
|
None => {
|
||||||
|
let shortstatekey = db.globals.next_count()?;
|
||||||
|
self.statekey_shortstatekey
|
||||||
|
.insert(&statekey, &shortstatekey.to_be_bytes())?;
|
||||||
|
shortstatekey.to_be_bytes().to_vec()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let shorteventid = match self.eventid_shorteventid.get(eventid.as_bytes())? {
|
||||||
|
Some(shorteventid) => shorteventid.to_vec(),
|
||||||
|
None => {
|
||||||
|
let shorteventid = db.globals.next_count()?;
|
||||||
|
self.eventid_shorteventid
|
||||||
|
.insert(eventid.as_bytes(), &shorteventid.to_be_bytes())?;
|
||||||
|
self.shorteventid_eventid
|
||||||
|
.insert(&shorteventid.to_be_bytes(), eventid.as_bytes())?;
|
||||||
|
shorteventid.to_be_bytes().to_vec()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut state_id = shortstatehash.to_be_bytes().to_vec();
|
||||||
|
state_id.extend_from_slice(&shortstatekey);
|
||||||
|
|
||||||
|
self.stateid_shorteventid
|
||||||
|
.insert(&state_id, &*shorteventid)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
new_state
|
||||||
|
} else {
|
||||||
|
self.state_full_ids(shortstatehash)?.into_iter().collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
for ((event_type, state_key), eventid) in state {
|
let old_state = self
|
||||||
let mut statekey = event_type.as_ref().as_bytes().to_vec();
|
.current_shortstatehash(&room_id)?
|
||||||
statekey.push(0xff);
|
.map(|s| self.state_full_ids(s))
|
||||||
statekey.extend_from_slice(&state_key.as_bytes());
|
.transpose()?
|
||||||
|
.map(|vec| vec.into_iter().collect::<HashSet<_>>())
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? {
|
for event_id in new_state.difference(&old_state) {
|
||||||
Some(shortstatekey) => shortstatekey.to_vec(),
|
if let Some(pdu) = self.get_pdu_json(event_id)? {
|
||||||
None => {
|
if pdu.get("event_type")
|
||||||
let shortstatekey = globals.next_count()?;
|
== Some(&CanonicalJsonValue::String("m.room.member".to_owned()))
|
||||||
self.statekey_shortstatekey
|
{
|
||||||
.insert(&statekey, &shortstatekey.to_be_bytes())?;
|
if let Ok(pdu) = serde_json::from_value::<PduEvent>(
|
||||||
shortstatekey.to_be_bytes().to_vec()
|
serde_json::to_value(&pdu).expect("CanonicalJsonObj is a valid JsonValue"),
|
||||||
|
) {
|
||||||
|
if let Some(membership) =
|
||||||
|
pdu.content.get("membership").and_then(|membership| {
|
||||||
|
serde_json::from_value::<member::MembershipState>(
|
||||||
|
membership.clone(),
|
||||||
|
)
|
||||||
|
.ok()
|
||||||
|
})
|
||||||
|
{
|
||||||
|
if let Some(state_key) = pdu
|
||||||
|
.state_key
|
||||||
|
.and_then(|state_key| UserId::try_from(state_key).ok())
|
||||||
|
{
|
||||||
|
self.update_membership(
|
||||||
|
room_id,
|
||||||
|
&state_key,
|
||||||
|
membership,
|
||||||
|
&pdu.sender,
|
||||||
|
None,
|
||||||
|
db,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
let shorteventid = match self.eventid_shorteventid.get(eventid.as_bytes())? {
|
|
||||||
Some(shorteventid) => shorteventid.to_vec(),
|
|
||||||
None => {
|
|
||||||
let shorteventid = globals.next_count()?;
|
|
||||||
self.eventid_shorteventid
|
|
||||||
.insert(eventid.as_bytes(), &shorteventid.to_be_bytes())?;
|
|
||||||
self.shorteventid_eventid
|
|
||||||
.insert(&shorteventid.to_be_bytes(), eventid.as_bytes())?;
|
|
||||||
shorteventid.to_be_bytes().to_vec()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut state_id = shortstatehash.clone();
|
|
||||||
state_id.extend_from_slice(&shortstatekey);
|
|
||||||
|
|
||||||
self.stateid_shorteventid
|
|
||||||
.insert(&*state_id, &*shorteventid)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.roomid_shortstatehash
|
self.roomid_shortstatehash
|
||||||
.insert(room_id.as_bytes(), &*shortstatehash)?;
|
.insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -591,10 +643,10 @@ impl Rooms {
|
||||||
/// Append the PDU as an outlier.
|
/// Append the PDU as an outlier.
|
||||||
///
|
///
|
||||||
/// Any event given to this will be processed (state-res) on another thread.
|
/// Any event given to this will be processed (state-res) on another thread.
|
||||||
pub fn add_pdu_outlier(&self, pdu: &PduEvent) -> Result<()> {
|
pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> {
|
||||||
self.eventid_outlierpdu.insert(
|
self.eventid_outlierpdu.insert(
|
||||||
&pdu.event_id.as_bytes(),
|
&event_id.as_bytes(),
|
||||||
&*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"),
|
&*serde_json::to_string(&pdu).expect("CanonicalJsonObject is valid string"),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1193,7 +1245,7 @@ impl Rooms {
|
||||||
&event_type,
|
&event_type,
|
||||||
&sender,
|
&sender,
|
||||||
state_key.as_deref(),
|
state_key.as_deref(),
|
||||||
content.clone(),
|
&content,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// Our depth is the maximum depth of prev_events + 1
|
// Our depth is the maximum depth of prev_events + 1
|
||||||
|
|
|
@ -168,7 +168,10 @@ fn setup_rocket() -> (rocket::Rocket, Config) {
|
||||||
server_server::get_event_route,
|
server_server::get_event_route,
|
||||||
server_server::get_missing_events_route,
|
server_server::get_missing_events_route,
|
||||||
server_server::get_room_state_ids_route,
|
server_server::get_room_state_ids_route,
|
||||||
|
server_server::create_join_event_template_route,
|
||||||
|
server_server::create_join_event_route,
|
||||||
server_server::create_invite_route,
|
server_server::create_invite_route,
|
||||||
|
server_server::get_room_information_route,
|
||||||
server_server::get_profile_information_route,
|
server_server::get_profile_information_route,
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
@ -14,25 +14,32 @@ use ruma::{
|
||||||
VerifyKey,
|
VerifyKey,
|
||||||
},
|
},
|
||||||
event::{get_event, get_missing_events, get_room_state_ids},
|
event::{get_event, get_missing_events, get_room_state_ids},
|
||||||
membership::create_invite,
|
membership::{
|
||||||
query::get_profile_information,
|
create_invite,
|
||||||
|
create_join_event::{self, RoomState},
|
||||||
|
create_join_event_template,
|
||||||
|
},
|
||||||
|
query::{get_profile_information, get_room_information},
|
||||||
transactions::send_transaction_message,
|
transactions::send_transaction_message,
|
||||||
},
|
},
|
||||||
IncomingResponse, OutgoingRequest, OutgoingResponse,
|
IncomingResponse, OutgoingRequest, OutgoingResponse,
|
||||||
},
|
},
|
||||||
directory::{IncomingFilter, IncomingRoomNetwork},
|
directory::{IncomingFilter, IncomingRoomNetwork},
|
||||||
events::{
|
events::{
|
||||||
room::{create::CreateEventContent, member::MembershipState},
|
room::{
|
||||||
|
create::CreateEventContent,
|
||||||
|
member::{MemberEventContent, MembershipState},
|
||||||
|
},
|
||||||
EventType,
|
EventType,
|
||||||
},
|
},
|
||||||
serde::{to_canonical_value, Raw},
|
serde::{to_canonical_value, Raw},
|
||||||
signatures::CanonicalJsonValue,
|
signatures::{CanonicalJsonObject, CanonicalJsonValue},
|
||||||
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
|
uint, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
|
||||||
};
|
};
|
||||||
use state_res::{Event, EventMap, StateMap};
|
use state_res::{Event, EventMap, StateMap};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{btree_map::Entry, BTreeMap, BTreeSet, HashSet},
|
collections::{btree_map::Entry, BTreeMap, BTreeSet, HashSet},
|
||||||
convert::TryFrom,
|
convert::{TryFrom, TryInto},
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
future::Future,
|
future::Future,
|
||||||
net::{IpAddr, SocketAddr},
|
net::{IpAddr, SocketAddr},
|
||||||
|
@ -589,8 +596,8 @@ pub async fn send_transaction_message_route<'a>(
|
||||||
Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into())
|
Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An async function that can recursively calls itself.
|
/// An async function that can recursively call itself.
|
||||||
type AsyncRecursiveResult<'a, T> = Pin<Box<dyn Future<Output = StdResult<T, String>> + 'a + Send>>;
|
type AsyncRecursiveResult<'a, T, E> = Pin<Box<dyn Future<Output = StdResult<T, E>> + 'a + Send>>;
|
||||||
|
|
||||||
/// When receiving an event one needs to:
|
/// When receiving an event one needs to:
|
||||||
/// 0. Skip the PDU if we already know about it
|
/// 0. Skip the PDU if we already know about it
|
||||||
|
@ -624,13 +631,13 @@ fn handle_incoming_pdu<'a>(
|
||||||
db: &'a Database,
|
db: &'a Database,
|
||||||
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
||||||
auth_cache: &'a mut EventMap<Arc<PduEvent>>,
|
auth_cache: &'a mut EventMap<Arc<PduEvent>>,
|
||||||
) -> AsyncRecursiveResult<'a, Arc<PduEvent>> {
|
) -> AsyncRecursiveResult<'a, Option<Vec<u8>>, String> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
|
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
|
||||||
|
|
||||||
// 0. Skip the PDU if we already know about it
|
// 0. Skip the PDU if we already have it as a timeline event
|
||||||
if let Ok(Some(pdu)) = db.rooms.get_non_outlier_pdu(&event_id) {
|
if let Ok(Some(pdu_id)) = db.rooms.get_pdu_id(&event_id) {
|
||||||
return Ok(Arc::new(pdu));
|
return Ok(Some(pdu_id.to_vec()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. Check the server is in the room
|
// 1. Check the server is in the room
|
||||||
|
@ -690,6 +697,7 @@ fn handle_incoming_pdu<'a>(
|
||||||
}
|
}
|
||||||
Ok(ruma::signatures::Verified::Signatures) => {
|
Ok(ruma::signatures::Verified::Signatures) => {
|
||||||
// Redact
|
// Redact
|
||||||
|
warn!("Calculated hash does not match: {}", event_id);
|
||||||
match ruma::signatures::redact(&value, &room_version) {
|
match ruma::signatures::redact(&value, &room_version) {
|
||||||
Ok(obj) => obj,
|
Ok(obj) => obj,
|
||||||
Err(_) => return Err("Redaction failed".to_string()),
|
Err(_) => return Err("Redaction failed".to_string()),
|
||||||
|
@ -705,7 +713,7 @@ fn handle_incoming_pdu<'a>(
|
||||||
to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"),
|
to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"),
|
||||||
);
|
);
|
||||||
let incoming_pdu = serde_json::from_value::<PduEvent>(
|
let incoming_pdu = serde_json::from_value::<PduEvent>(
|
||||||
serde_json::to_value(val).expect("CanonicalJsonObj is a valid JsonValue"),
|
serde_json::to_value(&val).expect("CanonicalJsonObj is a valid JsonValue"),
|
||||||
)
|
)
|
||||||
.map_err(|_| "Event is not a valid PDU.".to_string())?;
|
.map_err(|_| "Event is not a valid PDU.".to_string())?;
|
||||||
|
|
||||||
|
@ -793,13 +801,13 @@ fn handle_incoming_pdu<'a>(
|
||||||
|
|
||||||
// 7. Persist the event as an outlier.
|
// 7. Persist the event as an outlier.
|
||||||
db.rooms
|
db.rooms
|
||||||
.add_pdu_outlier(&incoming_pdu)
|
.add_pdu_outlier(&incoming_pdu.event_id, &val)
|
||||||
.map_err(|_| "Failed to add pdu as outlier.".to_owned())?;
|
.map_err(|_| "Failed to add pdu as outlier.".to_owned())?;
|
||||||
debug!("Added pdu as outlier.");
|
debug!("Added pdu as outlier.");
|
||||||
|
|
||||||
// 8. if not timeline event: stop
|
// 8. if not timeline event: stop
|
||||||
if !is_timeline_event {
|
if !is_timeline_event {
|
||||||
return Ok(incoming_pdu);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
// TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
||||||
|
@ -811,7 +819,54 @@ fn handle_incoming_pdu<'a>(
|
||||||
// the state from a known point and resolve if > 1 prev_event
|
// the state from a known point and resolve if > 1 prev_event
|
||||||
|
|
||||||
debug!("Requesting state at event.");
|
debug!("Requesting state at event.");
|
||||||
let (state_at_incoming_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) =
|
let mut state_at_incoming_event = None;
|
||||||
|
let mut incoming_auth_events = Vec::new();
|
||||||
|
|
||||||
|
if incoming_pdu.prev_events.len() == 1 {
|
||||||
|
let prev_event = &incoming_pdu.prev_events[0];
|
||||||
|
let state_vec = db
|
||||||
|
.rooms
|
||||||
|
.pdu_shortstatehash(prev_event)
|
||||||
|
.map_err(|_| "Failed talking to db".to_owned())?
|
||||||
|
.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash).ok())
|
||||||
|
.flatten();
|
||||||
|
if let Some(mut state_vec) = state_vec {
|
||||||
|
if db
|
||||||
|
.rooms
|
||||||
|
.get_pdu(prev_event)
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.ok_or_else(|| "Could not find prev event, but we know the state.".to_owned())?
|
||||||
|
.state_key
|
||||||
|
.is_some()
|
||||||
|
{
|
||||||
|
state_vec.push(prev_event.clone());
|
||||||
|
}
|
||||||
|
state_at_incoming_event = Some(
|
||||||
|
fetch_and_handle_events(db, origin, &state_vec, pub_key_map, auth_cache)
|
||||||
|
.await
|
||||||
|
.map_err(|_| "Failed to fetch state events locally".to_owned())?
|
||||||
|
.into_iter()
|
||||||
|
.map(|pdu| {
|
||||||
|
(
|
||||||
|
(
|
||||||
|
pdu.kind.clone(),
|
||||||
|
pdu.state_key
|
||||||
|
.clone()
|
||||||
|
.expect("events from state_full_ids are state events"),
|
||||||
|
),
|
||||||
|
pdu,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
&state_at_incoming_event;
|
||||||
|
|
||||||
|
// TODO: set incoming_auth_events?
|
||||||
|
}
|
||||||
|
|
||||||
|
if state_at_incoming_event.is_none() {
|
||||||
// Call /state_ids to find out what the state at this pdu is. We trust the server's
|
// Call /state_ids to find out what the state at this pdu is. We trust the server's
|
||||||
// response to some extend, but we still do a lot of checks on the events
|
// response to some extend, but we still do a lot of checks on the events
|
||||||
match db
|
match db
|
||||||
|
@ -856,12 +911,16 @@ fn handle_incoming_pdu<'a>(
|
||||||
}
|
}
|
||||||
|
|
||||||
// The original create event must still be in the state
|
// The original create event must still be in the state
|
||||||
if state.get(&(EventType::RoomCreate, "".to_owned())).map(|a| a.as_ref()) != Some(&create_event) {
|
if state
|
||||||
|
.get(&(EventType::RoomCreate, "".to_owned()))
|
||||||
|
.map(|a| a.as_ref())
|
||||||
|
!= Some(&create_event)
|
||||||
|
{
|
||||||
return Err("Incoming event refers to wrong create event.".to_owned());
|
return Err("Incoming event refers to wrong create event.".to_owned());
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Fetching auth chain events at event.");
|
debug!("Fetching auth chain events at event.");
|
||||||
let incoming_auth_events = match fetch_and_handle_events(
|
incoming_auth_events = match fetch_and_handle_events(
|
||||||
&db,
|
&db,
|
||||||
origin,
|
origin,
|
||||||
&res.auth_chain_ids,
|
&res.auth_chain_ids,
|
||||||
|
@ -874,12 +933,16 @@ fn handle_incoming_pdu<'a>(
|
||||||
Err(_) => return Err("Failed to fetch auth chain.".to_owned()),
|
Err(_) => return Err("Failed to fetch auth chain.".to_owned()),
|
||||||
};
|
};
|
||||||
|
|
||||||
(state, incoming_auth_events)
|
state_at_incoming_event = Some(state);
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
return Err("Fetching state for event failed".into());
|
return Err("Fetching state for event failed".into());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let state_at_incoming_event =
|
||||||
|
state_at_incoming_event.expect("we always set this to some above");
|
||||||
|
|
||||||
// 11. Check the auth of the event passes based on the state of the event
|
// 11. Check the auth of the event passes based on the state of the event
|
||||||
if !state_res::event_auth::auth_check(
|
if !state_res::event_auth::auth_check(
|
||||||
|
@ -1079,20 +1142,26 @@ fn handle_incoming_pdu<'a>(
|
||||||
// Now that the event has passed all auth it is added into the timeline.
|
// Now that the event has passed all auth it is added into the timeline.
|
||||||
// We use the `state_at_event` instead of `state_after` so we accurately
|
// We use the `state_at_event` instead of `state_after` so we accurately
|
||||||
// represent the state for this event.
|
// represent the state for this event.
|
||||||
append_incoming_pdu(&db, &incoming_pdu, extremities, &state_at_incoming_event)
|
let pdu_id = append_incoming_pdu(
|
||||||
.map_err(|_| "Failed to add pdu to db.".to_owned())?;
|
&db,
|
||||||
|
&incoming_pdu,
|
||||||
|
val,
|
||||||
|
extremities,
|
||||||
|
&state_at_incoming_event,
|
||||||
|
)
|
||||||
|
.map_err(|_| "Failed to add pdu to db.".to_owned())?;
|
||||||
debug!("Appended incoming pdu.");
|
debug!("Appended incoming pdu.");
|
||||||
|
|
||||||
// Set the new room state to the resolved state
|
// Set the new room state to the resolved state
|
||||||
if update_state {
|
if update_state {
|
||||||
db.rooms
|
db.rooms
|
||||||
.force_state(&room_id, new_room_state, &db.globals)
|
.force_state(&room_id, new_room_state, &db)
|
||||||
.map_err(|_| "Failed to set new room state.".to_owned())?;
|
.map_err(|_| "Failed to set new room state.".to_owned())?;
|
||||||
}
|
}
|
||||||
debug!("Updated resolved state");
|
debug!("Updated resolved state");
|
||||||
|
|
||||||
// Event has passed all auth/stateres checks
|
// Event has passed all auth/stateres checks
|
||||||
Ok(incoming_pdu)
|
Ok(Some(pdu_id))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1108,77 +1177,93 @@ fn handle_incoming_pdu<'a>(
|
||||||
/// If the event is unknown to the `auth_cache` it is added. This guarantees that any
|
/// If the event is unknown to the `auth_cache` it is added. This guarantees that any
|
||||||
/// event we need to know of will be present.
|
/// event we need to know of will be present.
|
||||||
//#[tracing::instrument(skip(db, key_map, auth_cache))]
|
//#[tracing::instrument(skip(db, key_map, auth_cache))]
|
||||||
pub(crate) async fn fetch_and_handle_events(
|
pub(crate) fn fetch_and_handle_events<'a>(
|
||||||
db: &Database,
|
db: &'a Database,
|
||||||
origin: &ServerName,
|
origin: &'a ServerName,
|
||||||
events: &[EventId],
|
events: &'a [EventId],
|
||||||
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
||||||
auth_cache: &mut EventMap<Arc<PduEvent>>,
|
auth_cache: &'a mut EventMap<Arc<PduEvent>>,
|
||||||
) -> Result<Vec<Arc<PduEvent>>> {
|
) -> AsyncRecursiveResult<'a, Vec<Arc<PduEvent>>, Error> {
|
||||||
let mut pdus = vec![];
|
Box::pin(async move {
|
||||||
for id in events {
|
let mut pdus = vec![];
|
||||||
// a. Look at auth cache
|
for id in events {
|
||||||
let pdu = match auth_cache.get(id) {
|
// a. Look at auth cache
|
||||||
Some(pdu) => {
|
let pdu =
|
||||||
debug!("Found {} in cache", id);
|
match auth_cache.get(id) {
|
||||||
pdu.clone()
|
Some(pdu) => {
|
||||||
}
|
debug!("Found {} in cache", id);
|
||||||
// b. Look in the main timeline (pduid_pdu tree)
|
// We already have the auth chain for events in cache
|
||||||
// c. Look at outlier pdu tree
|
pdu.clone()
|
||||||
// (get_pdu checks both)
|
}
|
||||||
None => match db.rooms.get_pdu(&id)? {
|
// b. Look in the main timeline (pduid_pdu tree)
|
||||||
Some(pdu) => {
|
// c. Look at outlier pdu tree
|
||||||
debug!("Found {} in outliers", id);
|
// (get_pdu checks both)
|
||||||
Arc::new(pdu)
|
None => match db.rooms.get_pdu(&id)? {
|
||||||
}
|
Some(pdu) => {
|
||||||
None => {
|
debug!("Found {} in db", id);
|
||||||
// d. Ask origin server over federation
|
// We need to fetch the auth chain
|
||||||
debug!("Fetching {} over federation.", id);
|
let _ = fetch_and_handle_events(
|
||||||
match db
|
|
||||||
.sending
|
|
||||||
.send_federation_request(
|
|
||||||
&db.globals,
|
|
||||||
origin,
|
|
||||||
get_event::v1::Request { event_id: &id },
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(res) => {
|
|
||||||
debug!("Got {} over federation: {:?}", id, res);
|
|
||||||
let (event_id, value) =
|
|
||||||
crate::pdu::gen_event_id_canonical_json(&res.pdu)?;
|
|
||||||
let pdu = match handle_incoming_pdu(
|
|
||||||
origin,
|
|
||||||
&event_id,
|
|
||||||
value,
|
|
||||||
false,
|
|
||||||
db,
|
db,
|
||||||
|
origin,
|
||||||
|
&pdu.auth_events,
|
||||||
pub_key_map,
|
pub_key_map,
|
||||||
auth_cache,
|
auth_cache,
|
||||||
)
|
)
|
||||||
.await
|
.await?;
|
||||||
|
Arc::new(pdu)
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// d. Ask origin server over federation
|
||||||
|
debug!("Fetching {} over federation.", id);
|
||||||
|
match db
|
||||||
|
.sending
|
||||||
|
.send_federation_request(
|
||||||
|
&db.globals,
|
||||||
|
origin,
|
||||||
|
get_event::v1::Request { event_id: &id },
|
||||||
|
)
|
||||||
|
.await
|
||||||
{
|
{
|
||||||
Ok(pdu) => pdu,
|
Ok(res) => {
|
||||||
Err(e) => {
|
debug!("Got {} over federation: {:?}", id, res);
|
||||||
warn!("Authentication of event {} failed: {:?}", id, e);
|
let (event_id, value) =
|
||||||
|
crate::pdu::gen_event_id_canonical_json(&res.pdu)?;
|
||||||
|
// This will also fetch the auth chain
|
||||||
|
match handle_incoming_pdu(
|
||||||
|
origin,
|
||||||
|
&event_id,
|
||||||
|
value.clone(),
|
||||||
|
false,
|
||||||
|
db,
|
||||||
|
pub_key_map,
|
||||||
|
auth_cache,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => Arc::new(serde_json::from_value(
|
||||||
|
serde_json::to_value(value)
|
||||||
|
.expect("canonicaljsonobject is valid value"),
|
||||||
|
)
|
||||||
|
.expect("This is possible because handle_incoming_pdu worked")),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Authentication of event {} failed: {:?}", id, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
warn!("Failed to fetch event: {}", id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
pdu
|
|
||||||
}
|
}
|
||||||
Err(_) => {
|
},
|
||||||
warn!("Failed to fetch event: {}", id);
|
};
|
||||||
continue;
|
auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone());
|
||||||
}
|
pdus.push(pdu);
|
||||||
}
|
}
|
||||||
}
|
Ok(pdus)
|
||||||
},
|
})
|
||||||
};
|
|
||||||
auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone());
|
|
||||||
pdus.push(pdu);
|
|
||||||
}
|
|
||||||
Ok(pdus)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Search the DB for the signing keys of the given server, if we don't have them
|
/// Search the DB for the signing keys of the given server, if we don't have them
|
||||||
|
@ -1280,9 +1365,10 @@ pub(crate) async fn fetch_signing_keys(
|
||||||
pub(crate) fn append_incoming_pdu(
|
pub(crate) fn append_incoming_pdu(
|
||||||
db: &Database,
|
db: &Database,
|
||||||
pdu: &PduEvent,
|
pdu: &PduEvent,
|
||||||
|
pdu_json: CanonicalJsonObject,
|
||||||
new_room_leaves: HashSet<EventId>,
|
new_room_leaves: HashSet<EventId>,
|
||||||
state: &StateMap<Arc<PduEvent>>,
|
state: &StateMap<Arc<PduEvent>>,
|
||||||
) -> Result<()> {
|
) -> Result<Vec<u8>> {
|
||||||
let count = db.globals.next_count()?;
|
let count = db.globals.next_count()?;
|
||||||
let mut pdu_id = pdu.room_id.as_bytes().to_vec();
|
let mut pdu_id = pdu.room_id.as_bytes().to_vec();
|
||||||
pdu_id.push(0xff);
|
pdu_id.push(0xff);
|
||||||
|
@ -1295,7 +1381,7 @@ pub(crate) fn append_incoming_pdu(
|
||||||
|
|
||||||
db.rooms.append_pdu(
|
db.rooms.append_pdu(
|
||||||
pdu,
|
pdu,
|
||||||
utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
|
pdu_json,
|
||||||
count,
|
count,
|
||||||
pdu_id.clone().into(),
|
pdu_id.clone().into(),
|
||||||
&new_room_leaves.into_iter().collect::<Vec<_>>(),
|
&new_room_leaves.into_iter().collect::<Vec<_>>(),
|
||||||
|
@ -1366,7 +1452,7 @@ pub(crate) fn append_incoming_pdu(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(pdu_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg_attr(
|
#[cfg_attr(
|
||||||
|
@ -1495,6 +1581,291 @@ pub fn get_room_state_ids_route<'a>(
|
||||||
.into())
|
.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg_attr(
|
||||||
|
feature = "conduit_bin",
|
||||||
|
get("/_matrix/federation/v1/make_join/<_>/<_>", data = "<body>")
|
||||||
|
)]
|
||||||
|
#[tracing::instrument(skip(db, body))]
|
||||||
|
pub fn create_join_event_template_route<'a>(
|
||||||
|
db: State<'a, Database>,
|
||||||
|
body: Ruma<create_join_event_template::v1::Request<'_>>,
|
||||||
|
) -> ConduitResult<create_join_event_template::v1::Response> {
|
||||||
|
if !db.globals.allow_federation() {
|
||||||
|
return Err(Error::bad_config("Federation is disabled."));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !db.rooms.exists(&body.room_id)? {
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
ErrorKind::NotFound,
|
||||||
|
"Server is not in room.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !body.ver.contains(&RoomVersionId::Version6) {
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
ErrorKind::IncompatibleRoomVersion {
|
||||||
|
room_version: RoomVersionId::Version6,
|
||||||
|
},
|
||||||
|
"Room version not supported.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let prev_events = db
|
||||||
|
.rooms
|
||||||
|
.get_pdu_leaves(&body.room_id)?
|
||||||
|
.into_iter()
|
||||||
|
.take(20)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let create_event = db
|
||||||
|
.rooms
|
||||||
|
.room_state_get(&body.room_id, &EventType::RoomCreate, "")?;
|
||||||
|
|
||||||
|
let create_event_content = create_event
|
||||||
|
.as_ref()
|
||||||
|
.map(|create_event| {
|
||||||
|
Ok::<_, Error>(
|
||||||
|
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone())
|
||||||
|
.expect("Raw::from_value always works.")
|
||||||
|
.deserialize()
|
||||||
|
.map_err(|_| Error::bad_database("Invalid PowerLevels event in db."))?,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
let create_prev_event = if prev_events.len() == 1
|
||||||
|
&& Some(&prev_events[0]) == create_event.as_ref().map(|c| &c.event_id)
|
||||||
|
{
|
||||||
|
create_event.map(Arc::new)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
// If there was no create event yet, assume we are creating a version 6 room right now
|
||||||
|
let room_version = create_event_content.map_or(RoomVersionId::Version6, |create_event| {
|
||||||
|
create_event.room_version
|
||||||
|
});
|
||||||
|
|
||||||
|
let content = serde_json::to_value(MemberEventContent {
|
||||||
|
avatar_url: None,
|
||||||
|
displayname: None,
|
||||||
|
is_direct: None,
|
||||||
|
membership: MembershipState::Join,
|
||||||
|
third_party_invite: None,
|
||||||
|
})
|
||||||
|
.expect("member event is valid value");
|
||||||
|
|
||||||
|
let state_key = body.user_id.to_string();
|
||||||
|
let kind = EventType::RoomMember;
|
||||||
|
|
||||||
|
let auth_events = db.rooms.get_auth_events(
|
||||||
|
&body.room_id,
|
||||||
|
&kind,
|
||||||
|
&body.user_id,
|
||||||
|
Some(&state_key),
|
||||||
|
&content,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// Our depth is the maximum depth of prev_events + 1
|
||||||
|
let depth = prev_events
|
||||||
|
.iter()
|
||||||
|
.filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth))
|
||||||
|
.max()
|
||||||
|
.unwrap_or_else(|| uint!(0))
|
||||||
|
+ uint!(1);
|
||||||
|
|
||||||
|
let mut unsigned = BTreeMap::new();
|
||||||
|
|
||||||
|
if let Some(prev_pdu) = db.rooms.room_state_get(&body.room_id, &kind, &state_key)? {
|
||||||
|
unsigned.insert("prev_content".to_owned(), prev_pdu.content);
|
||||||
|
unsigned.insert(
|
||||||
|
"prev_sender".to_owned(),
|
||||||
|
serde_json::to_value(prev_pdu.sender).expect("UserId::to_value always works"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let pdu = PduEvent {
|
||||||
|
event_id: ruma::event_id!("$thiswillbefilledinlater"),
|
||||||
|
room_id: body.room_id.clone(),
|
||||||
|
sender: body.user_id.clone(),
|
||||||
|
origin_server_ts: utils::millis_since_unix_epoch()
|
||||||
|
.try_into()
|
||||||
|
.expect("time is valid"),
|
||||||
|
kind,
|
||||||
|
content,
|
||||||
|
state_key: Some(state_key),
|
||||||
|
prev_events,
|
||||||
|
depth,
|
||||||
|
auth_events: auth_events
|
||||||
|
.iter()
|
||||||
|
.map(|(_, pdu)| pdu.event_id.clone())
|
||||||
|
.collect(),
|
||||||
|
redacts: None,
|
||||||
|
unsigned,
|
||||||
|
hashes: ruma::events::pdu::EventHash {
|
||||||
|
sha256: "aaa".to_owned(),
|
||||||
|
},
|
||||||
|
signatures: BTreeMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let auth_check = state_res::auth_check(
|
||||||
|
&room_version,
|
||||||
|
&Arc::new(pdu.clone()),
|
||||||
|
create_prev_event,
|
||||||
|
&auth_events,
|
||||||
|
None, // TODO: third_party_invite
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("{:?}", e);
|
||||||
|
Error::bad_database("Auth check failed.")
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if !auth_check {
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
ErrorKind::InvalidParam,
|
||||||
|
"Event is not authorized.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hash and sign
|
||||||
|
let mut pdu_json =
|
||||||
|
utils::to_canonical_object(&pdu).expect("event is valid, we just created it");
|
||||||
|
|
||||||
|
pdu_json.remove("event_id");
|
||||||
|
|
||||||
|
// Add origin because synapse likes that (and it's required in the spec)
|
||||||
|
pdu_json.insert(
|
||||||
|
"origin".to_owned(),
|
||||||
|
to_canonical_value(db.globals.server_name())
|
||||||
|
.expect("server name is a valid CanonicalJsonValue"),
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(create_join_event_template::v1::Response {
|
||||||
|
room_version: Some(RoomVersionId::Version6),
|
||||||
|
event: serde_json::from_value::<Raw<_>>(
|
||||||
|
serde_json::to_value(pdu_json).expect("CanonicalJson is valid serde_json::Value"),
|
||||||
|
)
|
||||||
|
.expect("Raw::from_value always works"),
|
||||||
|
}
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg_attr(
|
||||||
|
feature = "conduit_bin",
|
||||||
|
put("/_matrix/federation/v2/send_join/<_>/<_>", data = "<body>")
|
||||||
|
)]
|
||||||
|
#[tracing::instrument(skip(db, body))]
|
||||||
|
pub async fn create_join_event_route<'a>(
|
||||||
|
db: State<'a, Database>,
|
||||||
|
body: Ruma<create_join_event::v2::Request<'_>>,
|
||||||
|
) -> ConduitResult<create_join_event::v2::Response> {
|
||||||
|
if !db.globals.allow_federation() {
|
||||||
|
return Err(Error::bad_config("Federation is disabled."));
|
||||||
|
}
|
||||||
|
|
||||||
|
// We need to return the state prior to joining, let's keep a reference to that here
|
||||||
|
let shortstatehash =
|
||||||
|
db.rooms
|
||||||
|
.current_shortstatehash(&body.room_id)?
|
||||||
|
.ok_or(Error::BadRequest(
|
||||||
|
ErrorKind::NotFound,
|
||||||
|
"Pdu state not found.",
|
||||||
|
))?;
|
||||||
|
|
||||||
|
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||||
|
let mut auth_cache = EventMap::new();
|
||||||
|
|
||||||
|
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
||||||
|
let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(&body.pdu) {
|
||||||
|
Ok(t) => t,
|
||||||
|
Err(_) => {
|
||||||
|
// Event could not be converted to canonical json
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
ErrorKind::InvalidParam,
|
||||||
|
"Could not convert event to canonical json.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let origin = serde_json::from_value::<Box<ServerName>>(
|
||||||
|
serde_json::to_value(value.get("origin").ok_or(Error::BadRequest(
|
||||||
|
ErrorKind::InvalidParam,
|
||||||
|
"Event needs an origin field.",
|
||||||
|
))?)
|
||||||
|
.expect("CanonicalJson is valid json value"),
|
||||||
|
)
|
||||||
|
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?;
|
||||||
|
|
||||||
|
let pdu_id = handle_incoming_pdu(
|
||||||
|
&origin,
|
||||||
|
&event_id,
|
||||||
|
value,
|
||||||
|
true,
|
||||||
|
&db,
|
||||||
|
&pub_key_map,
|
||||||
|
&mut auth_cache,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|_| {
|
||||||
|
Error::BadRequest(
|
||||||
|
ErrorKind::InvalidParam,
|
||||||
|
"Error while handling incoming PDU.",
|
||||||
|
)
|
||||||
|
})?
|
||||||
|
.ok_or(Error::BadRequest(
|
||||||
|
ErrorKind::InvalidParam,
|
||||||
|
"Could not accept incoming PDU as timeline event.",
|
||||||
|
))?;
|
||||||
|
|
||||||
|
let state_ids = db.rooms.state_full_ids(shortstatehash)?;
|
||||||
|
|
||||||
|
let mut auth_chain_ids = BTreeSet::<EventId>::new();
|
||||||
|
let mut todo = state_ids.iter().cloned().collect::<BTreeSet<_>>();
|
||||||
|
|
||||||
|
while let Some(event_id) = todo.iter().next().cloned() {
|
||||||
|
if let Some(pdu) = db.rooms.get_pdu(&event_id)? {
|
||||||
|
todo.extend(
|
||||||
|
pdu.auth_events
|
||||||
|
.clone()
|
||||||
|
.into_iter()
|
||||||
|
.collect::<BTreeSet<_>>()
|
||||||
|
.difference(&auth_chain_ids)
|
||||||
|
.cloned(),
|
||||||
|
);
|
||||||
|
auth_chain_ids.extend(pdu.auth_events.into_iter());
|
||||||
|
} else {
|
||||||
|
warn!("Could not find pdu mentioned in auth events.");
|
||||||
|
}
|
||||||
|
|
||||||
|
todo.remove(&event_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
for server in db
|
||||||
|
.rooms
|
||||||
|
.room_servers(&body.room_id)
|
||||||
|
.filter_map(|r| r.ok())
|
||||||
|
.filter(|server| &**server != db.globals.server_name())
|
||||||
|
{
|
||||||
|
db.sending.send_pdu(&server, &pdu_id)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(create_join_event::v2::Response {
|
||||||
|
room_state: RoomState {
|
||||||
|
auth_chain: auth_chain_ids
|
||||||
|
.iter()
|
||||||
|
.filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten())
|
||||||
|
.map(|json| PduEvent::convert_to_outgoing_federation_event(json))
|
||||||
|
.collect(),
|
||||||
|
state: state_ids
|
||||||
|
.iter()
|
||||||
|
.filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten())
|
||||||
|
.map(|json| PduEvent::convert_to_outgoing_federation_event(json))
|
||||||
|
.collect(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg_attr(
|
#[cfg_attr(
|
||||||
feature = "conduit_bin",
|
feature = "conduit_bin",
|
||||||
put("/_matrix/federation/v2/invite/<_>/<_>", data = "<body>")
|
put("/_matrix/federation/v2/invite/<_>/<_>", data = "<body>")
|
||||||
|
@ -1504,6 +1875,10 @@ pub async fn create_invite_route<'a>(
|
||||||
db: State<'a, Database>,
|
db: State<'a, Database>,
|
||||||
body: Ruma<create_invite::v2::Request>,
|
body: Ruma<create_invite::v2::Request>,
|
||||||
) -> ConduitResult<create_invite::v2::Response> {
|
) -> ConduitResult<create_invite::v2::Response> {
|
||||||
|
if !db.globals.allow_federation() {
|
||||||
|
return Err(Error::bad_config("Federation is disabled."));
|
||||||
|
}
|
||||||
|
|
||||||
if body.room_version < RoomVersionId::Version6 {
|
if body.room_version < RoomVersionId::Version6 {
|
||||||
return Err(Error::BadRequest(
|
return Err(Error::BadRequest(
|
||||||
ErrorKind::IncompatibleRoomVersion {
|
ErrorKind::IncompatibleRoomVersion {
|
||||||
|
@ -1599,6 +1974,31 @@ pub async fn create_invite_route<'a>(
|
||||||
.into())
|
.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg_attr(
|
||||||
|
feature = "conduit_bin",
|
||||||
|
get("/_matrix/federation/v1/query/directory", data = "<body>")
|
||||||
|
)]
|
||||||
|
#[tracing::instrument(skip(db, body))]
|
||||||
|
pub fn get_room_information_route<'a>(
|
||||||
|
db: State<'a, Database>,
|
||||||
|
body: Ruma<get_room_information::v1::Request<'_>>,
|
||||||
|
) -> ConduitResult<get_room_information::v1::Response> {
|
||||||
|
if !db.globals.allow_federation() {
|
||||||
|
return Err(Error::bad_config("Federation is disabled."));
|
||||||
|
}
|
||||||
|
|
||||||
|
let room_id = db
|
||||||
|
.rooms
|
||||||
|
.id_from_alias(&body.room_alias)?
|
||||||
|
.ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Room alias not found."))?;
|
||||||
|
|
||||||
|
Ok(get_room_information::v1::Response {
|
||||||
|
room_id,
|
||||||
|
servers: vec![db.globals.server_name().to_owned()],
|
||||||
|
}
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg_attr(
|
#[cfg_attr(
|
||||||
feature = "conduit_bin",
|
feature = "conduit_bin",
|
||||||
get("/_matrix/federation/v1/query/profile", data = "<body>")
|
get("/_matrix/federation/v1/query/profile", data = "<body>")
|
||||||
|
|
Loading…
Reference in New Issue