diff --git a/crates/network/src/stream.rs b/crates/network/src/stream.rs index 782dcf40e..a90e5fadd 100644 --- a/crates/network/src/stream.rs +++ b/crates/network/src/stream.rs @@ -7,7 +7,7 @@ use core::pin::Pin; use core::task::{Context, Poll}; use eyre::{bail, Result as EyreResult}; -use futures_util::{Sink as FuturesSink, SinkExt, Stream as FuturesStream}; +use futures_util::{Sink as FuturesSink, SinkExt, Stream as FuturesStream, StreamExt}; use libp2p::{PeerId, Stream as P2pStream, StreamProtocol}; use tokio::io::BufStream; use tokio_util::codec::Framed; @@ -43,9 +43,8 @@ impl Stream { impl FuturesStream for Stream { type Item = Result, CodecError>; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let inner = Pin::new(&mut self.get_mut().inner); - inner.poll_next(cx) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_next_unpin(cx) } } diff --git a/crates/node/src/sync.rs b/crates/node/src/sync.rs index 401166d36..938922547 100644 --- a/crates/node/src/sync.rs +++ b/crates/node/src/sync.rs @@ -50,6 +50,10 @@ struct Sequencer { } impl Sequencer { + fn current(&self) -> usize { + self.current + } + fn test(&mut self, idx: usize) -> eyre::Result<()> { if self.current != idx { bail!( @@ -107,18 +111,24 @@ impl Node { } pub(crate) async fn handle_opened_stream(&self, mut stream: Box) { - if let Err(err) = self.internal_handle_opened_stream(&mut stream).await { - error!(%err, "Failed to handle stream message"); - - if let Err(err) = send(&mut stream, &StreamMessage::OpaqueError).await { - error!(%err, "Failed to send error message"); + loop { + match self.internal_handle_opened_stream(&mut stream).await { + Ok(None) => break, + Ok(Some(())) => {} + Err(err) => { + error!(%err, "Failed to handle stream message"); + + if let Err(err) = send(&mut stream, &StreamMessage::OpaqueError).await { + error!(%err, "Failed to send error message"); + } + } } } } - async fn internal_handle_opened_stream(&self, mut stream: &mut Stream) -> EyreResult<()> { - let Some(message) = recv(&mut stream, self.sync_config.timeout).await? else { - bail!("stream closed unexpectedly") + async fn internal_handle_opened_stream(&self, stream: &mut Stream) -> EyreResult> { + let Some(message) = recv(stream, self.sync_config.timeout).await? else { + return Ok(None); }; let (context_id, their_identity, payload) = match message { @@ -158,7 +168,7 @@ impl Node { match payload { InitPayload::BlobShare { blob_id } => { - self.handle_blob_share_request(context, their_identity, blob_id, &mut stream) + self.handle_blob_share_request(context, their_identity, blob_id, stream) .await? } InitPayload::StateSync { @@ -186,13 +196,13 @@ impl Node { their_identity, root_hash, application_id, - &mut stream, + stream, ) .await? } }; - Ok(()) + Ok(Some(())) } pub async fn perform_interval_sync(&self) { @@ -206,11 +216,11 @@ impl Node { break; } - debug!(%context_id, "Unable to perform interval sync for context, trying another"); + debug!(%context_id, "Unable to perform interval sync for context, trying another.."); } }; - if timeout(self.sync_config.interval * 2, task).await.is_err() { + if timeout(self.sync_config.interval, task).await.is_err() { error!("Timeout while performing interval sync"); } } diff --git a/crates/node/src/sync/blobs.rs b/crates/node/src/sync/blobs.rs index b2db61fe3..0611e9f2f 100644 --- a/crates/node/src/sync/blobs.rs +++ b/crates/node/src/sync/blobs.rs @@ -8,7 +8,7 @@ use futures_util::TryStreamExt; use rand::seq::IteratorRandom; use rand::thread_rng; use tokio::sync::mpsc; -use tracing::debug; +use tracing::{debug, warn}; use super::{recv, send, Sequencer}; use crate::types::{InitPayload, MessagePayload, StreamMessage}; @@ -34,7 +34,7 @@ impl Node { .await?; let Some(ack) = recv(stream, self.sync_config.timeout).await? else { - bail!("no response to blob share request"); + bail!("connection closed while awaiting blob share handshake"); }; let _their_identity = match ack { @@ -88,6 +88,10 @@ impl Node { sequencer.test(sequence_id)?; + if chunk.is_empty() { + break; + } + tx.send(Ok(chunk)).await?; } @@ -124,7 +128,9 @@ impl Node { ); let Some(mut blob) = self.ctx_manager.get_blob(blob_id)? else { - bail!("blob not found: {}", blob_id); + warn!(%blob_id, "blob not found"); + + return Ok(()); }; let identities = self.ctx_manager.get_context_owned_identities(context.id)?; @@ -158,6 +164,15 @@ impl Node { .await?; } + send( + stream, + &StreamMessage::Message { + sequence_id: sequencer.next(), + payload: MessagePayload::BlobShare { chunk: b"".into() }, + }, + ) + .await?; + Ok(()) } } diff --git a/crates/node/src/sync/state.rs b/crates/node/src/sync/state.rs index 8d24d0d9b..b51d14842 100644 --- a/crates/node/src/sync/state.rs +++ b/crates/node/src/sync/state.rs @@ -35,7 +35,7 @@ impl Node { .await?; let Some(ack) = recv(stream, self.sync_config.timeout).await? else { - bail!("no response to state sync request"); + bail!("connection closed while awaiting state sync handshake"); }; let (root_hash, their_identity) = match ack { @@ -76,7 +76,7 @@ impl Node { &StreamMessage::Message { sequence_id: sqx_out.next(), payload: MessagePayload::StateSync { - artifact: Cow::from(&[]), + artifact: b"".into(), }, }, ) @@ -174,6 +174,10 @@ impl Node { sqx_in.test(sequence_id)?; + if artifact.is_empty() && sqx_out.current() != 0 { + break; + } + let outcome = self .execute( context, @@ -190,10 +194,6 @@ impl Node { "State sync outcome", ); - if outcome.artifact.is_empty() { - break; - } - send( stream, &StreamMessage::Message {