diff --git a/crates/node/src/sync.rs b/crates/node/src/sync.rs index b5454f713..0f7b066bd 100644 --- a/crates/node/src/sync.rs +++ b/crates/node/src/sync.rs @@ -193,40 +193,45 @@ impl Node { } } + let identities = self.ctx_manager.get_context_owned_identities(context.id)?; + + let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else { + bail!("no identities found for context: {}", context.id); + }; + match payload { InitPayload::KeyShare => { - self.handle_key_share_request(context, their_identity, stream) + self.handle_key_share_request(&context, our_identity, their_identity, stream) .await? } InitPayload::BlobShare { blob_id } => { - self.handle_blob_share_request(context, their_identity, blob_id, stream) - .await? + self.handle_blob_share_request( + &context, + our_identity, + their_identity, + blob_id, + stream, + ) + .await? } InitPayload::StateSync { - root_hash, - application_id, + root_hash: their_root_hash, + application_id: their_application_id, } => { - if updated.is_none() && context.application_id != application_id { + if updated.is_none() && context.application_id != their_application_id { updated = Some(self.ctx_manager.sync_context_config(context_id).await?); } if let Some(updated) = updated { - if application_id != updated.application_id { - bail!( - "application mismatch: expected {}, got {}", - updated.application_id, - application_id - ); - } - context = updated; } self.handle_state_sync_request( - context, + &mut context, + our_identity, their_identity, - root_hash, - application_id, + their_root_hash, + their_application_id, stream, ) .await? diff --git a/crates/node/src/sync/blobs.rs b/crates/node/src/sync/blobs.rs index 02f3272e3..9731a098f 100644 --- a/crates/node/src/sync/blobs.rs +++ b/crates/node/src/sync/blobs.rs @@ -6,8 +6,6 @@ use calimero_primitives::identity::PublicKey; use eyre::{bail, OptionExt}; use futures_util::stream::poll_fn; use futures_util::TryStreamExt; -use rand::seq::IteratorRandom; -use rand::thread_rng; use tokio::sync::mpsc; use tracing::{debug, warn}; @@ -24,6 +22,13 @@ impl Node { size: u64, stream: &mut Stream, ) -> eyre::Result<()> { + debug!( + context_id=%context.id, + our_identity=%our_identity, + blob_id=%blob_id, + "Initiating blob share", + ); + send( stream, &StreamMessage::Init { @@ -119,18 +124,28 @@ impl Node { ); } + debug!( + context_id=%context.id, + our_identity=%our_identity, + their_identity=%their_identity, + blob_id=%blob_id, + "Blob share completed", + ); + Ok(()) } pub(super) async fn handle_blob_share_request( &self, - context: Context, + context: &Context, + our_identity: PublicKey, their_identity: PublicKey, blob_id: BlobId, stream: &mut Stream, ) -> eyre::Result<()> { debug!( context_id=%context.id, + our_identity=%our_identity, their_identity=%their_identity, blob_id=%blob_id, "Received blob share request", @@ -142,12 +157,6 @@ impl Node { return Ok(()); }; - let identities = self.ctx_manager.get_context_owned_identities(context.id)?; - - let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else { - bail!("no identities found for context: {}", context.id); - }; - let private_key = self .ctx_manager .get_private_key(context.id, our_identity)? @@ -192,6 +201,14 @@ impl Node { ) .await?; + debug!( + context_id=%context.id, + our_identity=%our_identity, + their_identity=%their_identity, + blob_id=%blob_id, + "Blob share completed", + ); + Ok(()) } } diff --git a/crates/node/src/sync/key.rs b/crates/node/src/sync/key.rs index c1735aae0..9068710c6 100644 --- a/crates/node/src/sync/key.rs +++ b/crates/node/src/sync/key.rs @@ -3,8 +3,6 @@ use calimero_network::stream::Stream; use calimero_primitives::context::Context; use calimero_primitives::identity::PublicKey; use eyre::{bail, OptionExt}; -use rand::seq::IteratorRandom; -use rand::thread_rng; use tracing::debug; use crate::sync::{recv, send, Sequencer}; @@ -18,6 +16,12 @@ impl Node { our_identity: PublicKey, stream: &mut Stream, ) -> eyre::Result<()> { + debug!( + context_id=%context.id, + our_identity=%our_identity, + "Initiating key share", + ); + send( stream, &StreamMessage::Init { @@ -46,13 +50,14 @@ impl Node { } }; - self.bidirectional_key_sync(context, our_identity, their_identity, stream) + self.bidirectional_key_share(context, our_identity, their_identity, stream) .await } pub(super) async fn handle_key_share_request( &self, - context: Context, + context: &Context, + our_identity: PublicKey, their_identity: PublicKey, stream: &mut Stream, ) -> eyre::Result<()> { @@ -62,12 +67,6 @@ impl Node { "Received key share request", ); - let identities = self.ctx_manager.get_context_owned_identities(context.id)?; - - let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else { - bail!("no identities found for context: {}", context.id); - }; - send( stream, &StreamMessage::Init { @@ -79,24 +78,22 @@ impl Node { ) .await?; - let mut context = context; - self.bidirectional_key_sync(&mut context, our_identity, their_identity, stream) + self.bidirectional_key_share(context, our_identity, their_identity, stream) .await } - async fn bidirectional_key_sync( + async fn bidirectional_key_share( &self, - context: &mut Context, + context: &Context, our_identity: PublicKey, their_identity: PublicKey, stream: &mut Stream, ) -> eyre::Result<()> { debug!( context_id=%context.id, - our_root_hash=%context.root_hash, our_identity=%our_identity, their_identity=%their_identity, - "Starting bidirectional key sync", + "Starting bidirectional key share", ); let private_key = self @@ -146,6 +143,13 @@ impl Node { self.ctx_manager .update_sender_key(&context.id, &their_identity, &sender_key)?; + debug!( + context_id=%context.id, + our_identity=%our_identity, + their_identity=%their_identity, + "Key share completed", + ); + Ok(()) } } diff --git a/crates/node/src/sync/state.rs b/crates/node/src/sync/state.rs index 44a2716ff..1761ca813 100644 --- a/crates/node/src/sync/state.rs +++ b/crates/node/src/sync/state.rs @@ -7,8 +7,6 @@ use calimero_primitives::context::Context; use calimero_primitives::hash::Hash; use calimero_primitives::identity::PublicKey; use eyre::{bail, OptionExt}; -use rand::seq::IteratorRandom; -use rand::thread_rng; use tracing::debug; use crate::sync::{recv, send, Sequencer}; @@ -22,6 +20,14 @@ impl Node { our_identity: PublicKey, stream: &mut Stream, ) -> eyre::Result<()> { + debug!( + context_id=%context.id, + our_identity=%our_identity, + our_root_hash=?context.root_hash, + our_application_id=%context.application_id, + "Initiating state sync", + ); + send( stream, &StreamMessage::Init { @@ -36,35 +42,63 @@ impl Node { ) .await?; - let Some(ack) = recv(stream, self.sync_config.timeout, None).await? else { - bail!("connection closed while awaiting state sync handshake"); - }; + let mut pair = None; - let (root_hash, their_identity) = match ack { - StreamMessage::Init { - party_id, - payload: - InitPayload::StateSync { - root_hash, - application_id, - }, - .. - } => { - if application_id != context.application_id { - bail!( - "unexpected application id: expected {}, got {}", - context.application_id, - application_id - ); + for _ in 1..=2 { + let Some(ack) = recv(stream, self.sync_config.timeout, None).await? else { + bail!("connection closed while awaiting state sync handshake"); + }; + + let (root_hash, their_identity) = match ack { + StreamMessage::Init { + party_id, + payload: + InitPayload::StateSync { + root_hash, + application_id, + }, + .. + } => { + if application_id != context.application_id { + bail!( + "unexpected application id: expected {}, got {}", + context.application_id, + application_id + ); + } + + (root_hash, party_id) } + StreamMessage::Init { + party_id: their_identity, + payload: InitPayload::BlobShare { blob_id }, + .. + } => { + self.handle_blob_share_request( + context, + our_identity, + their_identity, + blob_id, + stream, + ) + .await?; + + continue; + } + unexpected @ (StreamMessage::Init { .. } + | StreamMessage::Message { .. } + | StreamMessage::OpaqueError) => { + bail!("unexpected message: {:?}", unexpected) + } + }; - (root_hash, party_id) - } - unexpected @ (StreamMessage::Init { .. } - | StreamMessage::Message { .. } - | StreamMessage::OpaqueError) => { - bail!("unexpected message: {:?}", unexpected) - } + pair = Some((root_hash, their_identity)); + + break; + } + + let Some((root_hash, their_identity)) = pair else { + bail!("expected two state sync handshakes, got none"); }; if root_hash == context.root_hash { @@ -107,32 +141,55 @@ impl Node { pub(super) async fn handle_state_sync_request( &self, - context: Context, + context: &mut Context, + our_identity: PublicKey, their_identity: PublicKey, - root_hash: Hash, - application_id: ApplicationId, + their_root_hash: Hash, + their_application_id: ApplicationId, stream: &mut Stream, ) -> eyre::Result<()> { debug!( context_id=%context.id, + our_identity=%our_identity, + our_root_hash=?context.root_hash, + our_application_id=%context.application_id, their_identity=%their_identity, - their_root_hash=%root_hash, - their_application_id=%application_id, + their_root_hash=%their_root_hash, + their_application_id=%their_application_id, "Received state sync request", ); - let identities = self.ctx_manager.get_context_owned_identities(context.id)?; - - let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else { - bail!("no identities found for context: {}", context.id); - }; + if their_application_id != context.application_id { + bail!( + "application mismatch: expected {}, got {}", + context.application_id, + their_application_id + ); + } - let private_key = self + let application = self .ctx_manager - .get_private_key(context.id, our_identity)? - .ok_or_eyre("expected own identity to have private key")?; + .get_application(&context.application_id)? + .ok_or_eyre("fatal: the application (even if just a sparse reference) should exist")?; - let shared_key = SharedKey::new(&private_key, &their_identity); + if !self.ctx_manager.has_blob_available(application.blob)? { + debug!( + context_id=%context.id, + application_id=%context.application_id, + "The application blob is not available, attempting to receive it from the other peer", + ); + + self.initiate_blob_share_process( + &context, + our_identity, + application.blob, + application.size, + stream, + ) + .await?; + + debug!(context_id=%context.id, "Resuming state sync"); + } send( stream, @@ -148,15 +205,21 @@ impl Node { ) .await?; - if root_hash == context.root_hash { + if their_root_hash == context.root_hash { return Ok(()); } + let private_key = self + .ctx_manager + .get_private_key(context.id, our_identity)? + .ok_or_eyre("expected own identity to have private key")?; + + let shared_key = SharedKey::new(&private_key, &their_identity); + let mut sqx_out = Sequencer::default(); - let mut context = context; self.bidirectional_sync( - &mut context, + context, our_identity, their_identity, &mut sqx_out, @@ -179,10 +242,9 @@ impl Node { ) -> eyre::Result<()> { debug!( context_id=%context.id, - our_root_hash=%context.root_hash, our_identity=%our_identity, their_identity=%their_identity, - "Starting bidirectional sync", + "Starting bidirectional state sync", ); let mut sqx_in = Sequencer::default(); @@ -234,6 +296,14 @@ impl Node { .await?; } + debug!( + context_id=%context.id, + our_root_hash=%context.root_hash, + our_identity=%our_identity, + their_identity=%their_identity, + "State sync completed", + ); + Ok(()) } }