diff --git a/crates/context/config/src/client/protocol/near.rs b/crates/context/config/src/client/protocol/near.rs
index 2e480df66..61f027c7f 100644
--- a/crates/context/config/src/client/protocol/near.rs
+++ b/crates/context/config/src/client/protocol/near.rs
@@ -151,6 +151,8 @@ impl<'a> NearTransport<'a> {
 #[derive(Debug, Error)]
 #[non_exhaustive]
 pub enum NearError {
+    #[error("unsupported protocol `{0}`")]
+    UnsupportedProtocol(String),
     #[error("unknown network `{0}`")]
     UnknownNetwork(String),
     #[error("invalid response from RPC while {operation}")]
@@ -191,6 +193,12 @@ impl Transport for NearTransport<'_> {
         request: TransportRequest<'_>,
         payload: Vec<u8>,
     ) -> Result<Vec<u8>, Self::Error> {
+        if request.protocol != Near::PROTOCOL {
+            return Err(NearError::UnsupportedProtocol(
+                request.protocol.into_owned(),
+            ));
+        }
+
         let Some(network) = self.networks.get(&request.network_id) else {
             return Err(NearError::UnknownNetwork(request.network_id.into_owned()));
         };
diff --git a/crates/context/config/src/client/protocol/starknet.rs b/crates/context/config/src/client/protocol/starknet.rs
index fb7f31ac3..c3a771088 100644
--- a/crates/context/config/src/client/protocol/starknet.rs
+++ b/crates/context/config/src/client/protocol/starknet.rs
@@ -130,6 +130,8 @@ impl<'a> StarknetTransport<'a> {
 #[derive(Debug, Error)]
 #[non_exhaustive]
 pub enum StarknetError {
+    #[error("unsupported protocol: {0}")]
+    UnsupportedProtocol(String),
     #[error("unknown network `{0}`")]
     UnknownNetwork(String),
     #[error("invalid response from RPC while {operation}")]
@@ -174,6 +176,12 @@ impl Transport for StarknetTransport<'_> {
         request: TransportRequest<'_>,
         payload: Vec<u8>,
     ) -> Result<Vec<u8>, Self::Error> {
+        if request.protocol != Starknet::PROTOCOL {
+            return Err(StarknetError::UnsupportedProtocol(
+                request.protocol.into_owned(),
+            ));
+        }
+
         let Some(network) = self.networks.get(&request.network_id) else {
             return Err(StarknetError::UnknownNetwork(
                 request.network_id.into_owned(),
diff --git a/crates/context/config/src/client/transport.rs b/crates/context/config/src/client/transport.rs
index af7dbfb38..3a5156137 100644
--- a/crates/context/config/src/client/transport.rs
+++ b/crates/context/config/src/client/transport.rs
@@ -18,21 +18,6 @@ pub trait Transport {
     ) -> Result<Vec<u8>, Self::Error>;
 }
 
-impl<L: Transport, R: Transport> Transport for Either<L, R> {
-    type Error = Either<L::Error, R::Error>;
-
-    async fn send(
-        &self,
-        request: TransportRequest<'_>,
-        payload: Vec<u8>,
-    ) -> Result<Vec<u8>, Self::Error> {
-        match self {
-            Self::Left(left) => left.send(request, payload).await.map_err(Either::Left),
-            Self::Right(right) => right.send(request, payload).await.map_err(Either::Right),
-        }
-    }
-}
-
 #[derive(Debug)]
 #[non_exhaustive]
 pub struct TransportRequest<'a> {
@@ -59,6 +44,34 @@ impl<'a> TransportRequest<'a> {
     }
 }
 
+#[derive(Debug, Error)]
+pub enum EitherError<L, R> {
+    #[error(transparent)]
+    Left(L),
+    #[error(transparent)]
+    Right(R),
+    #[error("unsupported protocol: {0}")]
+    UnsupportedProtocol(String),
+}
+
+impl<L: Transport, R: Transport> Transport for Either<L, R> {
+    type Error = EitherError<L::Error, R::Error>;
+
+    async fn send(
+        &self,
+        request: TransportRequest<'_>,
+        payload: Vec<u8>,
+    ) -> Result<Vec<u8>, Self::Error> {
+        match self {
+            Self::Left(left) => left.send(request, payload).await.map_err(EitherError::Left),
+            Self::Right(right) => right
+                .send(request, payload)
+                .await
+                .map_err(EitherError::Right),
+        }
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 #[expect(clippy::exhaustive_enums, reason = "Considered to be exhaustive")]
 pub enum Operation<'a> {
@@ -83,22 +96,12 @@ pub struct Both<L, R> {
     pub right: R,
 }
 
-#[derive(Debug, Error)]
-pub enum BothError<L, R> {
-    #[error("left error: {0}")]
-    Left(L),
-    #[error("right error: {0}")]
-    Right(R),
-    #[error("unsupported protocol: {0}")]
-    UnsupportedProtocol(String),
-}
-
 impl<L, R> Transport for Both<L, R>
 where
     L: AssociatedTransport,
     R: AssociatedTransport,
 {
-    type Error = BothError<L::Error, R::Error>;
+    type Error = EitherError<L::Error, R::Error>;
 
     async fn send(
         &self,
@@ -109,14 +112,14 @@ where
             self.left
                 .send(request, payload)
                 .await
-                .map_err(BothError::Left)
+                .map_err(EitherError::Left)
         } else if request.protocol == R::protocol() {
             self.right
                 .send(request, payload)
                 .await
-                .map_err(BothError::Right)
+                .map_err(EitherError::Right)
         } else {
-            return Err(BothError::UnsupportedProtocol(
+            return Err(EitherError::UnsupportedProtocol(
                 request.protocol.into_owned(),
             ));
         }
diff --git a/crates/context/src/lib.rs b/crates/context/src/lib.rs
index e7dd4c717..827aff601 100644
--- a/crates/context/src/lib.rs
+++ b/crates/context/src/lib.rs
@@ -1005,8 +1005,4 @@ impl ContextManager {
 
         Ok(Some(stream))
     }
-
-    pub fn is_application_blob_installed(&self, blob_id: BlobId) -> EyreResult<bool> {
-        self.blob_manager.has(blob_id)
-    }
 }
diff --git a/crates/network/src/stream.rs b/crates/network/src/stream.rs
index 782dcf40e..a90e5fadd 100644
--- a/crates/network/src/stream.rs
+++ b/crates/network/src/stream.rs
@@ -7,7 +7,7 @@ use core::pin::Pin;
 use core::task::{Context, Poll};
 
 use eyre::{bail, Result as EyreResult};
-use futures_util::{Sink as FuturesSink, SinkExt, Stream as FuturesStream};
+use futures_util::{Sink as FuturesSink, SinkExt, Stream as FuturesStream, StreamExt};
 use libp2p::{PeerId, Stream as P2pStream, StreamProtocol};
 use tokio::io::BufStream;
 use tokio_util::codec::Framed;
@@ -43,9 +43,8 @@ impl Stream {
 impl FuturesStream for Stream {
     type Item = Result<Message<'static>, CodecError>;
 
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let inner = Pin::new(&mut self.get_mut().inner);
-        inner.poll_next(cx)
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.inner.poll_next_unpin(cx)
     }
 }
 
diff --git a/crates/network/src/stream/codec.rs b/crates/network/src/stream/codec.rs
index e3e280d0f..08ecce9d8 100644
--- a/crates/network/src/stream/codec.rs
+++ b/crates/network/src/stream/codec.rs
@@ -25,9 +25,9 @@ impl<'a> Message<'a> {
 }
 
 #[derive(Debug, ThisError)]
-#[error("CodecError")]
 #[non_exhaustive]
 pub enum CodecError {
+    #[error(transparent)]
     StdIo(#[from] IoError),
 }
 
diff --git a/crates/node/src/catchup.rs b/crates/node/src/catchup.rs
deleted file mode 100644
index 58c9c5b45..000000000
--- a/crates/node/src/catchup.rs
+++ /dev/null
@@ -1,354 +0,0 @@
-use std::io::{Error as StdIoError, ErrorKind as StdIoErrorKind};
-
-use calimero_network::stream::{Message, Stream};
-use calimero_primitives::application::Application;
-use calimero_primitives::context::{Context, ContextId};
-use eyre::{bail, Result as EyreResult};
-use futures_util::io::BufReader;
-use futures_util::stream::poll_fn;
-use futures_util::{SinkExt, StreamExt, TryStreamExt};
-use libp2p::gossipsub::TopicHash;
-use libp2p::PeerId;
-use rand::seq::SliceRandom;
-use rand::thread_rng;
-use serde_json::{from_slice as from_json_slice, to_vec as to_json_vec};
-use tokio::spawn;
-use tokio::sync::mpsc;
-use tokio::time::timeout;
-use tracing::{error, info, warn};
-use url::Url;
-
-use crate::catchup::blobs::ApplicationBlobChunkSender;
-use crate::types::{
-    ActionMessage, CatchupActionsBatch, CatchupApplicationBlobChunk, CatchupApplicationBlobRequest,
-    CatchupError, CatchupStreamMessage, CatchupSyncRequest,
-};
-use crate::Node;
-
-// mod actions;
-mod blobs;
-
-impl Node {
-    pub(crate) async fn handle_opened_stream(&self, mut stream: Box<Stream>) -> EyreResult<()> {
-        let Some(message) = stream.next().await else {
-            bail!("Stream closed unexpectedly")
-        };
-
-        match from_json_slice(&message?.data)? {
-            CatchupStreamMessage::SyncRequest(req) => self.handle_action_catchup(req, stream).await,
-            CatchupStreamMessage::ApplicationBlobRequest(req) => {
-                self.handle_blob_catchup(req, stream).await
-            }
-            message @ (CatchupStreamMessage::ActionsBatch(_)
-            | CatchupStreamMessage::ApplicationBlobChunk(_)
-            | CatchupStreamMessage::Error(_)) => {
-                bail!("Unexpected message: {:?}", message)
-            }
-        }
-    }
-
-    async fn handle_blob_catchup(
-        &self,
-        request: CatchupApplicationBlobRequest,
-        mut stream: Box<Stream>,
-    ) -> Result<(), eyre::Error> {
-        let Some(mut blob) = self
-            .ctx_manager
-            .get_application_blob(&request.application_id)?
-        else {
-            let message = to_json_vec(&CatchupStreamMessage::Error(
-                CatchupError::ApplicationNotFound {
-                    application_id: request.application_id,
-                },
-            ))?;
-            stream.send(Message::new(message)).await?;
-
-            return Ok(());
-        };
-
-        info!(
-            request=?request,
-            "Processing application blob catchup request",
-        );
-
-        let mut blob_sender = ApplicationBlobChunkSender::new(stream);
-
-        while let Some(chunk) = blob.try_next().await? {
-            blob_sender.send(&chunk).await?;
-        }
-
-        blob_sender.flush().await
-    }
-
-    async fn handle_action_catchup(
-        &self,
-        request: CatchupSyncRequest,
-        mut stream: Box<Stream>,
-    ) -> EyreResult<()> {
-        let Some(context) = self.ctx_manager.get_context(&request.context_id)? else {
-            let message = to_json_vec(&CatchupStreamMessage::Error(
-                CatchupError::ContextNotFound {
-                    context_id: request.context_id,
-                },
-            ))?;
-            stream.send(Message::new(message)).await?;
-
-            return Ok(());
-        };
-
-        info!(
-            request=?request,
-            root_hash=%context.root_hash,
-            "Processing context catchup request",
-        );
-
-        let _handle = self.store.handle();
-
-        // TODO: If the root hashes don't match, we need to start a comparison
-        if context.root_hash != request.root_hash {
-            bail!("Root hash mismatch: TODO");
-        }
-
-        Ok(())
-    }
-
-    pub(crate) async fn perform_interval_catchup(&mut self) {
-        let Some(context_id) = self.ctx_manager.get_any_pending_catchup_context().await else {
-            return;
-        };
-
-        let peers = self
-            .network_client
-            .mesh_peers(TopicHash::from_raw(context_id))
-            .await;
-        let Some(peer_id) = peers.choose(&mut thread_rng()) else {
-            return;
-        };
-
-        info!(%context_id, %peer_id, "Attempting to perform interval triggered catchup");
-
-        if let Err(err) = self.perform_catchup(context_id, *peer_id).await {
-            error!(%err, "Failed to perform interval catchup");
-            return;
-        }
-
-        let _ = self
-            .ctx_manager
-            .clear_context_pending_catchup(&context_id)
-            .await;
-
-        info!(%context_id, %peer_id, "Interval triggered catchup successfully finished");
-    }
-
-    // TODO: Is this even needed now? Can it be removed? In theory, a sync will
-    // TODO: take place so long as there is a comparison - i.e. it will send
-    // TODO: everything back and forth until everything matches. But, for e.g. a
-    // TODO: first-time sync, that would be slower than just sending everything
-    // TODO: all at once. So... could this be utilised for that?
-    pub(crate) async fn perform_catchup(
-        &mut self,
-        context_id: ContextId,
-        chosen_peer: PeerId,
-    ) -> EyreResult<()> {
-        let Some(mut context) = self.ctx_manager.get_context(&context_id)? else {
-            bail!("catching up for non-existent context?");
-        };
-
-        let latest_application = self.ctx_manager.get_latest_application(context_id).await?;
-        let local_application = self.ctx_manager.get_application(&latest_application.id)?;
-
-        if local_application.map_or(true, |app| app.blob != latest_application.blob)
-            || !self
-                .ctx_manager
-                .is_application_blob_installed(latest_application.blob)?
-        {
-            self.perform_blob_catchup(chosen_peer, latest_application)
-                .await?;
-        }
-
-        self.perform_action_catchup(chosen_peer, &mut context).await
-    }
-
-    async fn perform_blob_catchup(
-        &self,
-        chosen_peer: PeerId,
-        latest_application: Application,
-    ) -> EyreResult<()> {
-        let source = Url::from(latest_application.source.clone());
-
-        match source.scheme() {
-            "http" | "https" => {
-                info!("Skipping blob catchup for HTTP/HTTPS source");
-                Ok(())
-            }
-            _ => {
-                self.perform_blob_stream_catchup(chosen_peer, latest_application)
-                    .await
-            }
-        }
-    }
-
-    async fn perform_blob_stream_catchup(
-        &self,
-        chosen_peer: PeerId,
-        latest_application: Application,
-    ) -> EyreResult<()> {
-        let mut stream = self.network_client.open_stream(chosen_peer).await?;
-
-        let request = CatchupApplicationBlobRequest {
-            application_id: latest_application.id,
-        };
-
-        let data = to_json_vec(&CatchupStreamMessage::ApplicationBlobRequest(request))?;
-
-        stream.send(Message::new(data)).await?;
-
-        let (tx, mut rx) = mpsc::channel(100);
-        let mut current_sequential_id = 0;
-
-        let chunk_stream = BufReader::new(
-            poll_fn(move |cx| rx.poll_recv(cx))
-                .map(move |chunk: CatchupApplicationBlobChunk| {
-                    if chunk.sequential_id != current_sequential_id {
-                        return Err(StdIoError::new(
-                            StdIoErrorKind::InvalidData,
-                            format!(
-                                "invalid sequential id, expected: {expected}, got: {got}",
-                                expected = current_sequential_id,
-                                got = chunk.sequential_id
-                            ),
-                        ));
-                    }
-
-                    current_sequential_id = current_sequential_id.saturating_add(1);
-
-                    Ok(chunk.chunk)
-                })
-                .into_async_read(),
-        );
-
-        let ctx_manager = self.ctx_manager.clone();
-        let metadata = latest_application.metadata.clone();
-
-        let handle = spawn(async move {
-            ctx_manager
-                .install_application_from_stream(
-                    latest_application.size,
-                    chunk_stream,
-                    &latest_application.source,
-                    metadata,
-                )
-                .await
-                .map(|_| ())
-        });
-
-        loop {
-            match timeout(
-                self.network_client.catchup_config.receive_timeout,
-                stream.next(),
-            )
-            .await
-            {
-                Ok(message) => match message {
-                    Some(message) => match from_json_slice(&message?.data)? {
-                        CatchupStreamMessage::ApplicationBlobChunk(chunk) => {
-                            tx.send(chunk).await?;
-                        }
-                        message @ (CatchupStreamMessage::ActionsBatch(_)
-                        | CatchupStreamMessage::SyncRequest(_)
-                        | CatchupStreamMessage::ApplicationBlobRequest(_)) => {
-                            warn!("Ignoring unexpected message: {:?}", message);
-                        }
-                        CatchupStreamMessage::Error(err) => {
-                            error!(?err, "Received error during application blob catchup");
-                            bail!(err);
-                        }
-                    },
-                    None => break,
-                },
-                Err(err) => {
-                    bail!("Failed to await application blob chunk message: {}", err)
-                }
-            }
-        }
-
-        drop(tx);
-
-        handle.await?
-    }
-
-    async fn perform_action_catchup(
-        &mut self,
-        chosen_peer: PeerId,
-        context: &mut Context,
-    ) -> EyreResult<()> {
-        let request = CatchupSyncRequest {
-            context_id: context.id,
-            root_hash: context.root_hash,
-        };
-
-        let mut stream = self.network_client.open_stream(chosen_peer).await?;
-
-        let data = to_json_vec(&CatchupStreamMessage::SyncRequest(request))?;
-
-        stream.send(Message::new(data)).await?;
-
-        loop {
-            match timeout(
-                self.network_client.catchup_config.receive_timeout,
-                stream.next(),
-            )
-            .await
-            {
-                Ok(message) => match message {
-                    Some(message) => match from_json_slice(&message?.data)? {
-                        CatchupStreamMessage::ActionsBatch(batch) => {
-                            self.apply_actions_batch(chosen_peer, context, batch)
-                                .await?;
-                        }
-                        message @ (CatchupStreamMessage::ApplicationBlobChunk(_)
-                        | CatchupStreamMessage::SyncRequest(_)
-                        | CatchupStreamMessage::ApplicationBlobRequest(_)) => {
-                            warn!("Ignoring unexpected message: {:?}", message);
-                        }
-                        CatchupStreamMessage::Error(err) => {
-                            error!(?err, "Received error during action catchup");
-                            bail!(err);
-                        }
-                    },
-                    None => break,
-                },
-                Err(err) => bail!("Failed to await actions catchup message: {}", err),
-            }
-        }
-
-        Ok(())
-    }
-
-    async fn apply_actions_batch(
-        &mut self,
-        // TODO: How should this be used?
-        _chosen_peer: PeerId,
-        context: &mut Context,
-        batch: CatchupActionsBatch,
-    ) -> EyreResult<()> {
-        info!(
-            context_id=%context.id,
-            actions=%batch.actions.len(),
-            "Processing catchup actions batch"
-        );
-
-        for ActionMessage {
-            actions,
-            public_key,
-            ..
-        } in batch.actions
-        {
-            for action in actions {
-                self.apply_action(context, &action, public_key).await?;
-            }
-        }
-
-        Ok(())
-    }
-}
diff --git a/crates/node/src/catchup/blobs.rs b/crates/node/src/catchup/blobs.rs
deleted file mode 100644
index 8e21b4b7e..000000000
--- a/crates/node/src/catchup/blobs.rs
+++ /dev/null
@@ -1,67 +0,0 @@
-use core::mem::take;
-
-use calimero_blobstore::CHUNK_SIZE as BLOB_CHUNK_SIZE;
-use calimero_network::stream::{Message, Stream, MAX_MESSAGE_SIZE as MAX_STREAM_MESSAGE_SIZE};
-use eyre::Result as EyreResult;
-use futures_util::SinkExt;
-use serde_json::to_vec as to_json_vec;
-
-use crate::types::{CatchupApplicationBlobChunk, CatchupStreamMessage};
-
-pub struct ApplicationBlobChunkSender {
-    batch_size: usize,
-    batch: Vec<u8>,
-    stream: Box<Stream>,
-    sequential_id: u64,
-}
-
-impl ApplicationBlobChunkSender {
-    #[expect(clippy::integer_division, reason = "TODO")]
-    pub(crate) fn new(stream: Box<Stream>) -> Self {
-        // Stream messages are encoded with length delimited codec.
-        // Calculate batch size based on the maximum message size and blob chunk size.
-        // Leave some space for other fields in the message.
-        let batch_size = (MAX_STREAM_MESSAGE_SIZE / BLOB_CHUNK_SIZE) - 1;
-
-        Self {
-            batch_size,
-            batch: Vec::with_capacity(batch_size * BLOB_CHUNK_SIZE),
-            stream,
-            sequential_id: 0,
-        }
-    }
-
-    pub(crate) async fn send(&mut self, chunk: &[u8]) -> EyreResult<()> {
-        self.batch.extend_from_slice(chunk);
-
-        if self.batch.len() >= self.batch_size.saturating_mul(BLOB_CHUNK_SIZE) {
-            let message = to_json_vec(&CatchupStreamMessage::ApplicationBlobChunk(
-                CatchupApplicationBlobChunk {
-                    sequential_id: self.sequential_id,
-                    chunk: take(&mut self.batch).into_boxed_slice(),
-                },
-            ))?;
-
-            self.stream.send(Message::new(message)).await?;
-
-            self.sequential_id = self.sequential_id.saturating_add(1);
-        }
-
-        Ok(())
-    }
-
-    pub(crate) async fn flush(&mut self) -> EyreResult<()> {
-        if !self.batch.is_empty() {
-            let message = to_json_vec(&CatchupStreamMessage::ApplicationBlobChunk(
-                CatchupApplicationBlobChunk {
-                    sequential_id: self.sequential_id,
-                    chunk: take(&mut self.batch).into_boxed_slice(),
-                },
-            ))?;
-
-            self.stream.send(Message::new(message)).await?;
-        }
-
-        Ok(())
-    }
-}
diff --git a/crates/node/src/interactive_cli.rs b/crates/node/src/interactive_cli.rs
index 41373f98e..a5eaaf139 100644
--- a/crates/node/src/interactive_cli.rs
+++ b/crates/node/src/interactive_cli.rs
@@ -36,14 +36,18 @@ pub enum SubCommands {
 
 pub async fn handle_line(node: &mut Node, line: String) -> eyre::Result<()> {
     // IMPORTANT: Parser needs first string to be binary name
-    let mut args = vec![""];
+    let mut args = vec!["{repl}"];
     args.extend(line.split_whitespace());
 
+    if args.len() == 1 {
+        return Ok(());
+    }
+
     let command = match RootCommand::try_parse_from(args) {
         Ok(command) => command,
         Err(err) => {
-            println!("Failed to parse command: {err}");
-            eyre::bail!("Failed to parse command");
+            println!("{err}");
+            return Ok(());
         }
     };
 
diff --git a/crates/node/src/interactive_cli/applications.rs b/crates/node/src/interactive_cli/applications.rs
index e6cc2c479..9ac4d1d15 100644
--- a/crates/node/src/interactive_cli/applications.rs
+++ b/crates/node/src/interactive_cli/applications.rs
@@ -68,16 +68,21 @@ impl ApplicationCommand {
             }
             ApplicationSubcommand::Ls => {
                 println!(
-                    "{ind} {c1:44} | {c2:44} | Source",
+                    "{ind} {c1:44} | {c2:44} | Installed | Source",
                     c1 = "Application ID",
                     c2 = "Blob ID",
                 );
                 for application in node.ctx_manager.list_installed_applications()? {
                     let entry = format!(
-                        "{c1:44} | {c2:44} | {c3}",
+                        "{c1:44} | {c2:44} | {c3:9} | {c4}",
                         c1 = application.id,
                         c2 = application.blob,
-                        c3 = application.source
+                        c3 = if node.ctx_manager.has_blob_available(application.blob)? {
+                            "Yes"
+                        } else {
+                            "No"
+                        },
+                        c4 = application.source
                     );
                     for line in entry.lines() {
                         println!("{ind} {}", line.cyan());
diff --git a/crates/node/src/interactive_cli/context.rs b/crates/node/src/interactive_cli/context.rs
index 16b861a7c..d50de5655 100644
--- a/crates/node/src/interactive_cli/context.rs
+++ b/crates/node/src/interactive_cli/context.rs
@@ -50,7 +50,7 @@ impl ContextCommand {
         match self.command {
             Commands::Ls => {
                 println!(
-                    "{c1:44} | {c2:44} | Root Hash",
+                    "{ind} {c1:44} | {c2:44} | Root Hash",
                     c1 = "Context ID",
                     c2 = "Application ID",
                 );
diff --git a/crates/node/src/interactive_cli/identity.rs b/crates/node/src/interactive_cli/identity.rs
index 063cd2e58..cebeeea72 100644
--- a/crates/node/src/interactive_cli/identity.rs
+++ b/crates/node/src/interactive_cli/identity.rs
@@ -22,6 +22,8 @@ enum IdentitySubcommands {
 
 impl IdentityCommand {
     pub fn run(self, node: &Node) -> Result<()> {
+        let ind = ">>".blue();
+
         match &self.subcommand {
             IdentitySubcommands::Ls { context_id } => {
                 match ContextId::from_str(context_id) {
@@ -41,7 +43,7 @@ impl IdentityCommand {
                             Some((k, iter.read()))
                         };
 
-                        println!("{:44} | Owned", "Identity");
+                        println!("{ind} {:44} | Owned", "Identity");
 
                        for (k, v) in first.into_iter().chain(iter.entries()) {
                             let (k, v) = (k?, v?);
@@ -53,23 +55,23 @@ impl IdentityCommand {
                             let entry = format!(
                                 "{:44} | {}",
                                 k.public_key(),
-                                if v.private_key.is_some() { "*" } else { " " },
+                                if v.private_key.is_some() { "Yes" } else { "No" },
                             );
                             for line in entry.lines() {
-                                println!("{}", line.cyan());
+                                println!("{ind} {}", line.cyan());
                             }
                         }
                     }
                     Err(_) => {
-                        println!("Invalid context ID: {context_id}");
+                        println!("{ind} Invalid context ID: {context_id}");
                     }
                 }
             }
             IdentitySubcommands::New => {
                 // Handle the "new" subcommand
                 let identity = node.ctx_manager.new_identity();
-                println!("Private Key: {}", identity.cyan());
-                println!("Public Key: {}", identity.public_key().cyan());
+                println!("{ind} Private Key: {}", identity.cyan());
+                println!("{ind} Public Key: {}", identity.public_key().cyan());
             }
         }
diff --git a/crates/node/src/interactive_cli/peers.rs b/crates/node/src/interactive_cli/peers.rs
index 7f8e47cee..a6f58f15e 100644
--- a/crates/node/src/interactive_cli/peers.rs
+++ b/crates/node/src/interactive_cli/peers.rs
@@ -1,14 +1,15 @@
 use std::sync::Arc;
 
 use calimero_network::client::NetworkClient;
+use calimero_primitives::context::ContextId;
 use clap::Parser;
 use eyre::Result;
 use libp2p::gossipsub::TopicHash;
 use owo_colors::OwoColorize;
 
-#[derive(Debug, Parser)]
+#[derive(Copy, Clone, Debug, Parser)]
 pub struct PeersCommand {
-    topic: String,
+    topic: Option<ContextId>,
 }
 
 impl PeersCommand {
@@ -19,12 +20,14 @@ impl PeersCommand {
             network_client.peer_count().await.cyan()
         );
 
-        let topic = TopicHash::from_raw(self.topic.clone());
-        println!(
-            "{ind} Peers (Session) for Topic {}: {:#?}",
-            topic.clone(),
-            network_client.mesh_peer_count(topic).await.cyan()
-        );
+        if let Some(topic) = self.topic {
+            let topic = TopicHash::from_raw(topic);
+            println!(
+                "{ind} Peers (Session) for Topic {}: {:#?}",
+                topic.clone(),
+                network_client.mesh_peer_count(topic).await.cyan()
+            );
+        }
 
         Ok(())
     }
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index 343f9536e..49cbea275 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -148,7 +148,7 @@ pub async fn start(config: NodeConfig) -> EyreResult<()> {
     let mut catchup_interval_tick = interval_at(
         Instant::now()
-            .checked_add(Duration::from_millis(thread_rng().gen_range(0..1001)))
+            .checked_add(Duration::from_millis(thread_rng().gen_range(1000..5000)))
            .ok_or_else(|| eyre!("Overflow when calculating initial catchup interval delay"))?,
        config.sync.interval,
    );
 
@@ -347,8 +347,7 @@ impl Node {
 
         if let Some(derived_root_hash) = outcome.root_hash {
             if derived_root_hash != *root_hash {
-                self.initiate_state_sync_process(&mut context, source)
-                    .await?;
+                self.initiate_sync(context_id, source).await?;
             }
         }
 
diff --git a/crates/node/src/sync.rs b/crates/node/src/sync.rs
index 6026e9634..938922547 100644
--- a/crates/node/src/sync.rs
+++ b/crates/node/src/sync.rs
@@ -6,7 +6,7 @@ use eyre::{bail, Result as EyreResult};
 use futures_util::{SinkExt, StreamExt};
 use libp2p::gossipsub::TopicHash;
 use libp2p::PeerId;
-use rand::seq::SliceRandom;
+use rand::seq::{IteratorRandom, SliceRandom};
 use rand::thread_rng;
 use tokio::time::timeout;
 use tracing::{debug, error};
@@ -50,6 +50,10 @@ struct Sequencer {
 }
 
 impl Sequencer {
+    fn current(&self) -> usize {
+        self.current
+    }
+
     fn test(&mut self, idx: usize) -> eyre::Result<()> {
         if self.current != idx {
             bail!(
@@ -72,40 +76,59 @@
 }
 
 impl Node {
-    async fn initiate_sync(&self, context_id: ContextId, chosen_peer: PeerId) -> EyreResult<()> {
+    pub(crate) async fn initiate_sync(
+        &self,
+        context_id: ContextId,
+        chosen_peer: PeerId,
+    ) -> EyreResult<()> {
         let mut context = self.ctx_manager.sync_context_config(context_id).await?;
 
         let Some(application) = self.ctx_manager.get_application(&context.application_id)? else {
             bail!("application not found: {}", context.application_id);
         };
 
+        let identities = self.ctx_manager.get_context_owned_identities(context.id)?;
+
+        let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else {
+            bail!("no identities found for context: {}", context.id);
+        };
+
+        let mut stream = self.network_client.open_stream(chosen_peer).await?;
+
         if !self.ctx_manager.has_blob_available(application.blob)? {
             self.initiate_blob_share_process(
                 &context,
+                our_identity,
                 application.blob,
                 application.size,
-                chosen_peer,
+                &mut stream,
             )
             .await?;
         }
 
-        self.initiate_state_sync_process(&mut context, chosen_peer)
+        self.initiate_state_sync_process(&mut context, our_identity, &mut stream)
             .await
     }
 
     pub(crate) async fn handle_opened_stream(&self, mut stream: Box<Stream>) {
-        if let Err(err) = self.internal_handle_opened_stream(&mut stream).await {
-            error!(%err, "Failed to handle stream message");
-
-            if let Err(err) = send(&mut stream, &StreamMessage::OpaqueError).await {
-                error!(%err, "Failed to send error message");
+        loop {
+            match self.internal_handle_opened_stream(&mut stream).await {
+                Ok(None) => break,
+                Ok(Some(())) => {}
+                Err(err) => {
+                    error!(%err, "Failed to handle stream message");
+
+                    if let Err(err) = send(&mut stream, &StreamMessage::OpaqueError).await {
+                        error!(%err, "Failed to send error message");
+                    }
+                }
             }
         }
     }
 
-    async fn internal_handle_opened_stream(&self, mut stream: &mut Stream) -> EyreResult<()> {
-        let Some(message) = recv(&mut stream, self.sync_config.timeout).await? else {
-            bail!("stream closed unexpectedly")
+    async fn internal_handle_opened_stream(&self, stream: &mut Stream) -> EyreResult<Option<()>> {
+        let Some(message) = recv(stream, self.sync_config.timeout).await? else {
+            return Ok(None);
         };
 
         let (context_id, their_identity, payload) = match message {
@@ -145,7 +168,7 @@
 
         match payload {
             InitPayload::BlobShare { blob_id } => {
-                self.handle_blob_share_request(context, their_identity, blob_id, &mut stream)
+                self.handle_blob_share_request(context, their_identity, blob_id, stream)
                     .await?
             }
             InitPayload::StateSync {
@@ -173,13 +196,13 @@
                     their_identity,
                     root_hash,
                     application_id,
-                    &mut stream,
+                    stream,
                 )
                 .await?
             }
         };
 
-        Ok(())
+        Ok(Some(()))
     }
 
     pub async fn perform_interval_sync(&self) {
@@ -193,7 +216,7 @@ impl Node {
                 break;
             }
 
-            debug!(%context_id, "Unable to perform interval sync for context, trying another");
+            debug!(%context_id, "Unable to perform interval sync for context, trying another..");
         }
     };
 
diff --git a/crates/node/src/sync/blobs.rs b/crates/node/src/sync/blobs.rs
index dc492dafc..0611e9f2f 100644
--- a/crates/node/src/sync/blobs.rs
+++ b/crates/node/src/sync/blobs.rs
@@ -5,34 +5,26 @@ use calimero_primitives::identity::PublicKey;
 use eyre::bail;
 use futures_util::stream::poll_fn;
 use futures_util::TryStreamExt;
-use libp2p::PeerId;
 use rand::seq::IteratorRandom;
 use rand::thread_rng;
 use tokio::sync::mpsc;
-use tracing::debug;
+use tracing::{debug, warn};
 
 use super::{recv, send, Sequencer};
 use crate::types::{InitPayload, MessagePayload, StreamMessage};
 use crate::Node;
 
 impl Node {
-    pub async fn initiate_blob_share_process(
+    pub(super) async fn initiate_blob_share_process(
         &self,
         context: &Context,
+        our_identity: PublicKey,
         blob_id: BlobId,
         size: u64,
-        chosen_peer: PeerId,
+        stream: &mut Stream,
     ) -> eyre::Result<()> {
-        let identities = self.ctx_manager.get_context_owned_identities(context.id)?;
-
-        let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else {
-            bail!("no identities found for context: {}", context.id);
-        };
-
-        let mut stream = self.network_client.open_stream(chosen_peer).await?;
-
         send(
-            &mut stream,
+            stream,
             &StreamMessage::Init {
                 context_id: context.id,
                 party_id: our_identity,
@@ -41,8 +33,8 @@ impl Node {
         )
         .await?;
 
-        let Some(ack) = recv(&mut stream, self.sync_config.timeout).await? else {
-            bail!("no response to blob share request");
+        let Some(ack) = recv(stream, self.sync_config.timeout).await? else {
+            bail!("connection closed while awaiting blob share handshake");
         };
 
         let _their_identity = match ack {
@@ -82,7 +74,7 @@ impl Node {
         let read_task = async {
             let mut sequencer = Sequencer::default();
 
-            while let Some(msg) = recv(&mut stream, self.sync_config.timeout).await? {
+            while let Some(msg) = recv(stream, self.sync_config.timeout).await? {
                 let (sequence_id, chunk) = match msg {
                     StreamMessage::OpaqueError => bail!("other peer ran into an error"),
                     StreamMessage::Message {
@@ -96,6 +88,10 @@ impl Node {
 
                 sequencer.test(sequence_id)?;
 
+                if chunk.is_empty() {
+                    break;
+                }
+
                 tx.send(Ok(chunk)).await?;
             }
 
@@ -117,7 +113,7 @@ impl Node {
         Ok(())
     }
 
-    pub async fn handle_blob_share_request(
+    pub(super) async fn handle_blob_share_request(
         &self,
         context: Context,
         their_identity: PublicKey,
@@ -132,7 +128,9 @@ impl Node {
         );
 
         let Some(mut blob) = self.ctx_manager.get_blob(blob_id)? else {
-            bail!("blob not found: {}", blob_id);
+            warn!(%blob_id, "blob not found");
+
+            return Ok(());
         };
 
         let identities = self.ctx_manager.get_context_owned_identities(context.id)?;
@@ -166,6 +164,15 @@ impl Node {
             .await?;
         }
 
+        send(
+            stream,
+            &StreamMessage::Message {
+                sequence_id: sequencer.next(),
+                payload: MessagePayload::BlobShare { chunk: b"".into() },
+            },
+        )
+        .await?;
+
         Ok(())
     }
 }
diff --git a/crates/node/src/sync/state.rs b/crates/node/src/sync/state.rs
index 37ad51409..b51d14842 100644
--- a/crates/node/src/sync/state.rs
+++ b/crates/node/src/sync/state.rs
@@ -6,7 +6,6 @@ use calimero_primitives::context::Context;
 use calimero_primitives::hash::Hash;
 use calimero_primitives::identity::PublicKey;
 use eyre::{bail, OptionExt};
-use libp2p::PeerId;
 use rand::seq::IteratorRandom;
 use rand::thread_rng;
 use tracing::debug;
@@ -16,21 +15,14 @@ use crate::types::{InitPayload, MessagePayload, StreamMessage};
 use crate::Node;
 
 impl Node {
-    pub async fn initiate_state_sync_process(
+    pub(super) async fn initiate_state_sync_process(
         &self,
         context: &mut Context,
-        chosen_peer: PeerId,
+        our_identity: PublicKey,
+        stream: &mut Stream,
     ) -> eyre::Result<()> {
-        let identities = self.ctx_manager.get_context_owned_identities(context.id)?;
-
-        let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else {
-            bail!("no identities found for context: {}", context.id);
-        };
-
-        let mut stream = self.network_client.open_stream(chosen_peer).await?;
-
-        send(
-            &mut stream,
+        send(
+            stream,
             &StreamMessage::Init {
                 context_id: context.id,
                 party_id: our_identity,
@@ -42,8 +34,8 @@
         )
         .await?;
 
-        let Some(ack) = recv(&mut stream, self.sync_config.timeout).await? else {
-            bail!("no response to state sync request");
+        let Some(ack) = recv(stream, self.sync_config.timeout).await? else {
+            bail!("connection closed while awaiting state sync handshake");
         };
 
         let (root_hash, their_identity) = match ack {
@@ -80,29 +72,23 @@
         let mut sqx_out = Sequencer::default();
 
         send(
-            &mut stream,
+            stream,
             &StreamMessage::Message {
                 sequence_id: sqx_out.next(),
                 payload: MessagePayload::StateSync {
-                    artifact: Cow::from(&[]),
+                    artifact: b"".into(),
                 },
             },
         )
         .await?;
 
-        self.bidirectional_sync(
-            context,
-            our_identity,
-            their_identity,
-            &mut sqx_out,
-            &mut stream,
-        )
-        .await?;
+        self.bidirectional_sync(context, our_identity, their_identity, &mut sqx_out, stream)
+            .await?;
 
         Ok(())
     }
 
-    pub async fn handle_state_sync_request(
+    pub(super) async fn handle_state_sync_request(
         &self,
         context: Context,
         their_identity: PublicKey,
@@ -188,6 +174,10 @@
 
                 sqx_in.test(sequence_id)?;
 
+                if artifact.is_empty() && sqx_out.current() != 0 {
+                    break;
+                }
+
                 let outcome = self
                     .execute(
                         context,
@@ -204,10 +194,6 @@
                     "State sync outcome",
                 );
 
-                if outcome.artifact.is_empty() {
-                    break;
-                }
-
                 send(
                     stream,
                     &StreamMessage::Message {