diff --git a/async_ip/src/lib.rs b/async_ip/src/lib.rs index aae3a4ff8..9a27e78d0 100644 --- a/async_ip/src/lib.rs +++ b/async_ip/src/lib.rs @@ -121,17 +121,7 @@ pub async fn get_ip_from(client: Option, addr: &str) -> Result { pub update_in_progress: Arc, // if local is initiator, then in the case both nodes send a FastMessage at the same time (causing an update to the keys), the initiator takes preference, and the non-initiator's upgrade attempt gets dropped (if update_in_progress) pub local_is_initiator: bool, - pub rolling_object_id: u64, + pub rolling_object_id: ObjectId, pub rolling_group_id: u64, pub lock_set_by_alice: Option, /// Alice sends to Bob, then bob updates internally the toolset. However. Bob can't send packets to Alice quite yet using that newest version. He must first wait from Alice to commit on her end and wait for an ACK. @@ -200,9 +201,25 @@ impl PeerSessionCrypto { self.rolling_group_id.wrapping_sub(1) } - pub fn get_and_increment_object_id(&mut self) -> u64 { - self.rolling_object_id = self.rolling_object_id.wrapping_add(1); - self.rolling_object_id.wrapping_sub(1) + pub fn get_and_increment_object_id(&mut self) -> i64 { + let next = if self.local_is_initiator { + let mut next_val = self.rolling_object_id.wrapping_add(1); + if next_val <= 0 { + next_val = 1 + } + + next_val + } else { + let mut next_val = self.rolling_object_id.wrapping_sub(1); + if next_val >= 0 { + next_val = -1 + } + + next_val + }; + + self.rolling_object_id = next; + next } /// Returns a new constructor only if a concurrent update isn't occurring diff --git a/citadel_crypt/src/scramble/crypt_splitter.rs b/citadel_crypt/src/scramble/crypt_splitter.rs index df875328d..cc213ef0a 100644 --- a/citadel_crypt/src/scramble/crypt_splitter.rs +++ b/citadel_crypt/src/scramble/crypt_splitter.rs @@ -13,6 +13,7 @@ use crate::entropy_bank::EntropyBank; use crate::packet_vector::{generate_packet_vector, PacketVector}; use crate::prelude::CryptError; use crate::stacked_ratchet::Ratchet; +pub use citadel_types::prelude::ObjectId; #[cfg(not(target_family = "wasm"))] use rayon::prelude::*; @@ -62,7 +63,7 @@ pub fn generate_scrambler_metadata>( header_size_bytes: usize, security_level: SecurityLevel, group_id: u64, - object_id: u64, + object_id: ObjectId, enx: EncryptionAlgorithm, sig_alg: SigAlgorithm, transfer_type: &TransferType, @@ -141,7 +142,7 @@ fn get_scramble_encrypt_config<'a, R: Ratchet>( header_size_bytes: usize, security_level: SecurityLevel, group_id: u64, - object_id: u64, + object_id: ObjectId, transfer_type: &TransferType, empty_transfer: bool, ) -> Result< @@ -190,13 +191,13 @@ pub fn par_scramble_encrypt_group, R: Ratchet, F, const N: usize> static_aux_ratchet: &R, header_size_bytes: usize, target_cid: u64, - object_id: u64, + object_id: ObjectId, group_id: u64, transfer_type: TransferType, header_inscriber: F, ) -> Result, CryptError> where - F: Fn(&PacketVector, &EntropyBank, u64, u64, &mut BytesMut) + Send + Sync, + F: Fn(&PacketVector, &EntropyBank, ObjectId, u64, &mut BytesMut) + Send + Sync, { let mut plain_text = Cow::Borrowed(plain_text.as_ref()); @@ -303,9 +304,9 @@ fn scramble_encrypt_wave( msg_pqc: &PostQuantumContainer, scramble_drill: &EntropyBank, target_cid: u64, - object_id: u64, + object_id: ObjectId, header_size_bytes: usize, - header_inscriber: impl Fn(&PacketVector, &EntropyBank, u64, u64, &mut BytesMut) + Send + Sync, + header_inscriber: impl Fn(&PacketVector, &EntropyBank, ObjectId, u64, &mut BytesMut) + Send + Sync, ) -> Vec<(usize, PacketCoordinate)> { let ciphertext = msg_drill .encrypt(msg_pqc, bytes_to_encrypt_for_this_wave) @@ -336,7 +337,7 @@ pub fn oneshot_unencrypted_group_unified( plain_text: SecureMessagePacket, header_size_bytes: usize, group_id: u64, - object_id: u64, + object_id: ObjectId, empty_transfer: bool, ) -> Result, CryptError> { let len = plain_text.message_len() as u64; @@ -435,7 +436,7 @@ pub struct GroupReceiverConfig { // this is NOT inscribed; only for transmission pub header_size_bytes: u64, pub group_id: u64, - pub object_id: u64, + pub object_id: ObjectId, // only relevant for files. Note: if transfer type is RemoteVirtualFileystem, then, // the receiving endpoint won't decrypt the first level of encryption since the goal // is to keep the file remotely encrypted @@ -450,7 +451,7 @@ impl GroupReceiverConfig { #[allow(clippy::too_many_arguments)] pub fn new_refresh( group_id: u64, - object_id: u64, + object_id: ObjectId, header_size_bytes: u64, plaintext_length: u64, max_packet_payload_size: u32, diff --git a/citadel_crypt/src/streaming_crypt_scrambler.rs b/citadel_crypt/src/streaming_crypt_scrambler.rs index c466f2526..edad75ce1 100644 --- a/citadel_crypt/src/streaming_crypt_scrambler.rs +++ b/citadel_crypt/src/streaming_crypt_scrambler.rs @@ -15,6 +15,7 @@ use crate::stacked_ratchet::StackedRatchet; use citadel_io::Mutex; use citadel_io::{BlockingSpawn, BlockingSpawnError}; use citadel_types::crypto::SecurityLevel; +use citadel_types::prelude::ObjectId; use citadel_types::proto::TransferType; use futures::Future; use num_integer::Integer; @@ -41,11 +42,14 @@ impl FixedSizedSource for std::fs::File { /// Generic function for inscribing headers on packets pub trait HeaderInscriberFn: - for<'a> Fn(&'a PacketVector, &'a EntropyBank, u64, u64, &'a mut BytesMut) + Send + Sync + 'static + for<'a> Fn(&'a PacketVector, &'a EntropyBank, ObjectId, u64, &'a mut BytesMut) + + Send + + Sync + + 'static { } impl< - T: for<'a> Fn(&'a PacketVector, &'a EntropyBank, u64, u64, &'a mut BytesMut) + T: for<'a> Fn(&'a PacketVector, &'a EntropyBank, ObjectId, u64, &'a mut BytesMut) + Send + Sync + 'static, @@ -162,7 +166,7 @@ impl>> From for BytesSource { pub fn scramble_encrypt_source( mut source: S, max_group_size: Option, - object_id: u64, + object_id: ObjectId, group_sender: GroupChanneler, CryptError>>, stop: Receiver<()>, security_level: SecurityLevel, @@ -266,7 +270,7 @@ struct AsyncCryptScrambler { transfer_type: TransferType, file_len: usize, read_cursor: usize, - object_id: u64, + object_id: ObjectId, header_size_bytes: usize, target_cid: u64, group_id: u64, diff --git a/citadel_crypt/tests/primary.rs b/citadel_crypt/tests/primary.rs index 49d021c6a..b7fc0ab20 100644 --- a/citadel_crypt/tests/primary.rs +++ b/citadel_crypt/tests/primary.rs @@ -16,7 +16,7 @@ mod tests { AlgorithmsExt, CryptoParameters, EncryptionAlgorithm, KemAlgorithm, SecBuffer, SigAlgorithm, KEM_ALGORITHM_COUNT, }; - use citadel_types::proto::TransferType; + use citadel_types::proto::{ObjectId, TransferType}; use rstest::rstest; #[cfg(not(target_family = "wasm"))] use std::path::PathBuf; @@ -765,7 +765,13 @@ mod tests { } const HEADER_LEN: usize = 52; - fn header_inscribe(_: &PacketVector, _: &EntropyBank, _: u64, _: u64, packet: &mut BytesMut) { + fn header_inscribe( + _: &PacketVector, + _: &EntropyBank, + _: ObjectId, + _: u64, + packet: &mut BytesMut, + ) { for x in 0..HEADER_LEN { packet.put_u8((x % 255) as u8) } diff --git a/citadel_logging/src/lib.rs b/citadel_logging/src/lib.rs index eba10aa86..ca06b5485 100644 --- a/citadel_logging/src/lib.rs +++ b/citadel_logging/src/lib.rs @@ -7,7 +7,7 @@ use tracing_subscriber::EnvFilter; /// Sets up the logging for any crate pub fn setup_log() { std::panic::set_hook(Box::new(|info| { - error!(target: "citadel", "Panic occurred: {:#?}", info); + error!(target: "citadel", "Panic occurred: {}", info); std::process::exit(1); })); diff --git a/citadel_proto/src/proto/packet_crafter.rs b/citadel_proto/src/proto/packet_crafter.rs index 9fac62a9b..ce623ca3f 100644 --- a/citadel_proto/src/proto/packet_crafter.rs +++ b/citadel_proto/src/proto/packet_crafter.rs @@ -12,6 +12,7 @@ use crate::proto::state_container::VirtualTargetType; use citadel_crypt::scramble::crypt_splitter::oneshot_unencrypted_group_unified; use citadel_crypt::secure_buffer::sec_packet::SecureMessagePacket; use citadel_crypt::stacked_ratchet::{Ratchet, StackedRatchet}; +use citadel_types::prelude::ObjectId; #[derive(Debug)] /// A thin wrapper used for convenient creation of zero-copy outgoing buffers @@ -63,7 +64,7 @@ pub struct GroupTransmitter { /// Contained within Self::group_transmitter, but is here for convenience group_config: GroupReceiverConfig, /// The ID of the object that is being transmitted - pub object_id: u64, + pub object_id: ObjectId, pub group_id: u64, /// For interfacing with the higher-level kernel ticket: Ticket, @@ -98,7 +99,7 @@ impl GroupTransmitter { to_primary_stream: OutboundPrimaryStreamSender, group_sender: GroupSenderDevice, hyper_ratchet: RatchetPacketCrafterContainer, - object_id: u64, + object_id: ObjectId, ticket: Ticket, security_level: SecurityLevel, time_tracker: TimeTracker, @@ -126,7 +127,7 @@ impl GroupTransmitter { #[allow(clippy::too_many_arguments)] pub fn new_message( to_primary_stream: OutboundPrimaryStreamSender, - object_id: u64, + object_id: ObjectId, hyper_ratchet: RatchetPacketCrafterContainer, input_packet: SecureProtocolPacket, security_level: SecurityLevel, @@ -241,6 +242,7 @@ pub(crate) mod group { use crate::proto::validation::group::{GroupHeader, GroupHeaderAck, WaveAck}; use citadel_crypt::endpoint_crypto_container::KemTransferStatus; use citadel_crypt::stacked_ratchet::StackedRatchet; + use citadel_types::proto::ObjectId; use citadel_user::serialization::SyncIO; use std::ops::RangeInclusive; @@ -296,7 +298,7 @@ pub(crate) mod group { packet }; - packet.put_u64(processor.object_id); + packet.put_i64(processor.object_id); processor .hyper_ratchet_container @@ -320,7 +322,7 @@ pub(crate) mod group { hyper_ratchet: &StackedRatchet, group_id: u64, target_cid: u64, - object_id: u64, + object_id: ObjectId, ticket: Ticket, initial_wave_window: Option>, fast_msg: bool, @@ -362,12 +364,20 @@ pub(crate) mod group { packet } + pub(crate) fn pack_i64_to_u128(value: i64) -> u128 { + value as u128 & 0xFFFFFFFFFFFFFFFF + } + + pub(crate) fn unpack_i64_from_u128(packed: u128) -> i64 { + (packed & 0xFFFFFFFFFFFFFFFF) as i64 + } + /// This is called by the scrambler. NOTE: the scramble_drill MUST have the same drill/cid as the message_drill, otherwise /// packets will not be rendered on the otherside pub(crate) fn craft_wave_payload_packet_into( coords: &PacketVector, scramble_drill: &EntropyBank, - object_id: u64, + object_id: ObjectId, target_cid: u64, mut buffer: &mut BytesMut, ) { @@ -377,7 +387,7 @@ pub(crate) mod group { cmd_aux: packet_flags::cmd::aux::group::GROUP_PAYLOAD, algorithm: 0, security_level: 0, // Irrelevant; supplied by the wave header anyways - context_info: U128::new(object_id as _), + context_info: U128::new(pack_i64_to_u128(object_id)), group: U64::new(coords.group_id), wave_id: U32::new(coords.wave_id), session_cid: U64::new(scramble_drill.get_cid()), @@ -400,7 +410,7 @@ pub(crate) mod group { #[allow(clippy::too_many_arguments)] pub(crate) fn craft_wave_ack( hyper_ratchet: &StackedRatchet, - object_id: u32, + object_id: ObjectId, target_cid: u64, group_id: u64, wave_id: u32, @@ -414,7 +424,7 @@ pub(crate) mod group { cmd_aux: packet_flags::cmd::aux::group::WAVE_ACK, algorithm: 0, security_level: security_level.value(), - context_info: U128::new(object_id as _), + context_info: U128::new(pack_i64_to_u128(object_id)), group: U64::new(group_id), wave_id: U32::new(wave_id), session_cid: U64::new(hyper_ratchet.get_cid()), @@ -1598,7 +1608,7 @@ pub(crate) mod file { use citadel_crypt::stacked_ratchet::StackedRatchet; use citadel_types::crypto::SecurityLevel; use citadel_types::prelude::TransferType; - use citadel_types::proto::VirtualObjectMetadata; + use citadel_types::proto::{ObjectId, VirtualObjectMetadata}; use citadel_user::serialization::SyncIO; use serde::{Deserialize, Serialize}; use std::path::PathBuf; @@ -1607,7 +1617,7 @@ pub(crate) mod file { #[derive(Serialize, Deserialize, Debug)] pub struct FileTransferErrorPacket { pub error_message: String, - pub object_id: u64, + pub object_id: ObjectId, } pub(crate) fn craft_file_error_packet( @@ -1617,7 +1627,7 @@ pub(crate) mod file { virtual_target: VirtualTargetType, timestamp: i64, error_message: String, - object_id: u64, + object_id: ObjectId, ) -> BytesMut { let header = HdpHeader { protocol_version: (*crate::constants::PROTOCOL_VERSION).into(), @@ -1704,7 +1714,7 @@ pub(crate) mod file { pub struct FileHeaderAckPacket { pub success: bool, pub virtual_target: VirtualTargetType, - pub object_id: u64, + pub object_id: ObjectId, pub transfer_type: TransferType, } @@ -1712,7 +1722,7 @@ pub(crate) mod file { pub(crate) fn craft_file_header_ack_packet( hyper_ratchet: &StackedRatchet, success: bool, - object_id: u64, + object_id: ObjectId, target_cid: u64, ticket: Ticket, security_level: SecurityLevel, diff --git a/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs b/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs index 0d6766284..e7124893b 100644 --- a/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs +++ b/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs @@ -29,7 +29,7 @@ use crate::proto::peer::hole_punch_compat_sink_stream::ReliableOrderedCompatStre use crate::proto::peer::p2p_conn_handler::attempt_simultaneous_hole_punch; use crate::proto::peer::peer_crypt::{KeyExchangeProcess, PeerNatInfo}; use crate::proto::peer::peer_layer::{ - NodeConnectionType, PeerConnectionType, PeerResponse, PeerSignal, + HyperNodePeerLayerInner, NodeConnectionType, PeerConnectionType, PeerResponse, PeerSignal, }; use crate::proto::remote::Ticket; use crate::proto::session_manager::HdpSessionManager; @@ -833,6 +833,8 @@ pub async fn process_peer_cmd( _ => {} } + log::trace!(target: "citadel", "Forwarding signal {signal:?} to kernel"); + session .kernel_tx .unbounded_send(NodeResult::PeerEvent(PeerEvent { @@ -1005,7 +1007,7 @@ async fn process_signal_command_as_server( if let Some(ticket_new) = peer_layer.check_simultaneous_register(implicated_cid, target_cid) { - log::trace!(target: "citadel", "Simultaneous register detected! Simulating implicated_cid={} sent an accept_register to target={}", implicated_cid, target_cid); + log::info!(target: "citadel", "Simultaneous register detected! Simulating implicated_cid={} sent an accept_register to target={}", implicated_cid, target_cid); peer_layer.insert_mapped_ticket(implicated_cid, ticket_new, ticket); // route signal to peer drop(peer_layer); @@ -1055,7 +1057,6 @@ async fn process_signal_command_as_server( let to_primary_stream = return_if_none!(session.to_primary_stream.clone()); let sess_mgr = session.session_manager.clone(); - drop(peer_layer); route_signal_and_register_ticket_forwards( PeerSignal::PostRegister { peer_conn_type, @@ -1073,6 +1074,7 @@ async fn process_signal_command_as_server( &sess_mgr, &sess_hyper_ratchet, security_level, + &mut *peer_layer, ) .await } @@ -1253,7 +1255,6 @@ async fn process_signal_command_as_server( .await?; Ok(PrimaryProcessorResult::Void) } else { - drop(peer_layer); route_signal_and_register_ticket_forwards( PeerSignal::PostConnect { peer_conn_type, @@ -1272,6 +1273,7 @@ async fn process_signal_command_as_server( &sess_mgr, &sess_hyper_ratchet, security_level, + &mut *peer_layer, ) .await } @@ -1663,12 +1665,13 @@ pub(crate) async fn route_signal_and_register_ticket_forwards( sess_mgr: &HdpSessionManager, sess_hyper_ratchet: &StackedRatchet, security_level: SecurityLevel, + peer_layer: &mut HyperNodePeerLayerInner, ) -> Result { let sess_hyper_ratchet_2 = sess_hyper_ratchet.clone(); let to_primary_stream = to_primary_stream.clone(); // Give the target_cid 10 seconds to respond - let res = sess_mgr.route_signal_primary(implicated_cid, target_cid, ticket, signal.clone(), move |peer_hyper_ratchet| { + let res = sess_mgr.route_signal_primary(peer_layer, implicated_cid, target_cid, ticket, signal.clone(), move |peer_hyper_ratchet| { packet_crafter::peer_cmd::craft_peer_signal(peer_hyper_ratchet, signal.clone(), ticket, timestamp, security_level) }, timeout, move |stale_signal| { // on timeout, run this diff --git a/citadel_proto/src/proto/packet_processor/primary_group_packet.rs b/citadel_proto/src/proto/packet_processor/primary_group_packet.rs index ba8bd1a17..83a4e9c25 100644 --- a/citadel_proto/src/proto/packet_processor/primary_group_packet.rs +++ b/citadel_proto/src/proto/packet_processor/primary_group_packet.rs @@ -227,7 +227,7 @@ pub fn process_primary_packet( if group.has_begun { if group.receiver.has_expired(GROUP_EXPIRE_TIME_MS) { if state_container.meta_expiry_state.expired() { - log::error!(target: "citadel", "Inbound group {} has expired; removing for {}.", group_id, peer_cid); + log::warn!(target: "citadel", "Inbound group {} has expired; removing for {}.", group_id, peer_cid); if let Some(group) = state_container.inbound_groups.remove(&key) { if group.object_id != 0 { // belongs to a file. Delete file; stop transmission diff --git a/citadel_proto/src/proto/peer/peer_layer.rs b/citadel_proto/src/proto/peer/peer_layer.rs index e9f4dbe20..308ee2dc6 100644 --- a/citadel_proto/src/proto/peer/peer_layer.rs +++ b/citadel_proto/src/proto/peer/peer_layer.rs @@ -466,7 +466,7 @@ impl HyperNodePeerLayerInner { ) -> Option { let this = self.inner.read(); let peer_map = this.observed_postings.get(&peer_cid)?; - log::trace!(target: "citadel", "[simultaneous checking] peer_map len: {}", peer_map.len()); + log::trace!(target: "citadel", "[simultaneous checking] peer_map len: {} | {:?}", peer_map.len(), peer_map.values().map(|r| &r.signal).collect::>()); peer_map .iter() .find(|(_, posting)| (fx)(posting)) diff --git a/citadel_proto/src/proto/session_manager.rs b/citadel_proto/src/proto/session_manager.rs index faea43786..847b86a69 100644 --- a/citadel_proto/src/proto/session_manager.rs +++ b/citadel_proto/src/proto/session_manager.rs @@ -31,7 +31,8 @@ use crate::proto::packet_processor::includes::{Duration, Instant}; use crate::proto::packet_processor::peer::group_broadcast::GroupBroadcast; use crate::proto::packet_processor::PrimaryProcessorResult; use crate::proto::peer::peer_layer::{ - HyperNodePeerLayer, MailboxTransfer, PeerConnectionType, PeerResponse, PeerSignal, + HyperNodePeerLayer, HyperNodePeerLayerInner, MailboxTransfer, PeerConnectionType, PeerResponse, + PeerSignal, }; use crate::proto::remote::{NodeRemote, Ticket}; use crate::proto::session::{ @@ -1079,6 +1080,7 @@ impl HdpSessionManager { #[allow(clippy::too_many_arguments)] pub async fn route_signal_primary( &self, + peer_layer: &mut HyperNodePeerLayerInner, implicated_cid: u64, target_cid: u64, ticket: Ticket, @@ -1107,11 +1109,7 @@ impl HdpSessionManager { // get the target cid's session if let Some(ref sess_ref) = sess { - sess_ref - .hypernode_peer_layer - .inner - .write() - .await + peer_layer .insert_tracked_posting(implicated_cid, timeout, ticket, signal, on_timeout) .await; let peer_sender = sess_ref.to_primary_stream.as_ref().unwrap(); @@ -1126,11 +1124,7 @@ impl HdpSessionManager { // session is not active, but user is registered (thus offline). Setup return ticket tracker on implicated_cid // and deliver to the mailbox of target_cid, that way target_cid receives mail on connect. TODO: external svc route, if available { - let peer_layer = { inner!(self).hypernode_peer_layer.clone() }; peer_layer - .inner - .write() - .await .insert_tracked_posting( implicated_cid, timeout, diff --git a/citadel_proto/src/proto/state_container.rs b/citadel_proto/src/proto/state_container.rs index a1bcc388f..9d8635f22 100644 --- a/citadel_proto/src/proto/state_container.rs +++ b/citadel_proto/src/proto/state_container.rs @@ -32,6 +32,7 @@ use crate::proto::node_result::{NodeResult, ObjectTransferHandle}; use crate::proto::outbound_sender::{OutboundPrimaryStreamSender, OutboundUdpSender}; use crate::proto::packet::packet_flags; use crate::proto::packet::HdpHeader; +use crate::proto::packet_crafter::group::unpack_i64_from_u128; use crate::proto::packet_crafter::peer_cmd::C2S_ENCRYPTION_ONLY; use crate::proto::packet_crafter::{ GroupTransmitter, RatchetPacketCrafterContainer, SecureProtocolPacket, @@ -62,6 +63,7 @@ use citadel_crypt::stacked_ratchet::{Ratchet, StackedRatchet}; use citadel_types::crypto::SecBuffer; use citadel_types::crypto::SecrecyMode; use citadel_types::crypto::SecurityLevel; +use citadel_types::prelude::ObjectId; use citadel_types::proto::{ MessageGroupKey, ObjectTransferOrientation, ObjectTransferStatus, SessionSecuritySettings, TransferType, UdpMode, VirtualObjectMetadata, @@ -136,21 +138,22 @@ pub struct StateContainerInner { pub(crate) struct GroupKey { target_cid: u64, group_id: u64, - object_id: u64, + object_id: ObjectId, } #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] pub struct FileKey { + // Where the information will be streamed pub target_cid: u64, // wave payload get the object id inscribed - pub object_id: u64, + pub object_id: ObjectId, } /// when the GROUP_HEADER comes inbound with virtual file metadata, this should be created alongside /// an async task fired-up on the threadpool #[allow(dead_code)] pub(crate) struct InboundFileTransfer { - pub object_id: u64, + pub object_id: ObjectId, pub total_groups: usize, pub groups_rendered: usize, pub last_group_window_len: usize, @@ -176,7 +179,7 @@ pub(crate) struct OutboundFileTransfer { } impl GroupKey { - pub fn new(target_cid: u64, group_id: u64, object_id: u64) -> Self { + pub fn new(target_cid: u64, group_id: u64, object_id: ObjectId) -> Self { Self { target_cid, group_id, @@ -186,7 +189,7 @@ impl GroupKey { } impl FileKey { - pub fn new(target_cid: u64, object_id: u64) -> Self { + pub fn new(target_cid: u64, object_id: ObjectId) -> Self { Self { target_cid, object_id, @@ -510,12 +513,12 @@ pub(crate) struct GroupReceiverContainer { max_window_size: usize, window_drift: isize, waves_in_window_finished: usize, - pub object_id: u64, + pub object_id: ObjectId, } impl GroupReceiverContainer { pub fn new( - object_id: u64, + object_id: ObjectId, receiver: GroupReceiver, virtual_target: VirtualTargetType, security_level: SecurityLevel, @@ -1104,9 +1107,10 @@ impl StateContainerInner { let ticket = header.context_info.get().into(); let is_revfs_pull = local_encryption_level.is_some(); + log::info!(target: "citadel", "File header received for cid {}, file key: {key:?} | revfs_pull: {is_revfs_pull}", hyper_ratchet.get_cid()); + if let std::collections::hash_map::Entry::Vacant(e) = self.inbound_files.entry(key) { let (stream_to_hd, stream_to_hd_rx) = unbounded::>(); - let (start_recv_tx, start_recv_rx) = tokio::sync::oneshot::channel::(); let security_level_rebound: SecurityLevel = header.security_level.into(); let timestamp = self.time_tracker.get_global_time_ns(); @@ -1129,39 +1133,45 @@ impl StateContainerInner { local_encryption_level, }; + let (start_recv_tx, start_recv_rx) = if !is_revfs_pull { + let (tx, rx) = tokio::sync::oneshot::channel(); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + e.insert(entry); let (handle, tx_status) = ObjectTransferHandler::new( header.session_cid.get(), header.target_cid.get(), metadata.clone(), ObjectTransferOrientation::Receiver { is_revfs_pull }, - Some(start_recv_tx), - ); - self.file_transfer_handles.insert( - key, - crate::proto::outbound_sender::UnboundedSender(tx_status.clone()), + start_recv_tx, ); + self.file_transfer_handles + .insert(key, UnboundedSender(tx_status.clone())); // finally, alert the kernel (receiver) if let Err(err) = self .kernel_tx .unbounded_send(NodeResult::ObjectTransferHandle(ObjectTransferHandle { ticket, handle, - implicated_cid: hyper_ratchet.get_cid(), + implicated_cid: header.target_cid.get(), })) { log::error!(target: "citadel", "Failed to send the ObjectTransferHandle to the kernel: {err:?}"); } let task = async move { - let res = if is_revfs_pull { - // auto-accept for revfs pull requests - log::trace!(target: "citadel", "Auto-accepting for REVFS pull request"); - Ok(true) + log::info!(target: "citadel", "File transfer initiated, awaiting acceptance ... | revfs_pull: {is_revfs_pull}"); + let res = if let Some(start_rx) = start_recv_rx { + start_rx.await } else { - start_recv_rx.await + Ok(true) }; + log::info!(target: "citadel", "File transfer initiated! | revfs_pull: {is_revfs_pull}"); + let accepted = res.as_ref().map(|r| *r).unwrap_or(false); // first, send a rebound signal immediately to the sender // to ensure the sender knows if the user accepted or not @@ -1196,13 +1206,13 @@ impl StateContainerInner { .await { Ok(()) => { - log::info!(target: "citadel", "Successfully synced file to backend | {is_revfs_pull}"); + log::info!(target: "citadel", "Successfully synced file to backend | revfs_pull: {is_revfs_pull}"); let status = match success_receiving_rx.await { Ok(header) => { // write the header let wave_ack = packet_crafter::group::craft_wave_ack( &hyper_ratchet, - header.context_info.get() as u32, + unpack_i64_from_u128(header.context_info.get()), get_resp_target_cid_from_header(&header), header.group.get(), header.wave_id.get(), @@ -1235,6 +1245,7 @@ impl StateContainerInner { } } else { // user did not accept. cleanup local + log::warn!(target: "citadel", "User did not accept file transfer"); let mut state_container = inner_mut_state!(state_container); let _ = state_container.inbound_files.remove(&key); let _ = state_container.file_transfer_handles.remove(&key); @@ -1272,7 +1283,7 @@ impl StateContainerInner { success: bool, implicated_cid: u64, ticket: Ticket, - object_id: u64, + object_id: ObjectId, v_target: VirtualTargetType, _transfer_type: TransferType, ) -> Option<()> { @@ -1362,7 +1373,7 @@ impl StateContainerInner { peer_cid: u64, target_cid: u64, group_id: u64, - object_id: u64, + object_id: ObjectId, next_window: Option>, transfer: KemTransferStatus, fast_msg: bool, @@ -1412,7 +1423,7 @@ impl StateContainerInner { &mut self, header: &HdpHeader, error_message: T, - object_id: u64, + object_id: ObjectId, ) -> Result<(), NetworkError> { let target_cid = header.session_cid.get(); self.notify_object_transfer_handle_failure_with(target_cid, object_id, error_message) @@ -1421,7 +1432,7 @@ impl StateContainerInner { pub fn notify_object_transfer_handle_failure_with>( &mut self, target_cid: u64, - object_id: u64, + object_id: ObjectId, error_message: T, ) -> Result<(), NetworkError> { // let group_key = GroupKey::new(target_cid, group_id, object_id); @@ -1445,10 +1456,10 @@ impl StateContainerInner { header: &HdpHeader, payload: Bytes, hr: &StackedRatchet, - ) -> Result { + ) -> Result { let target_cid = header.session_cid.get(); let group_id = header.group.get(); - let object_id = header.context_info.get() as u64; + let object_id = unpack_i64_from_u128(header.context_info.get()); let group_key = GroupKey::new(target_cid, group_id, object_id); let grc = self.inbound_groups.get_mut(&group_key).ok_or_else(|| { ( @@ -1521,7 +1532,7 @@ impl StateContainerInner { GroupReceiverStatus::GROUP_COMPLETE(_last_wid) => { let receiver = self.inbound_groups.remove(&group_key).unwrap().receiver; let mut chunk = receiver.finalize(); - log::info!(target: "citadel", "GROUP {} COMPLETE. Total groups: {} | Plaintext len: {} | Received plaintext len: {}", group_id, file_container.total_groups, file_container.metadata.plaintext_length, chunk.len()); + log::trace!(target: "citadel", "GROUP {} COMPLETE. Total groups: {} | Plaintext len: {} | Received plaintext len: {}", group_id, file_container.total_groups, file_container.metadata.plaintext_length, chunk.len()); if let Some(local_encryption_level) = file_container.local_encryption_level { // which static hr do we need? Since we are receiving this chunk, always our local account's @@ -1589,7 +1600,7 @@ impl StateContainerInner { } if complete { - log::trace!(target: "citadel", "Finished receiving file {:?}", file_key); + log::info!(target: "citadel", "Finished receiving file {:?}", file_key); let _ = self.inbound_files.remove(&file_key); let _ = self.file_transfer_handles.remove(&file_key); } @@ -1599,7 +1610,7 @@ impl StateContainerInner { if !complete { let wave_ack = packet_crafter::group::craft_wave_ack( hr, - header.context_info.get() as u32, + unpack_i64_from_u128(header.context_info.get()), get_resp_target_cid_from_header(header), header.group.get(), header.wave_id.get(), @@ -1622,10 +1633,10 @@ impl StateContainerInner { #[allow(unused_results)] pub fn on_wave_ack_received( &mut self, - _implicated_cid: u64, + implicated_cid: u64, header: &Ref<&[u8], HdpHeader>, ) -> bool { - let object_id = header.context_info.get() as u64; + let object_id = unpack_i64_from_u128(header.context_info.get()); let group = header.group.get(); let wave_id = header.wave_id.get(); let target_cid = header.session_cid.get(); @@ -1672,6 +1683,7 @@ impl StateContainerInner { ObjectTransferStatus::TransferComplete }; + log::info!(target: "citadel", "Transmitter {file_key:?} received final wave ack. Sending status to local node: {:?}", status); if let Err(err) = tx.unbounded_send(status.clone()) { // if the server is using an accept-only policy with no further responses, this branch // will be reached @@ -1682,11 +1694,11 @@ impl StateContainerInner { if matches!(status, ObjectTransferStatus::TransferComplete) { // remove the transmitter. Dropping will stop related futures - log::trace!(target: "citadel", "FileTransfer is complete!"); + log::info!(target: "citadel", "FileTransfer is complete!"); let _ = self.file_transfer_handles.remove(&file_key); } } else { - log::error!(target: "citadel", "Unable to find ObjectTransferHandle for {:?}", file_key); + log::error!(target: "citadel", "Unable to find ObjectTransferHandle for {:?} | Local is {implicated_cid} | FileKeys available: {:?}", file_key, self.file_transfer_handles.keys().copied().collect::>()); } delete_group = true; @@ -1859,7 +1871,7 @@ impl StateContainerInner { } // object singleton == 0 implies that the data does not belong to a file - const OBJECT_SINGLETON: u64 = 0; + const OBJECT_SINGLETON: ObjectId = 0; // Drop this to ensure that it doesn't block other async closures from accessing the inner device // std::mem::drop(this); let (mut transmitter, group_id, target_cid) = match virtual_target { diff --git a/citadel_proto/src/proto/validation.rs b/citadel_proto/src/proto/validation.rs index 83b4999fc..99ec24b10 100644 --- a/citadel_proto/src/proto/validation.rs +++ b/citadel_proto/src/proto/validation.rs @@ -44,6 +44,7 @@ pub(crate) mod group { use citadel_crypt::stacked_ratchet::StackedRatchet; use citadel_types::crypto::SecBuffer; use citadel_types::crypto::SecurityLevel; + use citadel_types::proto::ObjectId; use citadel_user::serialization::SyncIO; use serde::{Deserialize, Serialize}; @@ -84,13 +85,13 @@ pub(crate) mod group { pub(crate) fn validate_message( payload_orig: &mut BytesMut, - ) -> Option<(SecBuffer, Option, u64)> { + ) -> Option<(SecBuffer, Option, ObjectId)> { // Safely check that there are 8 bytes in length, then, split at the end - 8 if payload_orig.len() < 8 { return None; } let mut payload = payload_orig.split_to(payload_orig.len() - 8); - let object_id = payload_orig.reader().read_u64::().ok()?; + let object_id = payload_orig.reader().read_i64::().ok()?; let message = SecureProtocolPacket::extract_message(&mut payload).ok()?; let deser = SyncIO::deserialize_from_vector(&payload[..]).ok()?; Some((message.into(), deser, object_id)) @@ -103,11 +104,11 @@ pub(crate) mod group { fast_msg: bool, initial_window: Option>, transfer: KemTransferStatus, - object_id: u64, + object_id: ObjectId, }, NotReady { fast_msg: bool, - object_id: u64, + object_id: ObjectId, }, } diff --git a/citadel_sdk/src/fs.rs b/citadel_sdk/src/fs.rs index 6797fb4e0..72353de68 100644 --- a/citadel_sdk/src/fs.rs +++ b/citadel_sdk/src/fs.rs @@ -320,7 +320,7 @@ mod tests { #[rstest] #[case(SecrecyMode::BestEffort)] - #[timeout(std::time::Duration::from_secs(240))] + #[timeout(Duration::from_secs(240))] #[tokio::test(flavor = "multi_thread")] async fn test_p2p_file_transfer_revfs( #[case] secrecy_mode: SecrecyMode, @@ -360,12 +360,11 @@ mod tests { let mut connection = connection.recv().await.unwrap()?; wait_for_peers().await; // The other peer will send the file first - log::trace!(target: "citadel", "***CLIENT LOGIN SUCCESS :: File transfer next ***"); + log::info!(target: "citadel", "***CLIENT A LOGIN SUCCESS :: File transfer next ***"); + let remote = connection.remote.clone(); let handle_orig = connection.incoming_object_transfer_handles.take().unwrap(); accept_all(handle_orig); - wait_for_peers().await; - /* let virtual_path = PathBuf::from("/home/john.doe/TheBridge.pdf"); // write the file to the RE-VFS crate::fs::write_with_security_level( @@ -375,17 +374,17 @@ mod tests { &virtual_path, ) .await?; - log::error!(target: "citadel", "X01"); - log::trace!(target: "citadel", "***CLIENT FILE TRANSFER SUCCESS***"); + log::info!(target: "citadel", "***CLIENT A FILE TRANSFER SUCCESS***"); + //wait_for_peers().await; // now, pull it let save_dir = crate::fs::read(&remote, virtual_path).await?; // now, compare bytes - log::trace!(target: "citadel", "***CLIENT REVFS PULL SUCCESS"); + log::info!(target: "citadel", "***CLIENT A REVFS PULL SUCCESS"); let original_bytes = tokio::fs::read(&source_dir).await.unwrap(); let revfs_pulled_bytes = tokio::fs::read(&save_dir).await.unwrap(); assert_eq!(original_bytes, revfs_pulled_bytes); - log::trace!(target: "citadel", "***CLIENT REVFS PULL COMPARE SUCCESS"); - wait_for_peers().await;*/ + log::info!(target: "citadel", "***CLIENT A REVFS PULL COMPARE SUCCESS"); + wait_for_peers().await; client0_success.store(true, Ordering::Relaxed); remote_outer.shutdown_kernel().await }, @@ -401,11 +400,13 @@ mod tests { None, move |mut connection, remote_outer| async move { wait_for_peers().await; - let connection = connection.recv().await.unwrap()?; + let mut connection = connection.recv().await.unwrap()?; wait_for_peers().await; - let remote = connection.remote.clone(); - log::trace!(target: "citadel", "***CLIENT LOGIN SUCCESS :: File transfer next ***"); - let virtual_path = PathBuf::from("/home/john.doe/TheBridge.pdf"); + let _remote = connection.remote.clone(); + let handle_orig = connection.incoming_object_transfer_handles.take().unwrap(); + accept_all(handle_orig); + log::info!(target: "citadel", "***CLIENT B LOGIN SUCCESS :: File transfer next ***"); + /*let virtual_path = PathBuf::from("/home/john.doe/TheBridge.pdf"); // write the file to the RE-VFS crate::fs::write_with_security_level( &remote, @@ -414,22 +415,19 @@ mod tests { &virtual_path, ) .await?; - log::trace!(target: "citadel", "***CLIENT FILE TRANSFER SUCCESS***"); + log::info!(target: "citadel", "***CLIENT B FILE TRANSFER SUCCESS***"); + wait_for_peers().await; // now, pull it let save_dir = crate::fs::read(&remote, virtual_path).await?; // now, compare bytes - log::trace!(target: "citadel", "***CLIENT REVFS PULL SUCCESS"); + log::info!(target: "citadel", "***CLIENT B REVFS PULL SUCCESS"); let original_bytes = tokio::fs::read(&source_dir).await.unwrap(); let revfs_pulled_bytes = tokio::fs::read(&save_dir).await.unwrap(); assert_eq!(original_bytes, revfs_pulled_bytes); - log::trace!(target: "citadel", "***CLIENT REVFS PULL COMPARE SUCCESS"); - wait_for_peers().await; - /* - // Now, accept the peer's incoming handle - let handle_orig = connection.incoming_object_transfer_handles.take().unwrap(); - accept_all(handle_orig); + log::info!(target: "citadel", "***CLIENT B REVFS PULL COMPARE SUCCESS"); - wait_for_peers().await;*/ + log::info!(target: "citadel", "***CLIENT B WAITING FOR ADJACENT PEER TO FINISH STREAMING");*/ + wait_for_peers().await; client1_success.store(true, Ordering::Relaxed); remote_outer.shutdown_kernel().await }, @@ -458,10 +456,13 @@ mod tests { fn accept_all(mut rx: FileTransferHandleRx) { let handle = tokio::task::spawn(async move { while let Some(mut handle) = rx.recv().await { - let _ = handle.accept(); + if let Err(err) = handle.accept() { + log::error!(target: "citadel", "Failed to accept file transfer: {err:?}"); + } // Exhaust the stream let handle = tokio::task::spawn(async move { while let Some(evt) = handle.next().await { + log::info!(target: "citadel", "File Transfer Event: {evt:?}"); if let ObjectTransferStatus::Fail(err) = evt { log::error!(target: "citadel", "File Transfer Failed: {err:?}"); std::process::exit(1); diff --git a/citadel_sdk/src/prefabs/client/peer_connection.rs b/citadel_sdk/src/prefabs/client/peer_connection.rs index f55c05148..34480cfd3 100644 --- a/citadel_sdk/src/prefabs/client/peer_connection.rs +++ b/citadel_sdk/src/prefabs/client/peer_connection.rs @@ -364,7 +364,7 @@ where } let _reg_success = handle.register_to_peer().await?; - log::trace!(target: "citadel", "Peer {:?} registered || success -> now connecting", id); + log::info!(target: "citadel", "Peer {:?} registered || success -> now connecting", id); handle }; diff --git a/citadel_sdk/src/prefabs/client/single_connection.rs b/citadel_sdk/src/prefabs/client/single_connection.rs index 0c5b27074..ed46d80a6 100644 --- a/citadel_sdk/src/prefabs/client/single_connection.rs +++ b/citadel_sdk/src/prefabs/client/single_connection.rs @@ -280,7 +280,7 @@ where async fn on_node_event_received(&self, message: NodeResult) -> Result<(), NetworkError> { if let Some(val) = self.unprocessed_signal_filter_tx.lock().as_ref() { - log::info!(target: "citadel", "Will forward message {:?}", val); + log::trace!(target: "citadel", "Will forward message {:?}", val); if let Err(err) = val.send(message) { log::warn!(target: "citadel", "failed to send unprocessed NodeResult: {:?}", err) } diff --git a/citadel_sdk/src/remote_ext.rs b/citadel_sdk/src/remote_ext.rs index 0703b74a2..1b8c3a1e4 100644 --- a/citadel_sdk/src/remote_ext.rs +++ b/citadel_sdk/src/remote_ext.rs @@ -651,11 +651,12 @@ pub trait ProtocolRemoteTargetExt: TargetLockedRemote { let mut stream = self.remote().send_callback_subscription(request).await?; while let Some(event) = stream.next().await { + log::info!(target: "citadel", "REVFS NODE RESULT for {} {:?}", self.user().get_implicated_cid(), event); match map_errors(event)? { NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => { let mut local_path = None; while let Some(res) = handle.next().await { - log::trace!(target: "citadel", "REVFS PULL EVENT {:?}", res); + log::info!(target: "citadel", "REVFS PULL EVENT {:?}", res); match res { ObjectTransferStatus::ReceptionBeginning(path, _) => { local_path = Some(path) diff --git a/citadel_types/src/proto/mod.rs b/citadel_types/src/proto/mod.rs index 3e5fe6474..61132d56e 100644 --- a/citadel_types/src/proto/mod.rs +++ b/citadel_types/src/proto/mod.rs @@ -26,11 +26,13 @@ pub struct VirtualObjectMetadata { pub author: String, pub plaintext_length: usize, pub group_count: usize, - pub object_id: u64, + pub object_id: i64, pub cid: u64, pub transfer_type: TransferType, } +pub type ObjectId = i64; + impl VirtualObjectMetadata { pub fn serialize(&self) -> Vec { bincode2::serialize(self).unwrap() diff --git a/citadel_user/src/backend/utils/mod.rs b/citadel_user/src/backend/utils/mod.rs index 222b06f27..91e9dbce7 100644 --- a/citadel_user/src/backend/utils/mod.rs +++ b/citadel_user/src/backend/utils/mod.rs @@ -113,25 +113,11 @@ impl ObjectTransferHandler { } fn respond(&mut self, accept: bool) -> Result<(), AccountError> { - if matches!( - self.orientation, - ObjectTransferOrientation::Receiver { - is_revfs_pull: true - } - ) { - let _ = self.start_recv_tx.take(); - return Ok(()); + if let Some(tx) = self.start_recv_tx.take() { + tx.send(accept) + .map_err(|_| AccountError::msg("Failed to send response"))?; } - if matches!(self.orientation, ObjectTransferOrientation::Receiver { .. }) { - self.start_recv_tx - .take() - .ok_or_else(|| AccountError::msg("Start_recv_tx already called"))? - .send(accept) - .map_err(|err| AccountError::msg(err.to_string())) - } else { - let _ = self.start_recv_tx.take(); - Ok(()) - } + Ok(()) } } diff --git a/citadel_wire/src/standard/nat_identification.rs b/citadel_wire/src/standard/nat_identification.rs index 6adfff0f8..f5e452fab 100644 --- a/citadel_wire/src/standard/nat_identification.rs +++ b/citadel_wire/src/standard/nat_identification.rs @@ -4,13 +4,12 @@ use crate::error::FirewallError; use crate::socket_helpers::is_ipv6_enabled; use async_ip::IpAddressInfo; use futures::stream::FuturesUnordered; -use futures::{Future, StreamExt}; +use futures::StreamExt; use itertools::Itertools; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::net::{IpAddr, SocketAddr}; use std::ops::Sub; -use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -97,6 +96,12 @@ impl NatType { tracing::instrument(level = "trace", target = "citadel", skip_all, ret, err(Debug)) )] pub async fn identify(stun_servers: Option>) -> Result { + if cfg!(feature = "localhost-testing") { + if let Some(nat_type) = LOCALHOST_TESTING_NAT_TYPE.lock().as_ref() { + return Ok(nat_type.clone()); + } + } + match Self::identify_timeout(IDENTIFY_TIMEOUT, stun_servers).await { Ok(nat_type) => Ok(nat_type), Err(err) => { @@ -393,9 +398,8 @@ pub enum TraversalTypeRequired { } // we only need to check the NAT type once per node -lazy_static::lazy_static! { - pub static ref LOCALHOST_TESTING_NAT_TYPE: citadel_io::Mutex> = citadel_io::Mutex::new(None); -} +static LOCALHOST_TESTING_NAT_TYPE: citadel_io::Mutex> = + citadel_io::Mutex::new(None); impl NatType { /// Returns the NAT traversal type required to access self and other, respectively @@ -547,42 +551,31 @@ async fn get_nat_type(stun_servers: Option>) -> Result Err(anyhow::Error::msg("Unable to get all three STUN addrs")), } }; - let ip_info_future = if cfg!(feature = "localhost-testing") { - Box::pin(async move { Ok(Some(async_ip::IpAddressInfo::localhost())) }) - as Pin< - Box< - dyn Future, async_ip::IpRetrieveError>> - + Send, - >, - > - } else { - Box::pin(async move { - match tokio::time::timeout( - Duration::from_millis(1500), - async_ip::get_all_multi_concurrent(None), - ) - .await - { - Ok(Ok(ip_info)) => Ok(Some(ip_info)), - Ok(Err(err)) => Err(err), - Err(_) => Ok(None), - } - }) + let ip_info_future = async move { + match tokio::time::timeout( + Duration::from_millis(2000), + async_ip::get_all_multi_concurrent(None), + ) + .await + { + Ok(Ok(ip_info)) => Ok(Some(ip_info)), + Ok(Err(err)) => Err(err), + Err(_) => Ok(None), + } }; let (nat_type, ip_info) = tokio::join!(nat_type, ip_info_future); let mut nat_type = nat_type?; - log::trace!(target: "citadel", "NAT Type: {nat_type:?} | IpInfo: {ip_info:?}"); let ip_info = match ip_info { Ok(Some(ip_info)) => ip_info, @@ -597,6 +590,9 @@ async fn get_nat_type(stun_servers: Option>) -> Result Result { + log::trace!(target: "citadel", "[driver] Starting hole puncher ..."); // create stream let stream = &(conn.initiate_subscription().await?); let stun_servers = encrypted_config_container.take_stun_servers(); @@ -91,6 +92,7 @@ async fn driver( conn, )? .await; + res.map_err(|err| { anyhow::Error::msg(format!( "**HOLE-PUNCH-ERR**: {err:?} | local_nat_type: {local_nat_type:?} | peer_nat_type: {peer_nat_type:?}",