Skip to content

Commit

Permalink
appropriately determine when to terminate stream
Browse files Browse the repository at this point in the history
  • Loading branch information
miraclx committed Nov 6, 2024
1 parent a287f5e commit ccb4c0e
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 26 deletions.
7 changes: 3 additions & 4 deletions crates/network/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use core::pin::Pin;
use core::task::{Context, Poll};

use eyre::{bail, Result as EyreResult};
use futures_util::{Sink as FuturesSink, SinkExt, Stream as FuturesStream};
use futures_util::{Sink as FuturesSink, SinkExt, Stream as FuturesStream, StreamExt};
use libp2p::{PeerId, Stream as P2pStream, StreamProtocol};
use tokio::io::BufStream;
use tokio_util::codec::Framed;
Expand Down Expand Up @@ -43,9 +43,8 @@ impl Stream {
impl FuturesStream for Stream {
type Item = Result<Message<'static>, CodecError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = Pin::new(&mut self.get_mut().inner);
inner.poll_next(cx)
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}

Expand Down
36 changes: 23 additions & 13 deletions crates/node/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ struct Sequencer {
}

impl Sequencer {
fn current(&self) -> usize {
self.current
}

fn test(&mut self, idx: usize) -> eyre::Result<()> {
if self.current != idx {
bail!(
Expand Down Expand Up @@ -107,18 +111,24 @@ impl Node {
}

pub(crate) async fn handle_opened_stream(&self, mut stream: Box<Stream>) {
if let Err(err) = self.internal_handle_opened_stream(&mut stream).await {
error!(%err, "Failed to handle stream message");

if let Err(err) = send(&mut stream, &StreamMessage::OpaqueError).await {
error!(%err, "Failed to send error message");
loop {
match self.internal_handle_opened_stream(&mut stream).await {
Ok(None) => break,
Ok(Some(())) => {}
Err(err) => {
error!(%err, "Failed to handle stream message");

if let Err(err) = send(&mut stream, &StreamMessage::OpaqueError).await {
error!(%err, "Failed to send error message");
}
}
}
}
}

async fn internal_handle_opened_stream(&self, mut stream: &mut Stream) -> EyreResult<()> {
let Some(message) = recv(&mut stream, self.sync_config.timeout).await? else {
bail!("stream closed unexpectedly")
async fn internal_handle_opened_stream(&self, stream: &mut Stream) -> EyreResult<Option<()>> {
let Some(message) = recv(stream, self.sync_config.timeout).await? else {
return Ok(None);
};

let (context_id, their_identity, payload) = match message {
Expand Down Expand Up @@ -158,7 +168,7 @@ impl Node {

match payload {
InitPayload::BlobShare { blob_id } => {
self.handle_blob_share_request(context, their_identity, blob_id, &mut stream)
self.handle_blob_share_request(context, their_identity, blob_id, stream)
.await?
}
InitPayload::StateSync {
Expand Down Expand Up @@ -186,13 +196,13 @@ impl Node {
their_identity,
root_hash,
application_id,
&mut stream,
stream,
)
.await?
}
};

Ok(())
Ok(Some(()))
}

pub async fn perform_interval_sync(&self) {
Expand All @@ -206,11 +216,11 @@ impl Node {
break;
}

debug!(%context_id, "Unable to perform interval sync for context, trying another");
debug!(%context_id, "Unable to perform interval sync for context, trying another..");
}
};

if timeout(self.sync_config.interval * 2, task).await.is_err() {
if timeout(self.sync_config.interval, task).await.is_err() {
error!("Timeout while performing interval sync");
}
}
Expand Down
21 changes: 18 additions & 3 deletions crates/node/src/sync/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures_util::TryStreamExt;
use rand::seq::IteratorRandom;
use rand::thread_rng;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::{debug, warn};

use super::{recv, send, Sequencer};
use crate::types::{InitPayload, MessagePayload, StreamMessage};
Expand All @@ -34,7 +34,7 @@ impl Node {
.await?;

let Some(ack) = recv(stream, self.sync_config.timeout).await? else {
bail!("no response to blob share request");
bail!("connection closed while awaiting blob share handshake");
};

let _their_identity = match ack {
Expand Down Expand Up @@ -88,6 +88,10 @@ impl Node {

sequencer.test(sequence_id)?;

if chunk.is_empty() {
break;
}

tx.send(Ok(chunk)).await?;
}

Expand Down Expand Up @@ -124,7 +128,9 @@ impl Node {
);

let Some(mut blob) = self.ctx_manager.get_blob(blob_id)? else {
bail!("blob not found: {}", blob_id);
warn!(%blob_id, "blob not found");

return Ok(());
};

let identities = self.ctx_manager.get_context_owned_identities(context.id)?;
Expand Down Expand Up @@ -158,6 +164,15 @@ impl Node {
.await?;
}

send(
stream,
&StreamMessage::Message {
sequence_id: sequencer.next(),
payload: MessagePayload::BlobShare { chunk: b"".into() },
},
)
.await?;

Ok(())
}
}
12 changes: 6 additions & 6 deletions crates/node/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Node {
.await?;

let Some(ack) = recv(stream, self.sync_config.timeout).await? else {
bail!("no response to state sync request");
bail!("connection closed while awaiting state sync handshake");
};

let (root_hash, their_identity) = match ack {
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Node {
&StreamMessage::Message {
sequence_id: sqx_out.next(),
payload: MessagePayload::StateSync {
artifact: Cow::from(&[]),
artifact: b"".into(),
},
},
)
Expand Down Expand Up @@ -174,6 +174,10 @@ impl Node {

sqx_in.test(sequence_id)?;

if artifact.is_empty() && sqx_out.current() != 0 {
break;
}

let outcome = self
.execute(
context,
Expand All @@ -190,10 +194,6 @@ impl Node {
"State sync outcome",
);

if outcome.artifact.is_empty() {
break;
}

send(
stream,
&StreamMessage::Message {
Expand Down

0 comments on commit ccb4c0e

Please sign in to comment.