Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: facilitate blob share during state sync if necessary #972

Merged
merged 6 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions crates/node/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,40 +193,45 @@ impl Node {
}
}

let identities = self.ctx_manager.get_context_owned_identities(context.id)?;

let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else {
bail!("no identities found for context: {}", context.id);
};

match payload {
InitPayload::KeyShare => {
self.handle_key_share_request(context, their_identity, stream)
self.handle_key_share_request(&context, our_identity, their_identity, stream)
.await?
}
InitPayload::BlobShare { blob_id } => {
self.handle_blob_share_request(context, their_identity, blob_id, stream)
.await?
self.handle_blob_share_request(
&context,
our_identity,
their_identity,
blob_id,
stream,
)
.await?
}
InitPayload::StateSync {
root_hash,
application_id,
root_hash: their_root_hash,
application_id: their_application_id,
} => {
if updated.is_none() && context.application_id != application_id {
if updated.is_none() && context.application_id != their_application_id {
updated = Some(self.ctx_manager.sync_context_config(context_id).await?);
}

if let Some(updated) = updated {
if application_id != updated.application_id {
bail!(
"application mismatch: expected {}, got {}",
updated.application_id,
application_id
);
}

context = updated;
}

self.handle_state_sync_request(
context,
&mut context,
our_identity,
their_identity,
root_hash,
application_id,
their_root_hash,
their_application_id,
stream,
)
.await?
Expand Down
35 changes: 26 additions & 9 deletions crates/node/src/sync/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use calimero_primitives::identity::PublicKey;
use eyre::{bail, OptionExt};
use futures_util::stream::poll_fn;
use futures_util::TryStreamExt;
use rand::seq::IteratorRandom;
use rand::thread_rng;
use tokio::sync::mpsc;
use tracing::{debug, warn};

Expand All @@ -24,6 +22,13 @@ impl Node {
size: u64,
stream: &mut Stream,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
our_identity=%our_identity,
blob_id=%blob_id,
"Initiating blob share",
);

send(
stream,
&StreamMessage::Init {
Expand Down Expand Up @@ -119,18 +124,28 @@ impl Node {
);
}

debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
blob_id=%blob_id,
"Blob share completed",
);

Ok(())
}

pub(super) async fn handle_blob_share_request(
&self,
context: Context,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
blob_id: BlobId,
stream: &mut Stream,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
blob_id=%blob_id,
"Received blob share request",
Expand All @@ -142,12 +157,6 @@ impl Node {
return Ok(());
};

let identities = self.ctx_manager.get_context_owned_identities(context.id)?;

let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else {
bail!("no identities found for context: {}", context.id);
};

let private_key = self
.ctx_manager
.get_private_key(context.id, our_identity)?
Expand Down Expand Up @@ -192,6 +201,14 @@ impl Node {
)
.await?;

debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
blob_id=%blob_id,
"Blob share completed",
);

Ok(())
}
}
36 changes: 20 additions & 16 deletions crates/node/src/sync/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use calimero_network::stream::Stream;
use calimero_primitives::context::Context;
use calimero_primitives::identity::PublicKey;
use eyre::{bail, OptionExt};
use rand::seq::IteratorRandom;
use rand::thread_rng;
use tracing::debug;

use crate::sync::{recv, send, Sequencer};
Expand All @@ -18,6 +16,12 @@ impl Node {
our_identity: PublicKey,
stream: &mut Stream,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
our_identity=%our_identity,
"Initiating key share",
);

send(
stream,
&StreamMessage::Init {
Expand Down Expand Up @@ -46,13 +50,14 @@ impl Node {
}
};

self.bidirectional_key_sync(context, our_identity, their_identity, stream)
self.bidirectional_key_share(context, our_identity, their_identity, stream)
.await
}

pub(super) async fn handle_key_share_request(
&self,
context: Context,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
stream: &mut Stream,
) -> eyre::Result<()> {
Expand All @@ -62,12 +67,6 @@ impl Node {
"Received key share request",
);

let identities = self.ctx_manager.get_context_owned_identities(context.id)?;

let Some(our_identity) = identities.into_iter().choose(&mut thread_rng()) else {
bail!("no identities found for context: {}", context.id);
};

send(
stream,
&StreamMessage::Init {
Expand All @@ -79,24 +78,22 @@ impl Node {
)
.await?;

let mut context = context;
self.bidirectional_key_sync(&mut context, our_identity, their_identity, stream)
self.bidirectional_key_share(context, our_identity, their_identity, stream)
.await
}

async fn bidirectional_key_sync(
async fn bidirectional_key_share(
&self,
context: &mut Context,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
stream: &mut Stream,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
our_root_hash=%context.root_hash,
our_identity=%our_identity,
their_identity=%their_identity,
"Starting bidirectional key sync",
"Starting bidirectional key share",
);

let private_key = self
Expand Down Expand Up @@ -146,6 +143,13 @@ impl Node {
self.ctx_manager
.update_sender_key(&context.id, &their_identity, &sender_key)?;

debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
"Key share completed",
);

Ok(())
}
}
Loading
Loading