Skip to content

Commit

Permalink
renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
petarjuki7 committed Nov 26, 2024
1 parent 18926c3 commit 6c6e2f7
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 52 deletions.
30 changes: 16 additions & 14 deletions crates/node/src/sync/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use calimero_crypto::{Nonce, SharedKey};
use calimero_crypto::{Nonce, SharedKey, NONCE_LEN};
use calimero_network::stream::Stream;
use calimero_primitives::blobs::BlobId;
use calimero_primitives::context::Context;
Expand Down Expand Up @@ -30,15 +30,15 @@ impl Node {
"Initiating blob share",
);

let nonce = thread_rng().gen::<Nonce>();
let our_nonce = thread_rng().gen::<Nonce>();

send(
stream,
&StreamMessage::Init {
context_id: context.id,
party_id: our_identity,
payload: InitPayload::BlobShare { blob_id },
next_nonce: nonce,
next_nonce: our_nonce,
},
None,
)
Expand All @@ -48,7 +48,7 @@ impl Node {
bail!("connection closed while awaiting blob share handshake");
};

let (their_identity, mut nonce) = match ack {
let (their_identity, mut their_nonce) = match ack {
StreamMessage::Init {
party_id,
payload:
Expand Down Expand Up @@ -93,10 +93,14 @@ impl Node {
let read_task = async {
let mut sequencer = Sequencer::default();

while let Some(msg) =
recv(stream, self.sync_config.timeout, Some((shared_key, nonce))).await?
while let Some(msg) = recv(
stream,
self.sync_config.timeout,
Some((shared_key, their_nonce)),
)
.await?
{
let (sequence_id, chunk, new_nonce) = match msg {
let (sequence_id, chunk, their_new_nonce) = match msg {
StreamMessage::OpaqueError => bail!("other peer ran into an error"),
StreamMessage::Message {
sequence_id,
Expand All @@ -116,7 +120,7 @@ impl Node {

tx.send(Ok(chunk)).await?;

nonce = new_nonce;
their_nonce = their_new_nonce;
}

drop(tx);
Expand Down Expand Up @@ -190,31 +194,29 @@ impl Node {
let mut sequencer = Sequencer::default();

while let Some(chunk) = blob.try_next().await? {
let new_nonce = thread_rng().gen::<Nonce>();
let next_nonce = thread_rng().gen::<Nonce>();
send(
stream,
&StreamMessage::Message {
sequence_id: sequencer.next(),
payload: MessagePayload::BlobShare {
chunk: chunk.into_vec().into(),
},
next_nonce: new_nonce,
next_nonce,
},
Some((shared_key, nonce)),
)
.await?;

nonce = new_nonce;
nonce = next_nonce;
}

let new_nonce = thread_rng().gen::<Nonce>();

send(
stream,
&StreamMessage::Message {
sequence_id: sequencer.next(),
payload: MessagePayload::BlobShare { chunk: b"".into() },
next_nonce: new_nonce,
next_nonce: [0; NONCE_LEN],
},
Some((shared_key, nonce)),
)
Expand Down
26 changes: 12 additions & 14 deletions crates/node/src/sync/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ impl Node {
"Initiating key share",
);

let nonce = thread_rng().gen::<Nonce>();
let our_nonce = thread_rng().gen::<Nonce>();

send(
stream,
&StreamMessage::Init {
context_id: context.id,
party_id: our_identity,
payload: InitPayload::KeyShare,
next_nonce: nonce,
next_nonce: our_nonce,
},
None,
)
Expand Down Expand Up @@ -60,7 +60,7 @@ impl Node {
our_identity,
their_identity,
stream,
nonce,
our_nonce,
their_nonce,
)
.await
Expand All @@ -72,25 +72,23 @@ impl Node {
our_identity: PublicKey,
their_identity: PublicKey,
stream: &mut Stream,
nonce: Nonce,
their_nonce: Nonce,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
their_identity=%their_identity,
"Received key share request",
);

let their_nonce = nonce;

let nonce = thread_rng().gen::<Nonce>();
let our_nonce = thread_rng().gen::<Nonce>();

send(
stream,
&StreamMessage::Init {
context_id: context.id,
party_id: our_identity,
payload: InitPayload::KeyShare,
next_nonce: nonce,
next_nonce: our_nonce,
},
None,
)
Expand All @@ -101,7 +99,7 @@ impl Node {
our_identity,
their_identity,
stream,
nonce,
our_nonce,
their_nonce,
)
.await
Expand All @@ -113,8 +111,8 @@ impl Node {
our_identity: PublicKey,
their_identity: PublicKey,
stream: &mut Stream,
sending_nonce: Nonce,
receiving_nonce: Nonce,
our_nonce: Nonce,
their_nonce: Nonce,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
Expand Down Expand Up @@ -142,16 +140,16 @@ impl Node {
&StreamMessage::Message {
sequence_id: sqx_out.next(),
payload: MessagePayload::KeyShare { sender_key },
next_nonce: sending_nonce, // the next nonce will not be used
next_nonce: our_nonce,
},
Some((shared_key, sending_nonce)),
Some((shared_key, our_nonce)),
)
.await?;

let Some(msg) = recv(
stream,
self.sync_config.timeout,
Some((shared_key, receiving_nonce)),
Some((shared_key, their_nonce)),
)
.await?
else {
Expand Down
45 changes: 21 additions & 24 deletions crates/node/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Node {
"Initiating state sync",
);

let nonce = thread_rng().gen::<Nonce>();
let our_nonce = thread_rng().gen::<Nonce>();

send(
stream,
Expand All @@ -40,7 +40,7 @@ impl Node {
root_hash: context.root_hash,
application_id: context.application_id,
},
next_nonce: nonce,
next_nonce: our_nonce,
},
None,
)
Expand Down Expand Up @@ -118,7 +118,7 @@ impl Node {
.ok_or_eyre("expected own identity to have private key")?;

let shared_key = SharedKey::new(&private_key, &their_identity);
let new_nonce = thread_rng().gen::<Nonce>();
let our_new_nonce = thread_rng().gen::<Nonce>();

send(
stream,
Expand All @@ -127,9 +127,9 @@ impl Node {
payload: MessagePayload::StateSync {
artifact: b"".into(),
},
next_nonce: new_nonce,
next_nonce: our_new_nonce,
},
Some((shared_key, nonce)),
Some((shared_key, our_nonce)),
)
.await?;
self.bidirectional_sync(
Expand All @@ -139,7 +139,7 @@ impl Node {
&mut sqx_out,
stream,
shared_key,
new_nonce,
our_new_nonce,
their_nonce,
)
.await?;
Expand All @@ -155,7 +155,7 @@ impl Node {
their_root_hash: Hash,
their_application_id: ApplicationId,
stream: &mut Stream,
nonce: Nonce,
their_nonce: Nonce,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
Expand Down Expand Up @@ -200,9 +200,7 @@ impl Node {
debug!(context_id=%context.id, "Resuming state sync");
}

let their_nonce = nonce;

let nonce = thread_rng().gen::<Nonce>();
let our_nonce = thread_rng().gen::<Nonce>();

send(
stream,
Expand All @@ -213,7 +211,7 @@ impl Node {
root_hash: context.root_hash,
application_id: context.application_id,
},
next_nonce: nonce,
next_nonce: our_nonce,
},
None,
)
Expand All @@ -229,7 +227,6 @@ impl Node {
.ok_or_eyre("expected own identity to have private key")?;

let shared_key = SharedKey::new(&private_key, &their_identity);
let nonce = thread_rng().gen::<Nonce>();

let mut sqx_out = Sequencer::default();

Expand All @@ -240,7 +237,7 @@ impl Node {
&mut sqx_out,
stream,
shared_key,
nonce,
our_nonce,
their_nonce,
)
.await
Expand All @@ -256,8 +253,8 @@ impl Node {
sqx_out: &mut Sequencer,
stream: &mut Stream,
shared_key: SharedKey,
sending_nonce: Nonce,
receiving_nonce: Nonce,
our_nonce: Nonce,
their_nonce: Nonce,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
Expand All @@ -268,17 +265,17 @@ impl Node {

let mut sqx_in = Sequencer::default();

let mut sending_nonce = sending_nonce;
let mut receiving_nonce = receiving_nonce;
let mut our_nonce = our_nonce;
let mut their_nonce = their_nonce;

while let Some(msg) = recv(
stream,
self.sync_config.timeout,
Some((shared_key, receiving_nonce)),
Some((shared_key, their_nonce)),
)
.await?
{
let (sequence_id, artifact, new_receiving_nonce) = match msg {
let (sequence_id, artifact, new_their_nonce) = match msg {
StreamMessage::OpaqueError => bail!("other peer ran into an error"),
StreamMessage::Message {
sequence_id,
Expand All @@ -290,7 +287,7 @@ impl Node {
}
};

receiving_nonce = new_receiving_nonce;
their_nonce = new_their_nonce;

sqx_in.test(sequence_id)?;

Expand All @@ -314,7 +311,7 @@ impl Node {
"State sync outcome",
);

let new_sending_nonce = thread_rng().gen::<Nonce>();
let new_our_nonce = thread_rng().gen::<Nonce>();

send(
stream,
Expand All @@ -323,13 +320,13 @@ impl Node {
payload: MessagePayload::StateSync {
artifact: Cow::from(&outcome.artifact),
},
next_nonce: new_sending_nonce,
next_nonce: new_our_nonce,
},
Some((shared_key, sending_nonce)),
Some((shared_key, our_nonce)),
)
.await?;

sending_nonce = new_sending_nonce;
our_nonce = new_our_nonce;
}

debug!(
Expand Down

0 comments on commit 6c6e2f7

Please sign in to comment.