diff --git a/crates/crypto/src/lib.rs b/crates/crypto/src/lib.rs index 765fa2eed..9d3929bcb 100644 --- a/crates/crypto/src/lib.rs +++ b/crates/crypto/src/lib.rs @@ -2,17 +2,15 @@ use calimero_primitives::identity::{PrivateKey, PublicKey}; use ed25519_dalek::{SecretKey, SigningKey}; use ring::aead; +pub const NONCE_LEN: usize = 12; + +pub type Nonce = [u8; NONCE_LEN]; + #[derive(Copy, Clone, Debug)] pub struct SharedKey { key: SecretKey, } -#[derive(Debug)] -pub struct Record { - pub token: Vec, - pub nonce: [u8; 12], -} - impl SharedKey { pub fn new(sk: &PrivateKey, pk: &PublicKey) -> Self { SharedKey { @@ -29,7 +27,7 @@ impl SharedKey { SharedKey { key: **sk } } - pub fn encrypt(&self, payload: Vec, nonce: [u8; 12]) -> Option> { + pub fn encrypt(&self, payload: Vec, nonce: Nonce) -> Option> { let encryption_key = aead::LessSafeKey::new(aead::UnboundKey::new(&aead::AES_256_GCM, &self.key).ok()?); @@ -45,7 +43,7 @@ impl SharedKey { Some(cipher_text) } - pub fn decrypt(&self, cipher_text: Vec, nonce: [u8; aead::NONCE_LEN]) -> Option> { + pub fn decrypt(&self, cipher_text: Vec, nonce: Nonce) -> Option> { let decryption_key = aead::LessSafeKey::new(aead::UnboundKey::new(&aead::AES_256_GCM, &self.key).ok()?); @@ -68,12 +66,13 @@ impl SharedKey { #[cfg(test)] mod tests { use eyre::OptionExt; + use rand::thread_rng; use super::*; #[test] fn test_encrypt_decrypt() -> eyre::Result<()> { - let mut csprng = rand::thread_rng(); + let mut csprng = thread_rng(); let signer = PrivateKey::random(&mut csprng); let verifier = PrivateKey::random(&mut csprng); @@ -82,7 +81,7 @@ mod tests { let verifier_shared_key = SharedKey::new(&verifier, &signer.public_key()); let payload = b"privacy is important"; - let nonce = [0u8; aead::NONCE_LEN]; + let nonce = [0u8; NONCE_LEN]; let encrypted_payload = signer_shared_key .encrypt(payload.to_vec(), nonce) @@ -100,7 +99,7 @@ mod tests { #[test] fn test_decrypt_with_invalid_key() -> eyre::Result<()> { - let mut csprng = rand::thread_rng(); + let mut csprng = thread_rng(); let signer = PrivateKey::random(&mut csprng); let verifier = PrivateKey::random(&mut csprng); @@ -110,7 +109,7 @@ mod tests { let invalid_shared_key = SharedKey::new(&invalid, &invalid.public_key()); let token = b"privacy is important"; - let nonce = [0u8; aead::NONCE_LEN]; + let nonce = [0u8; NONCE_LEN]; let encrypted_token = signer_shared_key .encrypt(token.to_vec(), nonce) diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 010ad204d..fecb5e625 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -16,7 +16,7 @@ use calimero_context::config::ContextConfig; use calimero_context::ContextManager; use calimero_context_config::repr::ReprTransmute; use calimero_context_config::ProposalAction; -use calimero_crypto::SharedKey; +use calimero_crypto::{Nonce, SharedKey, NONCE_LEN}; use calimero_network::client::NetworkClient; use calimero_network::config::NetworkConfig; use calimero_network::types::{NetworkEvent, PeerId}; @@ -305,6 +305,7 @@ impl Node { author_id, root_hash, artifact, + nonce, } => { self.handle_state_delta( source, @@ -312,6 +313,7 @@ impl Node { author_id, root_hash, artifact.into_owned(), + nonce, ) .await?; } @@ -327,6 +329,7 @@ impl Node { author_id: PublicKey, root_hash: Hash, artifact: Vec, + nonce: [u8; NONCE_LEN], ) -> EyreResult<()> { let Some(mut context) = self.ctx_manager.get_context(&context_id)? else { bail!("context '{}' not found", context_id); @@ -344,7 +347,7 @@ impl Node { let shared_key = SharedKey::from_sk(&sender_key); let artifact = &shared_key - .decrypt(artifact, [0; 12]) + .decrypt(artifact, nonce) .ok_or_eyre("failed to decrypt message")?; let Some(outcome) = self @@ -386,9 +389,10 @@ impl Node { .ok_or_eyre("expected own identity to have sender key")?; let shared_key = SharedKey::from_sk(&sender_key); + let nonce = thread_rng().gen::(); let artifact_encrypted = shared_key - .encrypt(outcome.artifact.clone(), [0; 12]) + .encrypt(outcome.artifact.clone(), nonce) .ok_or_eyre("encryption failed")?; let message = to_vec(&BroadcastMessage::StateDelta { @@ -396,6 +400,7 @@ impl Node { author_id: executor_public_key, root_hash: context.root_hash, artifact: artifact_encrypted.as_slice().into(), + nonce, })?; let _ignored = self diff --git a/crates/node/src/sync.rs b/crates/node/src/sync.rs index 0f7b066bd..52c23b4ea 100644 --- a/crates/node/src/sync.rs +++ b/crates/node/src/sync.rs @@ -1,9 +1,9 @@ use std::time::Duration; -use calimero_crypto::SharedKey; +use calimero_crypto::{Nonce, SharedKey}; use calimero_network::stream::{Message, Stream}; use calimero_primitives::context::ContextId; -use eyre::{bail, Result as EyreResult}; +use eyre::{bail, eyre, OptionExt, Result as EyreResult}; use futures_util::{SinkExt, StreamExt}; use libp2p::gossipsub::TopicHash; use libp2p::PeerId; @@ -28,15 +28,14 @@ pub struct SyncConfig { async fn send( stream: &mut Stream, message: &StreamMessage<'_>, - shared_key: Option, + shared_key: Option<(SharedKey, Nonce)>, ) -> EyreResult<()> { let base_data = borsh::to_vec(message)?; let data = match shared_key { - Some(key) => match key.encrypt(base_data, [0; 12]) { - Some(data) => data, - None => bail!("encryption failed"), - }, + Some((key, nonce)) => key + .encrypt(base_data, nonce) + .ok_or_eyre("encryption failed")?, None => base_data, }; @@ -47,7 +46,7 @@ async fn send( async fn recv( stream: &mut Stream, duration: Duration, - shared_key: Option, + shared_key: Option<(SharedKey, Nonce)>, ) -> EyreResult>> { let Some(message) = timeout(duration, stream.next()).await? else { return Ok(None); @@ -56,10 +55,17 @@ async fn recv( let message_data = message?.data.into_owned(); let data = match shared_key { - Some(key) => match key.decrypt(message_data, [0; 12]) { - Some(data) => data, - None => bail!("decryption failed"), - }, + Some((key, nonce)) => { + match key.decrypt( + message_data, + nonce + .try_into() + .map_err(|_| eyre!("nonce must be 12 bytes"))?, + ) { + Some(data) => data, + None => bail!("decryption failed"), + } + } None => message_data, }; @@ -158,12 +164,14 @@ impl Node { return Ok(None); }; - let (context_id, their_identity, payload) = match message { + let (context_id, their_identity, payload, nonce) = match message { StreamMessage::Init { context_id, party_id, payload, - } => (context_id, party_id, payload), + next_nonce, + .. + } => (context_id, party_id, payload, next_nonce), unexpected @ (StreamMessage::Message { .. } | StreamMessage::OpaqueError) => { bail!("expected initialization handshake, got {:?}", unexpected) } @@ -201,7 +209,7 @@ impl Node { match payload { InitPayload::KeyShare => { - self.handle_key_share_request(&context, our_identity, their_identity, stream) + self.handle_key_share_request(&context, our_identity, their_identity, stream, nonce) .await? } InitPayload::BlobShare { blob_id } => { @@ -233,6 +241,7 @@ impl Node { their_root_hash, their_application_id, stream, + nonce, ) .await? } diff --git a/crates/node/src/sync/blobs.rs b/crates/node/src/sync/blobs.rs index 9731a098f..454b20f0c 100644 --- a/crates/node/src/sync/blobs.rs +++ b/crates/node/src/sync/blobs.rs @@ -1,4 +1,4 @@ -use calimero_crypto::SharedKey; +use calimero_crypto::{Nonce, SharedKey, NONCE_LEN}; use calimero_network::stream::Stream; use calimero_primitives::blobs::BlobId; use calimero_primitives::context::Context; @@ -6,6 +6,7 @@ use calimero_primitives::identity::PublicKey; use eyre::{bail, OptionExt}; use futures_util::stream::poll_fn; use futures_util::TryStreamExt; +use rand::{thread_rng, Rng}; use tokio::sync::mpsc; use tracing::{debug, warn}; @@ -29,12 +30,15 @@ impl Node { "Initiating blob share", ); + let our_nonce = thread_rng().gen::(); + send( stream, &StreamMessage::Init { context_id: context.id, party_id: our_identity, payload: InitPayload::BlobShare { blob_id }, + next_nonce: our_nonce, }, None, ) @@ -44,13 +48,14 @@ impl Node { bail!("connection closed while awaiting blob share handshake"); }; - let their_identity = match ack { + let (their_identity, mut their_nonce) = match ack { StreamMessage::Init { party_id, payload: InitPayload::BlobShare { blob_id: ack_blob_id, }, + next_nonce, .. } => { if ack_blob_id != blob_id { @@ -61,7 +66,7 @@ impl Node { ); } - party_id + (party_id, next_nonce) } unexpected @ (StreamMessage::Init { .. } | StreamMessage::Message { .. } @@ -88,13 +93,20 @@ impl Node { let read_task = async { let mut sequencer = Sequencer::default(); - while let Some(msg) = recv(stream, self.sync_config.timeout, Some(shared_key)).await? { - let (sequence_id, chunk) = match msg { + while let Some(msg) = recv( + stream, + self.sync_config.timeout, + Some((shared_key, their_nonce)), + ) + .await? + { + let (sequence_id, chunk, their_new_nonce) = match msg { StreamMessage::OpaqueError => bail!("other peer ran into an error"), StreamMessage::Message { sequence_id, payload: MessagePayload::BlobShare { chunk }, - } => (sequence_id, chunk), + next_nonce, + } => (sequence_id, chunk, next_nonce), unexpected @ (StreamMessage::Init { .. } | StreamMessage::Message { .. }) => { bail!("unexpected message: {:?}", unexpected) } @@ -107,6 +119,8 @@ impl Node { } tx.send(Ok(chunk)).await?; + + their_nonce = their_new_nonce; } drop(tx); @@ -163,6 +177,7 @@ impl Node { .ok_or_eyre("expected own identity to have private key")?; let shared_key = SharedKey::new(&private_key, &their_identity); + let mut our_nonce = thread_rng().gen::(); send( stream, @@ -170,6 +185,7 @@ impl Node { context_id: context.id, party_id: our_identity, payload: InitPayload::BlobShare { blob_id }, + next_nonce: our_nonce, }, None, ) @@ -178,6 +194,7 @@ impl Node { let mut sequencer = Sequencer::default(); while let Some(chunk) = blob.try_next().await? { + let our_new_nonce = thread_rng().gen::(); send( stream, &StreamMessage::Message { @@ -185,10 +202,13 @@ impl Node { payload: MessagePayload::BlobShare { chunk: chunk.into_vec().into(), }, + next_nonce: our_new_nonce, }, - Some(shared_key), + Some((shared_key, our_nonce)), ) .await?; + + our_nonce = our_new_nonce; } send( @@ -196,8 +216,9 @@ impl Node { &StreamMessage::Message { sequence_id: sequencer.next(), payload: MessagePayload::BlobShare { chunk: b"".into() }, + next_nonce: [0; NONCE_LEN], }, - Some(shared_key), + Some((shared_key, our_nonce)), ) .await?; diff --git a/crates/node/src/sync/key.rs b/crates/node/src/sync/key.rs index 9068710c6..0848bbe75 100644 --- a/crates/node/src/sync/key.rs +++ b/crates/node/src/sync/key.rs @@ -1,8 +1,9 @@ -use calimero_crypto::SharedKey; +use calimero_crypto::{Nonce, SharedKey}; use calimero_network::stream::Stream; use calimero_primitives::context::Context; use calimero_primitives::identity::PublicKey; use eyre::{bail, OptionExt}; +use rand::{thread_rng, Rng}; use tracing::debug; use crate::sync::{recv, send, Sequencer}; @@ -22,12 +23,15 @@ impl Node { "Initiating key share", ); + let our_nonce = thread_rng().gen::(); + send( stream, &StreamMessage::Init { context_id: context.id, party_id: our_identity, payload: InitPayload::KeyShare, + next_nonce: our_nonce, }, None, ) @@ -37,12 +41,13 @@ impl Node { bail!("connection closed while awaiting state sync handshake"); }; - let their_identity = match ack { + let (their_identity, their_nonce) = match ack { StreamMessage::Init { party_id, payload: InitPayload::KeyShare, + next_nonce, .. - } => party_id, + } => (party_id, next_nonce), unexpected @ (StreamMessage::Init { .. } | StreamMessage::Message { .. } | StreamMessage::OpaqueError) => { @@ -50,8 +55,15 @@ impl Node { } }; - self.bidirectional_key_share(context, our_identity, their_identity, stream) - .await + self.bidirectional_key_share( + context, + our_identity, + their_identity, + stream, + our_nonce, + their_nonce, + ) + .await } pub(super) async fn handle_key_share_request( @@ -60,6 +72,7 @@ impl Node { our_identity: PublicKey, their_identity: PublicKey, stream: &mut Stream, + their_nonce: Nonce, ) -> eyre::Result<()> { debug!( context_id=%context.id, @@ -67,19 +80,29 @@ impl Node { "Received key share request", ); + let our_nonce = thread_rng().gen::(); + send( stream, &StreamMessage::Init { context_id: context.id, party_id: our_identity, payload: InitPayload::KeyShare, + next_nonce: our_nonce, }, None, ) .await?; - self.bidirectional_key_share(context, our_identity, their_identity, stream) - .await + self.bidirectional_key_share( + context, + our_identity, + their_identity, + stream, + our_nonce, + their_nonce, + ) + .await } async fn bidirectional_key_share( @@ -88,6 +111,8 @@ impl Node { our_identity: PublicKey, their_identity: PublicKey, stream: &mut Stream, + our_nonce: Nonce, + their_nonce: Nonce, ) -> eyre::Result<()> { debug!( context_id=%context.id, @@ -115,12 +140,19 @@ impl Node { &StreamMessage::Message { sequence_id: sqx_out.next(), payload: MessagePayload::KeyShare { sender_key }, + next_nonce: our_nonce, }, - Some(shared_key), + Some((shared_key, our_nonce)), ) .await?; - let Some(msg) = recv(stream, self.sync_config.timeout, Some(shared_key)).await? else { + let Some(msg) = recv( + stream, + self.sync_config.timeout, + Some((shared_key, their_nonce)), + ) + .await? + else { bail!("connection closed while awaiting key share"); }; @@ -128,6 +160,7 @@ impl Node { StreamMessage::Message { sequence_id, payload: MessagePayload::KeyShare { sender_key }, + .. } => (sequence_id, sender_key), unexpected @ (StreamMessage::Init { .. } | StreamMessage::Message { .. } diff --git a/crates/node/src/sync/state.rs b/crates/node/src/sync/state.rs index 1761ca813..d723dc28b 100644 --- a/crates/node/src/sync/state.rs +++ b/crates/node/src/sync/state.rs @@ -1,12 +1,13 @@ use std::borrow::Cow; -use calimero_crypto::SharedKey; +use calimero_crypto::{Nonce, SharedKey}; use calimero_network::stream::Stream; use calimero_primitives::application::ApplicationId; use calimero_primitives::context::Context; use calimero_primitives::hash::Hash; use calimero_primitives::identity::PublicKey; use eyre::{bail, OptionExt}; +use rand::{thread_rng, Rng}; use tracing::debug; use crate::sync::{recv, send, Sequencer}; @@ -28,6 +29,8 @@ impl Node { "Initiating state sync", ); + let our_nonce = thread_rng().gen::(); + send( stream, &StreamMessage::Init { @@ -37,19 +40,20 @@ impl Node { root_hash: context.root_hash, application_id: context.application_id, }, + next_nonce: our_nonce, }, None, ) .await?; - let mut pair = None; + let mut triple = None; for _ in 1..=2 { let Some(ack) = recv(stream, self.sync_config.timeout, None).await? else { bail!("connection closed while awaiting state sync handshake"); }; - let (root_hash, their_identity) = match ack { + let (root_hash, their_identity, their_nonce) = match ack { StreamMessage::Init { party_id, payload: @@ -57,6 +61,7 @@ impl Node { root_hash, application_id, }, + next_nonce, .. } => { if application_id != context.application_id { @@ -67,7 +72,7 @@ impl Node { ); } - (root_hash, party_id) + (root_hash, party_id, next_nonce) } StreamMessage::Init { party_id: their_identity, @@ -92,12 +97,12 @@ impl Node { } }; - pair = Some((root_hash, their_identity)); + triple = Some((root_hash, their_identity, their_nonce)); break; } - let Some((root_hash, their_identity)) = pair else { + let Some((root_hash, their_identity, their_nonce)) = triple else { bail!("expected two state sync handshakes, got none"); }; @@ -113,6 +118,7 @@ impl Node { .ok_or_eyre("expected own identity to have private key")?; let shared_key = SharedKey::new(&private_key, &their_identity); + let our_new_nonce = thread_rng().gen::(); send( stream, @@ -121,11 +127,11 @@ impl Node { payload: MessagePayload::StateSync { artifact: b"".into(), }, + next_nonce: our_new_nonce, }, - Some(shared_key), + Some((shared_key, our_nonce)), ) .await?; - self.bidirectional_sync( context, our_identity, @@ -133,6 +139,8 @@ impl Node { &mut sqx_out, stream, shared_key, + our_new_nonce, + their_nonce, ) .await?; @@ -147,6 +155,7 @@ impl Node { their_root_hash: Hash, their_application_id: ApplicationId, stream: &mut Stream, + their_nonce: Nonce, ) -> eyre::Result<()> { debug!( context_id=%context.id, @@ -191,6 +200,8 @@ impl Node { debug!(context_id=%context.id, "Resuming state sync"); } + let our_nonce = thread_rng().gen::(); + send( stream, &StreamMessage::Init { @@ -200,6 +211,7 @@ impl Node { root_hash: context.root_hash, application_id: context.application_id, }, + next_nonce: our_nonce, }, None, ) @@ -225,6 +237,8 @@ impl Node { &mut sqx_out, stream, shared_key, + our_nonce, + their_nonce, ) .await @@ -239,6 +253,8 @@ impl Node { sqx_out: &mut Sequencer, stream: &mut Stream, shared_key: SharedKey, + mut our_nonce: Nonce, + mut their_nonce: Nonce, ) -> eyre::Result<()> { debug!( context_id=%context.id, @@ -249,18 +265,27 @@ impl Node { let mut sqx_in = Sequencer::default(); - while let Some(msg) = recv(stream, self.sync_config.timeout, Some(shared_key)).await? { - let (sequence_id, artifact) = match msg { + while let Some(msg) = recv( + stream, + self.sync_config.timeout, + Some((shared_key, their_nonce)), + ) + .await? + { + let (sequence_id, artifact, their_new_nonce) = match msg { StreamMessage::OpaqueError => bail!("other peer ran into an error"), StreamMessage::Message { sequence_id, payload: MessagePayload::StateSync { artifact }, - } => (sequence_id, artifact), + next_nonce, + } => (sequence_id, artifact, next_nonce), unexpected @ (StreamMessage::Init { .. } | StreamMessage::Message { .. }) => { bail!("unexpected message: {:?}", unexpected) } }; + their_nonce = their_new_nonce; + sqx_in.test(sequence_id)?; if artifact.is_empty() && sqx_out.current() != 0 { @@ -283,6 +308,8 @@ impl Node { "State sync outcome", ); + let our_new_nonce = thread_rng().gen::(); + send( stream, &StreamMessage::Message { @@ -290,10 +317,13 @@ impl Node { payload: MessagePayload::StateSync { artifact: Cow::from(&outcome.artifact), }, + next_nonce: our_new_nonce, }, - Some(shared_key), + Some((shared_key, our_nonce)), ) .await?; + + our_nonce = our_new_nonce; } debug!( diff --git a/crates/node/src/types.rs b/crates/node/src/types.rs index 1d02526c4..e07848555 100644 --- a/crates/node/src/types.rs +++ b/crates/node/src/types.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use borsh::{BorshDeserialize, BorshSerialize}; +use calimero_crypto::{Nonce, NONCE_LEN}; use calimero_primitives::application::ApplicationId; use calimero_primitives::blobs::BlobId; use calimero_primitives::context::ContextId; @@ -18,6 +19,7 @@ pub enum BroadcastMessage<'a> { author_id: PublicKey, root_hash: Hash, artifact: Cow<'a, [u8]>, + nonce: [u8; NONCE_LEN], }, } @@ -26,12 +28,13 @@ pub enum StreamMessage<'a> { Init { context_id: ContextId, party_id: PublicKey, - // nonce: usize, payload: InitPayload, + next_nonce: Nonce, }, Message { sequence_id: usize, payload: MessagePayload<'a>, + next_nonce: Nonce, }, /// Other peers must not learn anything about the node's state if anything goes wrong. OpaqueError,