Skip to content

Commit

Permalink
chore: facilitate blob share during state sync if necessary (#972)
Browse files Browse the repository at this point in the history
  • Loading branch information
miraclx authored Nov 19, 2024
1 parent b8f1352 commit 5654fee
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 89 deletions.
39 changes: 22 additions & 17 deletions crates/node/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,40 +193,45 @@ impl Node {
}
}

let identities = self.ctx_manager.get_context_owned_identities(context.id)?;

let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else {
bail!("no identities found for context: {}", context.id);
};

match payload {
InitPayload::KeyShare => {
self.handle_key_share_request(context, their_identity, stream)
self.handle_key_share_request(&context, our_identity, their_identity, stream)
.await?
}
InitPayload::BlobShare { blob_id } => {
self.handle_blob_share_request(context, their_identity, blob_id, stream)
.await?
self.handle_blob_share_request(
&context,
our_identity,
their_identity,
blob_id,
stream,
)
.await?
}
InitPayload::StateSync {
root_hash,
application_id,
root_hash: their_root_hash,
application_id: their_application_id,
} => {
if updated.is_none() && context.application_id != application_id {
if updated.is_none() && context.application_id != their_application_id {
updated = Some(self.ctx_manager.sync_context_config(context_id).await?);
}

if let Some(updated) = updated {
if application_id != updated.application_id {
bail!(
"application mismatch: expected {}, got {}",
updated.application_id,
application_id
);
}

context = updated;
}

self.handle_state_sync_request(
context,
&mut context,
our_identity,
their_identity,
root_hash,
application_id,
their_root_hash,
their_application_id,
stream,
)
.await?
Expand Down
35 changes: 26 additions & 9 deletions crates/node/src/sync/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use calimero_primitives::identity::PublicKey;
use eyre::{bail, OptionExt};
use futures_util::stream::poll_fn;
use futures_util::TryStreamExt;
use rand::seq::IteratorRandom;
use rand::thread_rng;
use tokio::sync::mpsc;
use tracing::{debug, warn};

Expand All @@ -24,6 +22,13 @@ impl Node {
size: u64,
stream: &mut Stream,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
our_identity=%our_identity,
blob_id=%blob_id,
"Initiating blob share",
);

send(
stream,
&StreamMessage::Init {
Expand Down Expand Up @@ -119,18 +124,28 @@ impl Node {
);
}

debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
blob_id=%blob_id,
"Blob share completed",
);

Ok(())
}

pub(super) async fn handle_blob_share_request(
&self,
context: Context,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
blob_id: BlobId,
stream: &mut Stream,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
blob_id=%blob_id,
"Received blob share request",
Expand All @@ -142,12 +157,6 @@ impl Node {
return Ok(());
};

let identities = self.ctx_manager.get_context_owned_identities(context.id)?;

let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else {
bail!("no identities found for context: {}", context.id);
};

let private_key = self
.ctx_manager
.get_private_key(context.id, our_identity)?
Expand Down Expand Up @@ -192,6 +201,14 @@ impl Node {
)
.await?;

debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
blob_id=%blob_id,
"Blob share completed",
);

Ok(())
}
}
36 changes: 20 additions & 16 deletions crates/node/src/sync/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use calimero_network::stream::Stream;
use calimero_primitives::context::Context;
use calimero_primitives::identity::PublicKey;
use eyre::{bail, OptionExt};
use rand::seq::IteratorRandom;
use rand::thread_rng;
use tracing::debug;

use crate::sync::{recv, send, Sequencer};
Expand All @@ -18,6 +16,12 @@ impl Node {
our_identity: PublicKey,
stream: &mut Stream,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
our_identity=%our_identity,
"Initiating key share",
);

send(
stream,
&StreamMessage::Init {
Expand Down Expand Up @@ -46,13 +50,14 @@ impl Node {
}
};

self.bidirectional_key_sync(context, our_identity, their_identity, stream)
self.bidirectional_key_share(context, our_identity, their_identity, stream)
.await
}

pub(super) async fn handle_key_share_request(
&self,
context: Context,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
stream: &mut Stream,
) -> eyre::Result<()> {
Expand All @@ -62,12 +67,6 @@ impl Node {
"Received key share request",
);

let identities = self.ctx_manager.get_context_owned_identities(context.id)?;

let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else {
bail!("no identities found for context: {}", context.id);
};

send(
stream,
&StreamMessage::Init {
Expand All @@ -79,24 +78,22 @@ impl Node {
)
.await?;

let mut context = context;
self.bidirectional_key_sync(&mut context, our_identity, their_identity, stream)
self.bidirectional_key_share(context, our_identity, their_identity, stream)
.await
}

async fn bidirectional_key_sync(
async fn bidirectional_key_share(
&self,
context: &mut Context,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
stream: &mut Stream,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
our_root_hash=%context.root_hash,
our_identity=%our_identity,
their_identity=%their_identity,
"Starting bidirectional key sync",
"Starting bidirectional key share",
);

let private_key = self
Expand Down Expand Up @@ -146,6 +143,13 @@ impl Node {
self.ctx_manager
.update_sender_key(&context.id, &their_identity, &sender_key)?;

debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
"Key share completed",
);

Ok(())
}
}
Loading

0 comments on commit 5654fee

Please sign in to comment.