Skip to content

Commit

Permalink
fix: race condition in simultaneous connect, NAT ID, FileKey Collisions
Browse files Browse the repository at this point in the history
  • Loading branch information
tbraun96 committed Nov 18, 2024
1 parent f510cb7 commit a3623be
Show file tree
Hide file tree
Showing 21 changed files with 202 additions and 176 deletions.
12 changes: 1 addition & 11 deletions async_ip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,7 @@ pub async fn get_ip_from(client: Option<Client>, addr: &str) -> Result<IpAddr, I
.text()
.await
.map_err(|err| IpRetrieveError::Error(err.to_string()))?;
IpAddr::from_str(text.as_str())
.map_err(|err| IpRetrieveError::Error(err.to_string()))
.and_then(|res| {
if res.is_ipv4() {
Err(IpRetrieveError::Error(
"This node does not have an ipv6 addr".to_string(),
))
} else {
Ok(res)
}
})
IpAddr::from_str(text.as_str()).map_err(|err| IpRetrieveError::Error(err.to_string()))
}

/// Gets the internal IP address using DNS
Expand Down
25 changes: 21 additions & 4 deletions citadel_crypt/src/endpoint_crypto_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::toolset::{Toolset, UpdateStatus};
use citadel_pqcrypto::constructor_opts::ConstructorOpts;
use citadel_types::crypto::CryptoParameters;
use citadel_types::crypto::SecurityLevel;
use citadel_types::prelude::ObjectId;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -21,7 +22,7 @@ pub struct PeerSessionCrypto<R: Ratchet = StackedRatchet> {
pub update_in_progress: Arc<AtomicBool>,
// if local is initiator, then in the case both nodes send a FastMessage at the same time (causing an update to the keys), the initiator takes preference, and the non-initiator's upgrade attempt gets dropped (if update_in_progress)
pub local_is_initiator: bool,
pub rolling_object_id: u64,
pub rolling_object_id: ObjectId,
pub rolling_group_id: u64,
pub lock_set_by_alice: Option<bool>,
/// Alice sends to Bob, then bob updates internally the toolset. However. Bob can't send packets to Alice quite yet using that newest version. He must first wait from Alice to commit on her end and wait for an ACK.
Expand Down Expand Up @@ -200,9 +201,25 @@ impl<R: Ratchet> PeerSessionCrypto<R> {
self.rolling_group_id.wrapping_sub(1)
}

pub fn get_and_increment_object_id(&mut self) -> u64 {
self.rolling_object_id = self.rolling_object_id.wrapping_add(1);
self.rolling_object_id.wrapping_sub(1)
pub fn get_and_increment_object_id(&mut self) -> i64 {
let next = if self.local_is_initiator {
let mut next_val = self.rolling_object_id.wrapping_add(1);
if next_val <= 0 {
next_val = 1
}

next_val
} else {
let mut next_val = self.rolling_object_id.wrapping_sub(1);
if next_val >= 0 {
next_val = -1
}

next_val
};

self.rolling_object_id = next;
next
}

/// Returns a new constructor only if a concurrent update isn't occurring
Expand Down
19 changes: 10 additions & 9 deletions citadel_crypt/src/scramble/crypt_splitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::entropy_bank::EntropyBank;
use crate::packet_vector::{generate_packet_vector, PacketVector};
use crate::prelude::CryptError;
use crate::stacked_ratchet::Ratchet;
pub use citadel_types::prelude::ObjectId;
#[cfg(not(target_family = "wasm"))]
use rayon::prelude::*;

Expand Down Expand Up @@ -62,7 +63,7 @@ pub fn generate_scrambler_metadata<T: AsRef<[u8]>>(
header_size_bytes: usize,
security_level: SecurityLevel,
group_id: u64,
object_id: u64,
object_id: ObjectId,
enx: EncryptionAlgorithm,
sig_alg: SigAlgorithm,
transfer_type: &TransferType,
Expand Down Expand Up @@ -141,7 +142,7 @@ fn get_scramble_encrypt_config<'a, R: Ratchet>(
header_size_bytes: usize,
security_level: SecurityLevel,
group_id: u64,
object_id: u64,
object_id: ObjectId,
transfer_type: &TransferType,
empty_transfer: bool,
) -> Result<
Expand Down Expand Up @@ -190,13 +191,13 @@ pub fn par_scramble_encrypt_group<T: AsRef<[u8]>, R: Ratchet, F, const N: usize>
static_aux_ratchet: &R,
header_size_bytes: usize,
target_cid: u64,
object_id: u64,
object_id: ObjectId,
group_id: u64,
transfer_type: TransferType,
header_inscriber: F,
) -> Result<GroupSenderDevice<N>, CryptError<String>>
where
F: Fn(&PacketVector, &EntropyBank, u64, u64, &mut BytesMut) + Send + Sync,
F: Fn(&PacketVector, &EntropyBank, ObjectId, u64, &mut BytesMut) + Send + Sync,
{
let mut plain_text = Cow::Borrowed(plain_text.as_ref());

Expand Down Expand Up @@ -303,9 +304,9 @@ fn scramble_encrypt_wave(
msg_pqc: &PostQuantumContainer,
scramble_drill: &EntropyBank,
target_cid: u64,
object_id: u64,
object_id: ObjectId,
header_size_bytes: usize,
header_inscriber: impl Fn(&PacketVector, &EntropyBank, u64, u64, &mut BytesMut) + Send + Sync,
header_inscriber: impl Fn(&PacketVector, &EntropyBank, ObjectId, u64, &mut BytesMut) + Send + Sync,
) -> Vec<(usize, PacketCoordinate)> {
let ciphertext = msg_drill
.encrypt(msg_pqc, bytes_to_encrypt_for_this_wave)
Expand Down Expand Up @@ -336,7 +337,7 @@ pub fn oneshot_unencrypted_group_unified<const N: usize>(
plain_text: SecureMessagePacket<N>,
header_size_bytes: usize,
group_id: u64,
object_id: u64,
object_id: ObjectId,
empty_transfer: bool,
) -> Result<GroupSenderDevice<N>, CryptError<String>> {
let len = plain_text.message_len() as u64;
Expand Down Expand Up @@ -435,7 +436,7 @@ pub struct GroupReceiverConfig {
// this is NOT inscribed; only for transmission
pub header_size_bytes: u64,
pub group_id: u64,
pub object_id: u64,
pub object_id: ObjectId,
// only relevant for files. Note: if transfer type is RemoteVirtualFileystem, then,
// the receiving endpoint won't decrypt the first level of encryption since the goal
// is to keep the file remotely encrypted
Expand All @@ -450,7 +451,7 @@ impl GroupReceiverConfig {
#[allow(clippy::too_many_arguments)]
pub fn new_refresh(
group_id: u64,
object_id: u64,
object_id: ObjectId,
header_size_bytes: u64,
plaintext_length: u64,
max_packet_payload_size: u32,
Expand Down
12 changes: 8 additions & 4 deletions citadel_crypt/src/streaming_crypt_scrambler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::stacked_ratchet::StackedRatchet;
use citadel_io::Mutex;
use citadel_io::{BlockingSpawn, BlockingSpawnError};
use citadel_types::crypto::SecurityLevel;
use citadel_types::prelude::ObjectId;
use citadel_types::proto::TransferType;
use futures::Future;
use num_integer::Integer;
Expand All @@ -41,11 +42,14 @@ impl FixedSizedSource for std::fs::File {

/// Generic function for inscribing headers on packets
pub trait HeaderInscriberFn:
for<'a> Fn(&'a PacketVector, &'a EntropyBank, u64, u64, &'a mut BytesMut) + Send + Sync + 'static
for<'a> Fn(&'a PacketVector, &'a EntropyBank, ObjectId, u64, &'a mut BytesMut)
+ Send
+ Sync
+ 'static
{
}
impl<
T: for<'a> Fn(&'a PacketVector, &'a EntropyBank, u64, u64, &'a mut BytesMut)
T: for<'a> Fn(&'a PacketVector, &'a EntropyBank, ObjectId, u64, &'a mut BytesMut)
+ Send
+ Sync
+ 'static,
Expand Down Expand Up @@ -162,7 +166,7 @@ impl<T: Into<Vec<u8>>> From<T> for BytesSource {
pub fn scramble_encrypt_source<S: ObjectSource, F: HeaderInscriberFn, const N: usize>(
mut source: S,
max_group_size: Option<usize>,
object_id: u64,
object_id: ObjectId,
group_sender: GroupChanneler<Result<GroupSenderDevice<N>, CryptError>>,
stop: Receiver<()>,
security_level: SecurityLevel,
Expand Down Expand Up @@ -266,7 +270,7 @@ struct AsyncCryptScrambler<F: HeaderInscriberFn, R: Read, const N: usize> {
transfer_type: TransferType,
file_len: usize,
read_cursor: usize,
object_id: u64,
object_id: ObjectId,
header_size_bytes: usize,
target_cid: u64,
group_id: u64,
Expand Down
10 changes: 8 additions & 2 deletions citadel_crypt/tests/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod tests {
AlgorithmsExt, CryptoParameters, EncryptionAlgorithm, KemAlgorithm, SecBuffer,
SigAlgorithm, KEM_ALGORITHM_COUNT,
};
use citadel_types::proto::TransferType;
use citadel_types::proto::{ObjectId, TransferType};
use rstest::rstest;
#[cfg(not(target_family = "wasm"))]
use std::path::PathBuf;
Expand Down Expand Up @@ -765,7 +765,13 @@ mod tests {
}

const HEADER_LEN: usize = 52;
fn header_inscribe(_: &PacketVector, _: &EntropyBank, _: u64, _: u64, packet: &mut BytesMut) {
fn header_inscribe(
_: &PacketVector,
_: &EntropyBank,
_: ObjectId,
_: u64,
packet: &mut BytesMut,
) {
for x in 0..HEADER_LEN {
packet.put_u8((x % 255) as u8)
}
Expand Down
2 changes: 1 addition & 1 deletion citadel_logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tracing_subscriber::EnvFilter;
/// Sets up the logging for any crate
pub fn setup_log() {
std::panic::set_hook(Box::new(|info| {
error!(target: "citadel", "Panic occurred: {:#?}", info);
error!(target: "citadel", "Panic occurred: {}", info);
std::process::exit(1);
}));

Expand Down
38 changes: 24 additions & 14 deletions citadel_proto/src/proto/packet_crafter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::proto::state_container::VirtualTargetType;
use citadel_crypt::scramble::crypt_splitter::oneshot_unencrypted_group_unified;
use citadel_crypt::secure_buffer::sec_packet::SecureMessagePacket;
use citadel_crypt::stacked_ratchet::{Ratchet, StackedRatchet};
use citadel_types::prelude::ObjectId;

#[derive(Debug)]
/// A thin wrapper used for convenient creation of zero-copy outgoing buffers
Expand Down Expand Up @@ -63,7 +64,7 @@ pub struct GroupTransmitter {
/// Contained within Self::group_transmitter, but is here for convenience
group_config: GroupReceiverConfig,
/// The ID of the object that is being transmitted
pub object_id: u64,
pub object_id: ObjectId,
pub group_id: u64,
/// For interfacing with the higher-level kernel
ticket: Ticket,
Expand Down Expand Up @@ -98,7 +99,7 @@ impl GroupTransmitter {
to_primary_stream: OutboundPrimaryStreamSender,
group_sender: GroupSenderDevice<HDP_HEADER_BYTE_LEN>,
hyper_ratchet: RatchetPacketCrafterContainer,
object_id: u64,
object_id: ObjectId,
ticket: Ticket,
security_level: SecurityLevel,
time_tracker: TimeTracker,
Expand Down Expand Up @@ -126,7 +127,7 @@ impl GroupTransmitter {
#[allow(clippy::too_many_arguments)]
pub fn new_message(
to_primary_stream: OutboundPrimaryStreamSender,
object_id: u64,
object_id: ObjectId,
hyper_ratchet: RatchetPacketCrafterContainer,
input_packet: SecureProtocolPacket,
security_level: SecurityLevel,
Expand Down Expand Up @@ -241,6 +242,7 @@ pub(crate) mod group {
use crate::proto::validation::group::{GroupHeader, GroupHeaderAck, WaveAck};
use citadel_crypt::endpoint_crypto_container::KemTransferStatus;
use citadel_crypt::stacked_ratchet::StackedRatchet;
use citadel_types::proto::ObjectId;
use citadel_user::serialization::SyncIO;
use std::ops::RangeInclusive;

Expand Down Expand Up @@ -296,7 +298,7 @@ pub(crate) mod group {
packet
};

packet.put_u64(processor.object_id);
packet.put_i64(processor.object_id);

processor
.hyper_ratchet_container
Expand All @@ -320,7 +322,7 @@ pub(crate) mod group {
hyper_ratchet: &StackedRatchet,
group_id: u64,
target_cid: u64,
object_id: u64,
object_id: ObjectId,
ticket: Ticket,
initial_wave_window: Option<RangeInclusive<u32>>,
fast_msg: bool,
Expand Down Expand Up @@ -362,12 +364,20 @@ pub(crate) mod group {
packet
}

pub(crate) fn pack_i64_to_u128(value: i64) -> u128 {
value as u128 & 0xFFFFFFFFFFFFFFFF
}

pub(crate) fn unpack_i64_from_u128(packed: u128) -> i64 {
(packed & 0xFFFFFFFFFFFFFFFF) as i64
}

/// This is called by the scrambler. NOTE: the scramble_drill MUST have the same drill/cid as the message_drill, otherwise
/// packets will not be rendered on the otherside
pub(crate) fn craft_wave_payload_packet_into(
coords: &PacketVector,
scramble_drill: &EntropyBank,
object_id: u64,
object_id: ObjectId,
target_cid: u64,
mut buffer: &mut BytesMut,
) {
Expand All @@ -377,7 +387,7 @@ pub(crate) mod group {
cmd_aux: packet_flags::cmd::aux::group::GROUP_PAYLOAD,
algorithm: 0,
security_level: 0, // Irrelevant; supplied by the wave header anyways
context_info: U128::new(object_id as _),
context_info: U128::new(pack_i64_to_u128(object_id)),
group: U64::new(coords.group_id),
wave_id: U32::new(coords.wave_id),
session_cid: U64::new(scramble_drill.get_cid()),
Expand All @@ -400,7 +410,7 @@ pub(crate) mod group {
#[allow(clippy::too_many_arguments)]
pub(crate) fn craft_wave_ack(
hyper_ratchet: &StackedRatchet,
object_id: u32,
object_id: ObjectId,
target_cid: u64,
group_id: u64,
wave_id: u32,
Expand All @@ -414,7 +424,7 @@ pub(crate) mod group {
cmd_aux: packet_flags::cmd::aux::group::WAVE_ACK,
algorithm: 0,
security_level: security_level.value(),
context_info: U128::new(object_id as _),
context_info: U128::new(pack_i64_to_u128(object_id)),
group: U64::new(group_id),
wave_id: U32::new(wave_id),
session_cid: U64::new(hyper_ratchet.get_cid()),
Expand Down Expand Up @@ -1598,7 +1608,7 @@ pub(crate) mod file {
use citadel_crypt::stacked_ratchet::StackedRatchet;
use citadel_types::crypto::SecurityLevel;
use citadel_types::prelude::TransferType;
use citadel_types::proto::VirtualObjectMetadata;
use citadel_types::proto::{ObjectId, VirtualObjectMetadata};
use citadel_user::serialization::SyncIO;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
Expand All @@ -1607,7 +1617,7 @@ pub(crate) mod file {
#[derive(Serialize, Deserialize, Debug)]
pub struct FileTransferErrorPacket {
pub error_message: String,
pub object_id: u64,
pub object_id: ObjectId,
}

pub(crate) fn craft_file_error_packet(
Expand All @@ -1617,7 +1627,7 @@ pub(crate) mod file {
virtual_target: VirtualTargetType,
timestamp: i64,
error_message: String,
object_id: u64,
object_id: ObjectId,
) -> BytesMut {
let header = HdpHeader {
protocol_version: (*crate::constants::PROTOCOL_VERSION).into(),
Expand Down Expand Up @@ -1704,15 +1714,15 @@ pub(crate) mod file {
pub struct FileHeaderAckPacket {
pub success: bool,
pub virtual_target: VirtualTargetType,
pub object_id: u64,
pub object_id: ObjectId,
pub transfer_type: TransferType,
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn craft_file_header_ack_packet(
hyper_ratchet: &StackedRatchet,
success: bool,
object_id: u64,
object_id: ObjectId,
target_cid: u64,
ticket: Ticket,
security_level: SecurityLevel,
Expand Down
Loading

0 comments on commit a3623be

Please sign in to comment.