diff --git a/crates/node/src/sync.rs b/crates/node/src/sync.rs index 10a53b976..83d71070b 100644 --- a/crates/node/src/sync.rs +++ b/crates/node/src/sync.rs @@ -201,12 +201,12 @@ impl Node { match payload { InitPayload::KeyShare => { - self.handle_key_share_request(context, our_identity, their_identity, stream) + self.handle_key_share_request(&context, our_identity, their_identity, stream) .await? } InitPayload::BlobShare { blob_id } => { self.handle_blob_share_request( - context, + &context, our_identity, their_identity, blob_id, @@ -234,8 +234,21 @@ impl Node { context = updated; } + if let Some(application) = self.ctx_manager.get_application(&application_id)? { + if !self.ctx_manager.has_blob_available(application.blob)? { + self.initiate_blob_share_process( + &context, + our_identity, + application.blob, + application.size, + stream, + ) + .await?; + } + } + self.handle_state_sync_request( - context, + &mut context, our_identity, their_identity, root_hash, diff --git a/crates/node/src/sync/blobs.rs b/crates/node/src/sync/blobs.rs index 3c6eb0c50..84fa1ca55 100644 --- a/crates/node/src/sync/blobs.rs +++ b/crates/node/src/sync/blobs.rs @@ -122,7 +122,7 @@ impl Node { pub(super) async fn handle_blob_share_request( &self, - context: Context, + context: &Context, our_identity: PublicKey, their_identity: PublicKey, blob_id: BlobId, @@ -186,6 +186,14 @@ impl Node { ) .await?; + debug!( + context_id=%context.id, + our_identity=%our_identity, + their_identity=%their_identity, + blob_id=%blob_id, + "Blob share completed", + ); + Ok(()) } } diff --git a/crates/node/src/sync/key.rs b/crates/node/src/sync/key.rs index 961d858df..aa6777067 100644 --- a/crates/node/src/sync/key.rs +++ b/crates/node/src/sync/key.rs @@ -50,7 +50,7 @@ impl Node { pub(super) async fn handle_key_share_request( &self, - context: Context, + context: &Context, our_identity: PublicKey, their_identity: PublicKey, stream: &mut Stream, @@ -72,21 +72,19 @@ impl Node { ) .await?; - let mut context = context; - self.bidirectional_key_sync(&mut context, our_identity, their_identity, stream) + self.bidirectional_key_sync(context, our_identity, their_identity, stream) .await } async fn bidirectional_key_sync( &self, - context: &mut Context, + context: &Context, our_identity: PublicKey, their_identity: PublicKey, stream: &mut Stream, ) -> eyre::Result<()> { debug!( context_id=%context.id, - our_root_hash=%context.root_hash, our_identity=%our_identity, their_identity=%their_identity, "Starting bidirectional key sync", @@ -139,6 +137,13 @@ impl Node { self.ctx_manager .update_sender_key(&context.id, &their_identity, &sender_key)?; + debug!( + context_id=%context.id, + our_identity=%our_identity, + their_identity=%their_identity, + "Key sync completed", + ); + Ok(()) } } diff --git a/crates/node/src/sync/state.rs b/crates/node/src/sync/state.rs index 2223981df..47a092a2d 100644 --- a/crates/node/src/sync/state.rs +++ b/crates/node/src/sync/state.rs @@ -34,35 +34,63 @@ impl Node { ) .await?; - let Some(ack) = recv(stream, self.sync_config.timeout, None).await? else { - bail!("connection closed while awaiting state sync handshake"); - }; + let mut pair = None; - let (root_hash, their_identity) = match ack { - StreamMessage::Init { - party_id, - payload: - InitPayload::StateSync { - root_hash, - application_id, - }, - .. - } => { - if application_id != context.application_id { - bail!( - "unexpected application id: expected {}, got {}", - context.application_id, - application_id - ); + for _ in 1..=2 { + let Some(ack) = recv(stream, self.sync_config.timeout, None).await? else { + bail!("connection closed while awaiting state sync handshake"); + }; + + let (root_hash, their_identity) = match ack { + StreamMessage::Init { + party_id, + payload: + InitPayload::StateSync { + root_hash, + application_id, + }, + .. + } => { + if application_id != context.application_id { + bail!( + "unexpected application id: expected {}, got {}", + context.application_id, + application_id + ); + } + + (root_hash, party_id) } + StreamMessage::Init { + party_id: their_identity, + payload: InitPayload::BlobShare { blob_id }, + .. + } => { + self.handle_blob_share_request( + context, + our_identity, + their_identity, + blob_id, + stream, + ) + .await?; - (root_hash, party_id) - } - unexpected @ (StreamMessage::Init { .. } - | StreamMessage::Message { .. } - | StreamMessage::OpaqueError) => { - bail!("unexpected message: {:?}", unexpected) - } + continue; + } + unexpected @ (StreamMessage::Init { .. } + | StreamMessage::Message { .. } + | StreamMessage::OpaqueError) => { + bail!("unexpected message: {:?}", unexpected) + } + }; + + pair = Some((root_hash, their_identity)); + + break; + } + + let Some((root_hash, their_identity)) = pair else { + bail!("expected two state sync handshakes, got none"); }; if root_hash == context.root_hash { @@ -105,7 +133,7 @@ impl Node { pub(super) async fn handle_state_sync_request( &self, - context: Context, + context: &mut Context, our_identity: PublicKey, their_identity: PublicKey, root_hash: Hash, @@ -148,9 +176,8 @@ impl Node { let mut sqx_out = Sequencer::default(); - let mut context = context; self.bidirectional_sync( - &mut context, + context, our_identity, their_identity, &mut sqx_out, @@ -228,6 +255,14 @@ impl Node { .await?; } + debug!( + context_id=%context.id, + our_root_hash=%context.root_hash, + our_identity=%our_identity, + their_identity=%their_identity, + "State sync completed", + ); + Ok(()) } }