Use the auth_events for step 6; WIP: forward_extremity_ids fn

This commit is contained in:
Devin Ragotzy 2021-01-15 15:46:47 -05:00 committed by Devin Ragotzy
parent 4b2eb5ab82
commit db0aee3318

View file

@ -569,7 +569,7 @@ pub async fn send_transaction_message_route<'a>(
// discard the event whereas the Client Server API's /send/{eventType} endpoint
// would return a M_BAD_JSON error.
let mut resolved_map = BTreeMap::new();
for pdu in &body.pdus {
'main_pdu_loop: for pdu in &body.pdus {
// 1. Is a valid event, otherwise it is dropped.
// Ruma/PduEvent/StateEvent satisfies this
// We do not add the event_id field to the pdu here because of signature and hashes checks
@ -660,7 +660,6 @@ pub async fn send_transaction_message_route<'a>(
};
let pdu = Arc::new(pdu.clone());
// Fetch any unknown prev_events or retrieve them from the DB
let previous = match fetch_events(
&db,
@ -675,6 +674,7 @@ pub async fn send_transaction_message_route<'a>(
_ => None,
};
// [auth_cache] At this point we have the auth chain of the incoming event.
let mut event_map: state_res::EventMap<Arc<PduEvent>> = auth_cache
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
@ -688,7 +688,7 @@ pub async fn send_transaction_message_route<'a>(
&pdu.auth_events
.iter()
.map(|id| {
event_map
auth_cache
.get(id)
.map(|pdu| ((pdu.kind(), pdu.state_key()), pdu.clone()))
.ok_or_else(|| {
@ -790,7 +790,15 @@ pub async fn send_transaction_message_route<'a>(
// End of step 5.
// Gather the forward extremities and resolve
let fork_states = match forward_extremity_ids(&db, &pdu) {
let fork_states = match forward_extremity_ids(
&db,
&pdu,
server_name,
&pub_key_map,
&mut auth_cache,
)
.await
{
Ok(states) => states,
Err(_) => {
resolved_map.insert(event_id, Err("Failed to gather forward extremities".into()));
@ -805,47 +813,44 @@ pub async fn send_transaction_message_route<'a>(
} else if fork_states.len() == 1 {
fork_states[0].clone()
} else {
// TODO: remove — this is only for current debugging (Jan 15, 2021)
let mut number_fetches = 0_u32;
let mut auth_events = vec![];
// this keeps track if we error so we can break out of these inner loops
// to continue on with the incoming PDU's
let mut failed = false;
for map in &fork_states {
let mut state_auth = vec![];
for pdu in map.values() {
let event = match auth_cache.get(pdu.event_id()) {
for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) {
let event = match auth_cache.get(auth_id) {
Some(aev) => aev.clone(),
// We should know about every event at this point but just in case...
None => match fetch_events(
&db,
server_name,
&pub_key_map,
&[pdu.event_id().clone()],
&[auth_id.clone()],
&mut auth_cache,
)
.await
.map(|mut vec| vec.remove(0))
{
Ok(aev) => aev.clone(),
.map(|mut vec| {
number_fetches += 1;
vec.remove(0)
}) {
Ok(aev) => aev,
Err(_) => {
resolved_map.insert(
event_id.clone(),
Err("Event has been soft failed".into()),
);
failed = true;
break;
continue 'main_pdu_loop;
}
},
};
state_auth.push(event);
}
if failed {
break;
}
auth_events.push(state_auth);
}
if failed {
continue;
}
info!("{} event's were not in the auth_cache", number_fetches);
// Add everything we will need to event_map
event_map.extend(
@ -886,7 +891,13 @@ pub async fn send_transaction_message_route<'a>(
.into_iter()
.map(|(k, v)| (k, Arc::new(db.rooms.get_pdu(&v).unwrap().unwrap())))
.collect(),
Err(e) => panic!("{:?}", e),
Err(_) => {
resolved_map.insert(
pdu.event_id().clone(),
Err("State resolution failed, either an event could not be found or deserialization".into()),
);
continue 'main_pdu_loop;
}
}
};
@ -914,6 +925,7 @@ pub async fn send_transaction_message_route<'a>(
Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into())
}
/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events
/// Validate any event that is given to us by another server.
///
/// 1. Is a valid event, otherwise it is dropped (PduEvent deserialization satisfies this).
@ -955,6 +967,37 @@ fn validate_event<'a>(
})
}
/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events
/// Ensure the complete auth-event chain for `event_ids` is known.
///
/// Walks the `auth_events` of every event depth-first; any event not already
/// present in `auth_cache` is fetched with `fetch_events`. Fails (propagating
/// the error) if any event in the chain cannot be retrieved.
///
/// The `auth_cache` is filled instead of returning a `Vec`.
async fn fetch_check_auth_events(
    db: &Database,
    origin: &ServerName,
    key_map: &PublicKeyMap,
    event_ids: &[EventId],
    auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<()> {
    // DFS over the auth-event chain using an explicit stack.
    // `while let Some(..) = stack.pop()` replaces the manual
    // `while !stack.is_empty() { stack.pop().unwrap() }` pattern and
    // eliminates the unwrap entirely.
    let mut stack = event_ids.to_vec();
    while let Some(ev_id) = stack.pop() {
        // Already fetched — nothing to do for this id.
        if auth_cache.contains_key(&ev_id) {
            continue;
        }

        // A single id is passed, so on success the returned Vec holds one
        // element (assumes `fetch_events` errors rather than returning an
        // empty Vec for a missing event — TODO confirm against fetch_events).
        let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache)
            .await
            .map(|mut vec| vec.remove(0))?;

        // Push this event's own auth events so the whole chain gets checked.
        stack.extend(ev.auth_events());
        auth_cache.insert(ev.event_id().clone(), ev);
    }
    Ok(())
}
/// Find the event and auth it.
///
/// 1. Look in the main timeline (pduid_pdu tree)
@ -1000,36 +1043,6 @@ async fn fetch_events(
Ok(pdus)
}
/// Ensure the complete auth-event chain for `event_ids` is known.
///
/// Walks the `auth_events` of every event depth-first; any event not already
/// present in `auth_cache` is fetched with `fetch_events`. Fails (propagating
/// the error) if any event in the chain cannot be retrieved.
///
/// The `auth_cache` is filled instead of returning a `Vec`.
async fn fetch_check_auth_events(
    db: &Database,
    origin: &ServerName,
    key_map: &PublicKeyMap,
    event_ids: &[EventId],
    auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<()> {
    // DFS over the auth-event chain using an explicit stack.
    // `while let Some(..) = stack.pop()` replaces the manual
    // `while !stack.is_empty() { stack.pop().unwrap() }` pattern and
    // eliminates the unwrap entirely.
    let mut stack = event_ids.to_vec();
    while let Some(ev_id) = stack.pop() {
        // Already fetched — nothing to do for this id.
        if auth_cache.contains_key(&ev_id) {
            continue;
        }

        // A single id is passed, so on success the returned Vec holds one
        // element (assumes `fetch_events` errors rather than returning an
        // empty Vec for a missing event — TODO confirm against fetch_events).
        let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache)
            .await
            .map(|mut vec| vec.remove(0))?;

        // Push this event's own auth events so the whole chain gets checked.
        stack.extend(ev.auth_events());
        auth_cache.insert(ev.event_id().clone(), ev);
    }
    Ok(())
}
/// Search the DB for the signing keys of the given server, if we don't have them
/// fetch them from the server and save to our DB.
async fn fetch_signing_keys(
@ -1049,6 +1062,7 @@ async fn fetch_signing_keys(
}
}
}
fn signature_and_hash_check(
pub_key_map: &ruma::signatures::PublicKeyMap,
value: CanonicalJsonObject,
@ -1073,9 +1087,23 @@ fn signature_and_hash_check(
)
}
fn forward_extremity_ids(db: &Database, pdu: &PduEvent) -> Result<Vec<StateMap<Arc<PduEvent>>>> {
async fn forward_extremity_ids(
db: &Database,
pdu: &PduEvent,
origin: &ServerName,
pub_key_map: &PublicKeyMap,
auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<Vec<StateMap<Arc<PduEvent>>>> {
let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?;
for incoming_leaf in &pdu.prev_events {
if !current_leaves.contains(incoming_leaf) {
current_leaves.push(incoming_leaf.clone());
}
}
let mut fork_states = vec![];
for id in &db.rooms.get_pdu_leaves(pdu.room_id())? {
for id in &current_leaves {
if let Some(id) = db.rooms.get_pdu_id(id)? {
let state_hash = db
.rooms
@ -1090,11 +1118,32 @@ fn forward_extremity_ids(db: &Database, pdu: &PduEvent) -> Result<Vec<StateMap<A
fork_states.push(state);
} else {
return Err(Error::Conflict(
"we don't know of a pdu that is part of our known forks OOPS",
));
let res = db
.sending
.send_federation_request(
&db.globals,
origin,
get_room_state_ids::v1::Request {
room_id: pdu.room_id(),
event_id: id,
},
)
.await?;
// TODO: this only adds events to the auth_cache; there is surely a better way
// to do this...
fetch_events(&db, origin, &pub_key_map, &res.auth_chain_ids, auth_cache).await?;
let state = fetch_events(&db, origin, &pub_key_map, &res.pdu_ids, auth_cache)
.await?
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect();
fork_states.push(state);
}
}
Ok(fork_states)
}