diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 22951e2a5..af803f6d6 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -18,7 +18,7 @@ jobs: core_libs: strategy: matrix: - os: [ubuntu-latest, macos-latest, windows-latest] + os: [ ubuntu-latest, macos-latest, windows-latest ] runs-on: ${{ matrix.os }} timeout-minutes: 80 steps: @@ -40,7 +40,7 @@ jobs: run: mysql -uroot -h127.0.0.1 -ppassword -e 'CREATE DATABASE hyxewave; CREATE DATABASE hyxewave2; set global max_connections = 1000;' - name: Augment connection size for psql if: startsWith(matrix.os, 'ubuntu') - run: psql -c 'ALTER SYSTEM SET max_connections TO 1000' postgresql://postgres:postgres@localhost/hyxewave && psql -c 'ALTER SYSTEM SET max_connections TO 1000' postgresql://postgres:postgres@localhost/hyxewave2 + run: psql -c 'ALTER SYSTEM SET max_connections TO 1000' postgresql://postgres:postgres@localhost/hyxewave && psql -c 'ALTER SYSTEM SET max_connections TO 1000' postgresql://postgres:postgres@localhost/hyxewave2 - name: Add sqlite databases if: startsWith(matrix.os, 'ubuntu') run: touch /home/runner/hyxewave.db && touch /home/runner/hyxewave2.db @@ -127,21 +127,21 @@ jobs: runs-on: ubuntu-latest steps: - uses: Avarok-Cybersecurity/gh-actions-deps@master -# - name: Install Valgrind -# run: | -# sudo apt-get update -y -# sudo apt-get install -y valgrind -# # Compile tests -# - name: cargo build secmem_bytes_test -# run: cargo build --bin secmem_bytes_test + # - name: Install Valgrind + # run: | + # sudo apt-get update -y + # sudo apt-get install -y valgrind + # # Compile tests + # - name: cargo build secmem_bytes_test + # run: cargo build --bin secmem_bytes_test # Run with valgrind -# - name: Run valgrind secmem_bytes -# run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/secmem_bytes_test -# - name: cargo build secmem_string_test -# run: cargo build --bin secmem_string_test + # - name: Run valgrind secmem_bytes + # run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/secmem_bytes_test + # - name: cargo build secmem_string_test + # run: cargo build --bin secmem_string_test # Run with valgrind -# - name: Run valgrind secmem_string -# run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/secmem_string_test + # - name: Run valgrind secmem_string + # run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/secmem_string_test - run: cargo check --package citadel_sdk --release --features=webrtc,sql,redis,multi-threaded - run: cargo install --locked cargo-deny && cargo deny check all - run: rustup component add clippy-preview @@ -149,25 +149,25 @@ jobs: - run: cargo clippy --features=webrtc,sql,redis,multi-threaded --release -- -D warnings - run: cargo clippy --features=webrtc,sql,redis -- -D warnings - run: cargo clippy --features=webrtc,sql,redis --release -- -D warnings - - run: cargo clippy --tests -- -D warnings + - run: cargo clippy --tests --examples -- -D warnings - run: cargo fmt --check - run: RUSTDOCFLAGS="-D warnings" cargo make docs - run: cargo test --package citadel_sdk --doc # - name: cargo build pq_kems -# run: cargo build --bin pq_kems_test -# # Run with valgrind -# - name: Run valgrind pq_kems -# run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/pq_kems_test + # run: cargo build --bin pq_kems_test + # # Run with valgrind + # - name: Run valgrind pq_kems + # run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/pq_kems_test -# features_check: -# name: check features -# runs-on: ubuntu-latest -# steps: -# - uses: Avarok-Cybersecurity/gh-actions-deps@master -# - name: Install features checker -# run: cargo install cargo-all-features -# - name: Check all feature combinations -# run: cargo check-all-features --package citadel_sdk + # features_check: + # name: check features + # runs-on: ubuntu-latest + # steps: + # - uses: Avarok-Cybersecurity/gh-actions-deps@master + # - name: Install features checker + # run: cargo install cargo-all-features + # - name: Check all feature combinations + # run: cargo check-all-features --package citadel_sdk coverage: runs-on: macos-latest @@ -192,11 +192,11 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - nat_type_client_a: - - "full_cone" - - "address_restricted" - - "port_restricted" - - "symmetric" + nat_type_client_a: + - "full_cone" + - "address_restricted" + - "port_restricted" + - "symmetric" timeout-minutes: 60 env: NAT_TYPE_CLIENT_A: ${{ matrix.nat_type_client_a }} diff --git a/Cargo.toml b/Cargo.toml index 8426e664a..c9ebc014a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,38 +3,41 @@ resolver = "2" members = [ "citadel_sdk", - "citadel_wire", - "citadel_user", - "citadel_crypt", - "async_ip", - "citadel_pqcrypto", - "citadel_proto", + "citadel_wire", + "citadel_user", + "citadel_crypt", + "async_ip", + "citadel_pqcrypto", + "citadel_proto", "firebase-rtdb", "netbeam", - "citadel_logging", + "citadel_logging", "citadel_io", "citadel_types", + "example-library", ] exclude = [ "./target/*", - "./examples" ] [workspace.dependencies] # workspace deps -citadel_sdk = { path = "./citadel_sdk", default-features = false, version = "0.11.2" } -citadel_wire = { path = "./citadel_wire", default-features = false, version = "0.11.2" } -citadel_user = { path = "./citadel_user", default-features = false, version = "0.11.2" } -citadel_crypt = { path = "./citadel_crypt", default-features = false, version = "0.11.2" } -citadel_pqcrypto = { path = "./citadel_pqcrypto", default-features = false, version = "0.11.2" } -citadel_proto = { path = "./citadel_proto", default-features = false, version = "0.11.2" } -citadel_logging = { path = "./citadel_logging", default-features = false, version = "0.11.2" } -citadel_io = { path = "./citadel_io", default-features = false, version = "0.11.2" } -citadel_types = { path = "./citadel_types", default-features = false, version = "0.11.2" } -netbeam = { path = "./netbeam", default-features = false, version = "0.11.2" } -firebase-rtdb = { path = "./firebase-rtdb", default-features = false, version = "0.11.2" } -async_ip = { path = "./async_ip", default-features = false, version = "0.11.2" } +citadel_sdk = { path = "./citadel_sdk", default-features = false } +citadel_wire = { path = "./citadel_wire", default-features = false } +citadel_user = { path = "./citadel_user", default-features = false } +citadel_crypt = { path = "./citadel_crypt", default-features = false } +citadel_pqcrypto = { path = "./citadel_pqcrypto", default-features = false } +citadel_proto = { path = "./citadel_proto", default-features = false } +citadel_logging = { path = "./citadel_logging", default-features = false } +citadel_io = { path = "./citadel_io", default-features = false } +citadel_types = { path = "./citadel_types", default-features = false } +netbeam = { path = "./netbeam", default-features = false } +firebase-rtdb = { path = "./firebase-rtdb", default-features = false } +async_ip = { path = "./async_ip", default-features = false } + +# examples +citadel-examples = { path = "./example-library", default-features = false } # ordinary deps generic-array = { version = "0.14.6" } @@ -50,7 +53,7 @@ getrandom = { version = "0.2.8", default-features = false } serde-big-array = { default-features = false, version = "0.5.0" } ascon-aead = { default-features = false, version = "0.4.0" } oqs = { version = "0.9.0", default-features = false } -pqcrypto-falcon-wasi = { version = "0.2.14", default-features=false} +pqcrypto-falcon-wasi = { version = "0.2.14", default-features = false } pqcrypto-traits-wasi = { version = "0.3.4", default-features = false } tracing-subscriber = { version = "0.3.16" } reqwest_wasi = { version = "0.11.16", default-features = false } @@ -65,9 +68,9 @@ sync_wrapper = { default-features = false, version = "1.0.0" } async-recursion = { version = "1.0.4" } rstest = { version = "0.23.0" } bincode = { default-features = false, version = "1.3.3" } -serde = { version="1.0.152", default-features = false } +serde = { version = "1.0.152", default-features = false } futures = { version = "0.3.25", default-features = false } -byteorder = { version = "1.4.3", default-features=false } +byteorder = { version = "1.4.3", default-features = false } num-integer = { default-features = false, version = "0.1.45" } arrayvec = { version = "0.7.2", default-features = false } bitvec = { default-features = false, version = "1.0.1" } @@ -112,7 +115,7 @@ embedded-semver = { version = "0.3.0", default-features = false } auto_impl = { default-features = false, version = "1.0.1" } zerocopy = { default-features = false, version = "0.7.7" } atomic = { default-features = false, version = "0.6.0" } -bytemuck = { default-features = false, version = "1.13.1"} +bytemuck = { default-features = false, version = "1.13.1" } either = { default-features = false, version = "1.8.0" } once_cell = { default-features = false, version = "1.17.0" } webrtc-util = { version = "0.8.0" } diff --git a/Cargo.wasix.toml b/Cargo.wasix.toml index 9cd4f5b01..b102b086a 100644 --- a/Cargo.wasix.toml +++ b/Cargo.wasix.toml @@ -22,18 +22,18 @@ exclude = [ [workspace.dependencies] # workspace deps -citadel_sdk = { path = "./citadel_sdk", default-features = false, version = "0.9.0" } -citadel_wire = { path = "./citadel_wire", default-features = false, version = "0.9.0" } -citadel_user = { path = "./citadel_user", default-features = false, version = "0.9.0" } -citadel_crypt = { path = "./citadel_crypt", default-features = false, version = "0.9.0" } -citadel_pqcrypto = { path = "./citadel_pqcrypto", default-features = false, version = "0.9.0" } -citadel_proto = { path = "./citadel_proto", default-features = false, version = "0.9.0" } -citadel_logging = { path = "./citadel_logging", default-features = false, version = "0.9.0" } -citadel_io = { path = "./citadel_io", default-features = false, version = "0.9.0" } -citadel_types = { path = "./citadel_types", default-features = false, version = "0.9.0" } -netbeam = { path = "./netbeam", default-features = false, version = "0.8.0" } -firebase-rtdb = { path = "./firebase-rtdb", default-features = false, version = "0.8.0" } -async_ip = { path = "./async_ip", default-features = false, version = "0.8.0" } +citadel_sdk = { path = "./citadel_sdk", default-features = false } +citadel_wire = { path = "./citadel_wire", default-features = false } +citadel_user = { path = "./citadel_user", default-features = false } +citadel_crypt = { path = "./citadel_crypt", default-features = false } +citadel_pqcrypto = { path = "./citadel_pqcrypto", default-features = false } +citadel_proto = { path = "./citadel_proto", default-features = false } +citadel_logging = { path = "./citadel_logging", default-features = false } +citadel_io = { path = "./citadel_io", default-features = false } +citadel_types = { path = "./citadel_types", default-features = false } +netbeam = { path = "./netbeam", default-features = false } +firebase-rtdb = { path = "./firebase-rtdb", default-features = false } +async_ip = { path = "./async_ip", default-features = false } # ordinary deps generic-array = { version = "0.14.6" } diff --git a/citadel_proto/src/auth.rs b/citadel_proto/src/auth.rs index d3b3ca1fa..6d73bb8ba 100644 --- a/citadel_proto/src/auth.rs +++ b/citadel_proto/src/auth.rs @@ -29,7 +29,7 @@ impl AuthenticationRequest { } /// No credentials will be used for login, only a one-time device-dependent cryptographic bundle - pub fn passwordless(uuid: Uuid, server_addr: SocketAddr) -> Self { + pub fn transient(uuid: Uuid, server_addr: SocketAddr) -> Self { Self::Passwordless { username: uuid.to_string(), server_addr, diff --git a/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs b/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs index 4f1f3721f..a17242679 100644 --- a/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs +++ b/citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs @@ -625,6 +625,12 @@ pub async fn process_peer_cmd( ) }; + let udp_mode = if udp_rx_opt.is_some() { + UdpMode::Enabled + } else { + UdpMode::Disabled + }; + let channel_signal = NodeResult::PeerChannelCreated(PeerChannelCreated { ticket: ticket_for_chan.unwrap_or(ticket), @@ -660,6 +666,7 @@ pub async fn process_peer_cmd( app, encrypted_config_container, client_config, + udp_mode, ) .await; } @@ -762,6 +769,12 @@ pub async fn process_peer_cmd( ) }; + let udp_mode = if udp_rx_opt.is_some() { + UdpMode::Enabled + } else { + UdpMode::Disabled + }; + let channel_signal = NodeResult::PeerChannelCreated(PeerChannelCreated { ticket: ticket_for_chan.unwrap_or(ticket), @@ -810,6 +823,7 @@ pub async fn process_peer_cmd( app, encrypted_config_container, client_config, + udp_mode, ) .await; } @@ -1136,6 +1150,7 @@ async fn process_signal_command_as_server( ) .await; } + drop(peer_layer_lock); let peer_alert_signal = PeerSignal::DeregistrationSuccess { peer_conn_type: peer_conn_type.reverse(), }; diff --git a/citadel_proto/src/proto/packet_processor/preconnect_packet.rs b/citadel_proto/src/proto/packet_processor/preconnect_packet.rs index 1fbeeae1f..85f90858a 100644 --- a/citadel_proto/src/proto/packet_processor/preconnect_packet.rs +++ b/citadel_proto/src/proto/packet_processor/preconnect_packet.rs @@ -24,7 +24,15 @@ use netbeam::sync::network_endpoint::NetworkEndpoint; use std::sync::atomic::Ordering; /// Handles preconnect packets. Handles the NAT traversal -#[cfg_attr(feature = "localhost-testing", tracing::instrument(level = "trace", target = "citadel", skip_all, ret, err, fields(is_server = session_orig.is_server, src = packet.parse().unwrap().0.session_cid.get(), target = packet.parse().unwrap().0.target_cid.get())))] +#[cfg_attr(feature = "localhost-testing", tracing::instrument( + level = "trace", + target = "citadel", + skip_all, + ret, + err, + fields(is_server = session_orig.is_server, src = packet.parse().unwrap().0.session_cid.get(), target = packet.parse().unwrap().0.target_cid.get() + ) +))] pub async fn process_preconnect( session_orig: &CitadelSession, packet: HdpPacket, @@ -601,14 +609,16 @@ fn handle_success_as_receiver( if let Some(udp_splittable) = udp_splittable { let peer_addr = udp_splittable.peer_addr(); // the UDP subsystem will automatically engage at this point - CitadelSession::udp_socket_loader( - session.clone(), - VirtualTargetType::LocalGroupServer { implicated_cid }, - udp_splittable, - peer_addr, - session.kernel_ticket.get(), - Some(tcp_loaded_alerter_rx), - ); + if state_container.udp_mode == UdpMode::Enabled { + CitadelSession::udp_socket_loader( + session.clone(), + VirtualTargetType::LocalGroupServer { implicated_cid }, + udp_splittable, + peer_addr, + session.kernel_ticket.get(), + Some(tcp_loaded_alerter_rx), + ); + } } else { log::warn!(target: "citadel", "No UDP splittable was specified. UdpMode: {:?}", state_container.udp_mode); } diff --git a/citadel_proto/src/proto/peer/p2p_conn_handler.rs b/citadel_proto/src/proto/peer/p2p_conn_handler.rs index 90c30ccb5..f2f8ca67c 100644 --- a/citadel_proto/src/proto/peer/p2p_conn_handler.rs +++ b/citadel_proto/src/proto/peer/p2p_conn_handler.rs @@ -18,6 +18,7 @@ use crate::proto::peer::peer_layer::PeerConnectionType; use crate::proto::remote::Ticket; use crate::proto::session::CitadelSession; use crate::proto::state_container::VirtualConnectionType; +use citadel_types::prelude::UdpMode; use citadel_user::re_exports::__private::Formatter; use citadel_wire::exports::tokio_rustls::rustls; use citadel_wire::udp_traversal::linear::encrypted_config_container::HolePunchConfigContainer; @@ -75,6 +76,7 @@ async fn setup_listener_non_initiator( v_conn: VirtualConnectionType, hole_punched_addr: TargettedSocketAddr, ticket: Ticket, + udp_mode: UdpMode, ) -> Result<(), NetworkError> { // TODO: use custom self-signed let (listener, _) = Node::create_listen_socket( @@ -90,6 +92,7 @@ async fn setup_listener_non_initiator( v_conn, hole_punched_addr, ticket, + udp_mode, ) .await } @@ -101,6 +104,7 @@ async fn p2p_conn_handler( v_conn: VirtualConnectionType, hole_punched_addr: TargettedSocketAddr, ticket: Ticket, + udp_mode: UdpMode, ) -> Result<(), NetworkError> { let kernel_tx = session.kernel_tx.clone(); let implicated_cid = session.implicated_cid.clone(); @@ -115,12 +119,6 @@ async fn p2p_conn_handler( let session = CitadelSession::upgrade_weak(weak) .ok_or(NetworkError::InternalError("HdpSession dropped"))?; - /* - if p2p_stream.peer_addr()?.ip() != necessary_remote_addr.ip() { - log::warn!(target: "citadel", "Blocked p2p connection from {:?} since IP does not match {:?}", p2p_stream, necessary_remote_addr); - continue; - }*/ - handle_p2p_stream( p2p_stream, implicated_cid, @@ -130,6 +128,7 @@ async fn p2p_conn_handler( v_conn, hole_punched_addr, ticket, + udp_mode, )?; Ok(()) } @@ -159,6 +158,7 @@ fn handle_p2p_stream( v_conn: VirtualConnectionType, hole_punched_addr: TargettedSocketAddr, ticket: Ticket, + udp_mode: UdpMode, ) -> std::io::Result<()> { // SECURITY: Since this branch only occurs IF the primary session is connected, then the primary user is // logged-in. However, what if a malicious user decides to connect here? @@ -205,14 +205,17 @@ fn handle_p2p_stream( state_container .insert_direct_p2p_connection(direct_p2p_remote, v_conn.get_target_cid()) .map_err(|err| generic_error(err.into_string()))?; - CitadelSession::udp_socket_loader( - sess.clone(), - v_conn, - UdpSplittableTypes::Quic(udp_conn), - hole_punched_addr, - ticket, - None, - ); + + if udp_mode == UdpMode::Enabled { + CitadelSession::udp_socket_loader( + sess.clone(), + v_conn, + UdpSplittableTypes::Quic(udp_conn), + hole_punched_addr, + ticket, + None, + ); + } std::mem::drop(state_container); @@ -283,7 +286,15 @@ async fn p2p_stopper(receiver: Receiver<()>) -> Result<(), NetworkError> { } /// Both sides need to begin this process at `sync_time` -#[cfg_attr(feature = "localhost-testing", tracing::instrument(level = "trace", target = "citadel", skip_all, ret, err, fields(implicated_cid=implicated_cid.get(), peer_cid=peer_connection_type.get_original_target_cid())))] +#[cfg_attr(feature = "localhost-testing", tracing::instrument( + level = "trace", + target = "citadel", + skip_all, + ret, + err, + fields(implicated_cid=implicated_cid.get(), peer_cid=peer_connection_type.get_original_target_cid() + ) +))] #[allow(clippy::too_many_arguments)] pub(crate) async fn attempt_simultaneous_hole_punch( peer_connection_type: PeerConnectionType, @@ -297,6 +308,7 @@ pub(crate) async fn attempt_simultaneous_hole_punch( app: NetworkEndpoint, encrypted_config_container: HolePunchConfigContainer, client_config: Arc, + udp_mode: UdpMode, ) -> std::io::Result<()> { let is_initiator = app.is_initiator(); let kernel_tx = &kernel_tx; @@ -346,13 +358,14 @@ pub(crate) async fn attempt_simultaneous_hole_punch( v_conn, addr, ticket, + udp_mode, ) } else { log::trace!(target: "citadel", "Non-initiator will begin listening immediately"); drop(hole_punched_socket); // drop to prevent conflicts caused by SO_REUSE_ADDR - setup_listener_non_initiator(local_addr, remote_connect_addr, session.clone(), v_conn, addr, ticket) + setup_listener_non_initiator(local_addr, remote_connect_addr, session.clone(), v_conn, addr, ticket, udp_mode) .await - .map_err(|err|generic_error(format!("Non-initiator was unable to secure connection despite hole-punching success: {err:?}"))) + .map_err(|err| generic_error(format!("Non-initiator was unable to secure connection despite hole-punching success: {err:?}"))) } }; diff --git a/citadel_proto/src/proto/session.rs b/citadel_proto/src/proto/session.rs index c932533ec..f736788e7 100644 --- a/citadel_proto/src/proto/session.rs +++ b/citadel_proto/src/proto/session.rs @@ -20,43 +20,44 @@ use citadel_wire::hypernode_type::NodeType; use citadel_wire::udp_traversal::targetted_udp_socket_addr::TargettedSocketAddr; use netbeam::time_tracker::TimeTracker; +//use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender, channel, TrySendError}; +use crate::auth::AuthenticationRequest; use crate::constants::{ DRILL_UPDATE_FREQUENCY_LOW_BASE, FIREWALL_KEEP_ALIVE_UDP, GROUP_EXPIRE_TIME_MS, HDP_HEADER_BYTE_LEN, INITIAL_RECONNECT_LOCKOUT_TIME_NS, KEEP_ALIVE_INTERVAL_MS, KEEP_ALIVE_TIMEOUT_NS, LOGIN_EXPIRATION_TIME, }; use crate::error::NetworkError; -use crate::proto::packet::{packet_flags, HdpPacket}; -use crate::proto::packet_crafter::peer_cmd::C2S_ENCRYPTION_ONLY; -use crate::proto::packet_crafter::{self, GroupTransmitter, RatchetPacketCrafterContainer}; -use citadel_types::proto::VirtualObjectMetadata; -//use futures_codec::Framed; -use crate::proto::misc; -use crate::proto::misc::clean_shutdown::{CleanShutdownSink, CleanShutdownStream}; -use crate::proto::misc::dual_rwlock::DualRwLock; -use crate::proto::misc::net::GenericNetworkStream; -use crate::proto::packet_processor::includes::{Duration, SocketAddr}; -use crate::proto::packet_processor::{self, PrimaryProcessorResult}; -use crate::proto::session_manager::HdpSessionManager; -use citadel_types::proto::ConnectMode; -use citadel_types::proto::SessionSecuritySettings; -//use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender, channel, TrySendError}; -use crate::auth::AuthenticationRequest; use crate::kernel::RuntimeFuture; use crate::prelude::{GroupBroadcast, PeerEvent, PeerResponse, PreSharedKey, SecureProtocolPacket}; use crate::proto::endpoint_crypto_accessor::EndpointCryptoAccessor; +//use futures_codec::Framed; +use crate::proto::misc; +use crate::proto::misc::clean_shutdown::{CleanShutdownSink, CleanShutdownStream}; use crate::proto::misc::dual_cell::DualCell; use crate::proto::misc::dual_late_init::DualLateInit; +use crate::proto::misc::dual_rwlock::DualRwLock; +use crate::proto::misc::net::GenericNetworkStream; use crate::proto::misc::udp_internal_interface::{UdpSplittableTypes, UdpStream}; +//use futures_codec::Framed; +use crate::proto::node_result::{Disconnect, InternalServerError, NodeResult}; use crate::proto::outbound_sender::{ channel, unbounded, SendError, UnboundedReceiver, UnboundedSender, }; use crate::proto::outbound_sender::{ OutboundPrimaryStreamReceiver, OutboundPrimaryStreamSender, OutboundUdpSender, }; +use crate::proto::packet::{packet_flags, HdpPacket}; +use crate::proto::packet_crafter::peer_cmd::C2S_ENCRYPTION_ONLY; +use crate::proto::packet_crafter::{self, GroupTransmitter, RatchetPacketCrafterContainer}; +use crate::proto::packet_processor::disconnect_packet::SUCCESS_DISCONNECT; +use crate::proto::packet_processor::includes::{Duration, SocketAddr}; use crate::proto::packet_processor::raw_primary_packet::{check_proxy, ReceivePortType}; +use crate::proto::packet_processor::{self, PrimaryProcessorResult}; use crate::proto::peer::p2p_conn_handler::P2PInboundHandle; use crate::proto::peer::peer_layer::{HyperNodePeerLayer, PeerSignal}; +use crate::proto::remote::{NodeRemote, Ticket}; +use crate::proto::session_manager::HdpSessionManager; use crate::proto::session_queue_handler::{ QueueWorkerResult, QueueWorkerTicket, SessionQueueWorker, SessionQueueWorkerHandle, DRILL_REKEY_WORKER, FIREWALL_KEEP_ALIVE, KEEP_ALIVE_CHECKER, PROVISIONAL_CHECKER, @@ -70,9 +71,14 @@ use crate::proto::state_subcontainers::preconnect_state_container::UdpChannelSen use crate::proto::state_subcontainers::rekey_container::calculate_update_frequency; use crate::proto::transfer_stats::TransferStats; use atomic::Atomic; +use bytemuck::NoUninit; use citadel_crypt::prelude::{ConstructorOpts, FixedSizedSource}; use citadel_crypt::streaming_crypt_scrambler::{scramble_encrypt_source, ObjectSource}; +use citadel_types::crypto::SecurityLevel; +use citadel_types::proto::ConnectMode; +use citadel_types::proto::SessionSecuritySettings; use citadel_types::proto::TransferType; +use citadel_types::proto::VirtualObjectMetadata; use citadel_user::backend::PersistenceHandler; use citadel_wire::exports::tokio_rustls::rustls; use citadel_wire::exports::Connection; @@ -81,12 +87,6 @@ use std::ops::Deref; use std::path::PathBuf; use std::pin::Pin; use std::time::{SystemTime, UNIX_EPOCH}; -//use futures_codec::Framed; -use crate::proto::node_result::{Disconnect, InternalServerError, NodeResult}; -use crate::proto::packet_processor::disconnect_packet::SUCCESS_DISCONNECT; -use crate::proto::remote::{NodeRemote, Ticket}; -use bytemuck::NoUninit; -use citadel_types::crypto::SecurityLevel; //use crate::define_struct; @@ -687,6 +687,8 @@ impl CitadelSession { if udp_mode == UdpMode::Enabled { state_container.pre_connect_state.udp_channel_oneshot_tx = UdpChannelSender::default(); + } else { + state_container.pre_connect_state.udp_channel_oneshot_tx = UdpChannelSender::empty(); } // NEXT STEP: check preconnect, and update internal security-level recv side to the security level found in transfer to ensure all future packages are at that security-level diff --git a/citadel_proto/src/proto/session_manager.rs b/citadel_proto/src/proto/session_manager.rs index 15a20e846..a16cb11bf 100644 --- a/citadel_proto/src/proto/session_manager.rs +++ b/citadel_proto/src/proto/session_manager.rs @@ -341,7 +341,15 @@ impl HdpSessionManager { /// Ensures that the session is removed even if there is a technical error in the underlying stream /// TODO: Make this code less hacky, and make the removal process cleaner. Use RAII on HdpSessionInner? - #[cfg_attr(feature = "localhost-testing", tracing::instrument(level = "trace", target = "citadel", skip_all, ret, err, fields(implicated_cid=new_session.implicated_cid.get(), is_server=new_session.is_server, peer_addr=peer_addr.to_string())))] + #[cfg_attr(feature = "localhost-testing", tracing::instrument( + level = "trace", + target = "citadel", + skip_all, + ret, + err, + fields(implicated_cid=new_session.implicated_cid.get(), is_server=new_session.is_server, peer_addr=peer_addr.to_string() + ) + ))] async fn execute_session_with_safe_shutdown( session_manager: HdpSessionManager, new_session: CitadelSession, @@ -425,11 +433,11 @@ impl HdpSessionManager { log::trace!(target: "citadel", "Alerting {} that {} disconnected", peer_cid, implicated_cid); let peer_conn_type = PeerConnectionType::LocalGroupPeer { implicated_cid, - peer_cid + peer_cid, }; let signal = PeerSignal::Disconnect { peer_conn_type, - disconnect_response: Some(PeerResponse::Disconnected(format!("{peer_cid} disconnected from {implicated_cid} forcibly"))) + disconnect_response: Some(PeerResponse::Disconnected(format!("{peer_cid} disconnected from {implicated_cid} forcibly"))), }; if let Err(_err) = sess_mgr.send_signal_to_peer_direct(peer_cid, |peer_hyper_ratchet| { super::packet_crafter::peer_cmd::craft_peer_signal(peer_hyper_ratchet, signal, Ticket(0), timestamp, security_level) @@ -1094,27 +1102,25 @@ impl HdpSessionManager { return Err("Target CID cannot be equal to the implicated CID".to_string()); } - let account_manager = { inner!(self).account_manager.clone() }; + let (account_manager, peer_sess) = { + let this = inner!(self); + ( + this.account_manager.clone(), + this.sessions.get(&target_cid).map(|r| r.1.clone()), + ) + }; - log::trace!(target: "citadel", "Checking if {} is registered locally ... {:?}", target_cid, signal); + log::trace!(target: "citadel", "Checking if {target_cid} is registered locally ... {signal:?}"); if account_manager .hyperlan_cid_is_registered(target_cid) .await .map_err(|err| err.into_string())? { - let (sess, pers) = { - let this = inner!(self); - let sess = this.sessions.get(&target_cid).map(|r| r.1.clone()); - (sess, this.account_manager.get_persistence_handler().clone()) - }; + let pers = account_manager.get_persistence_handler().clone(); // get the target cid's session - if let Some(ref sess_ref) = sess { - sess_ref - .hypernode_peer_layer - .inner - .write() - .await + if let Some(ref sess_ref) = peer_sess { + peer_layer .insert_tracked_posting(implicated_cid, timeout, ticket, signal, on_timeout) .await; let peer_sender = sess_ref.to_primary_stream.as_ref().unwrap(); diff --git a/citadel_proto/src/proto/state_container.rs b/citadel_proto/src/proto/state_container.rs index c284db01e..c66679c17 100644 --- a/citadel_proto/src/proto/state_container.rs +++ b/citadel_proto/src/proto/state_container.rs @@ -155,7 +155,7 @@ pub(crate) struct InboundFileTransfer { pub total_groups: usize, pub groups_rendered: usize, pub last_group_window_len: usize, - pub last_group_finish_time: Instant, + pub last_group_finish_time: i64, pub ticket: Ticket, pub virtual_target: VirtualTargetType, pub metadata: VirtualObjectMetadata, @@ -1135,7 +1135,7 @@ impl StateContainerInner { let (reception_complete_tx, success_receiving_rx) = citadel_io::tokio::sync::oneshot::channel(); let entry = InboundFileTransfer { - last_group_finish_time: Instant::now(), + last_group_finish_time: tt.get_global_time_ns(), last_group_window_len: 0, object_id, total_groups: metadata_orig.group_count, @@ -1556,6 +1556,7 @@ impl StateContainerInner { GroupReceiverStatus::GROUP_COMPLETE(_last_wid) => { let receiver = self.inbound_groups.remove(&group_key).unwrap().receiver; let mut chunk = receiver.finalize(); + let bytes_in_group = chunk.len(); log::trace!(target: "citadel", "GROUP {} COMPLETE. Total groups: {} | Plaintext len: {} | Received plaintext len: {}", group_id, file_container.total_groups, file_container.metadata.plaintext_length, chunk.len()); if let Some(local_encryption_level) = file_container.local_encryption_level { @@ -1597,20 +1598,22 @@ impl StateContainerInner { ) })?; } else { - let now = Instant::now(); - let bytes_per_sec = file_container.metadata.plaintext_length as f32 - / now - .duration_since(file_container.last_group_finish_time) - .as_secs_f32() - .round(); - let mb_per_sec = bytes_per_sec / (1024.0f32 * 1024.0f32); - log::info!(target: "citadel", "Sending reception tick for group {} of {} | {:.2} MB/s", group_id, file_container.total_groups, mb_per_sec); + let now = self.time_tracker.get_global_time_ns(); + let elapsed_nanos = + now.saturating_sub(file_container.last_group_finish_time) as f64; + let bytes_per_ns = bytes_in_group as f64 / elapsed_nanos; // unit: bytes/ns + // convert bytes per period into MB/s + let mb_per_sec = bytes_per_ns * 1_000_000_000f64; // unit: bytes/sec + let mb_per_sec = mb_per_sec / 1_000_000f64; // unit: MB/sec + // Only use 2 decimals + let mb_per_sec = (mb_per_sec * 100.0).round() / 100.0; + log::trace!(target: "citadel", "Sending reception tick for group {} of {} | {} MB/s", group_id, file_container.total_groups, mb_per_sec); file_container.last_group_finish_time = now; let status = ObjectTransferStatus::ReceptionTick( group_id as usize, file_container.total_groups, - mb_per_sec, + mb_per_sec as f32, ); // sending the wave ack will complete the group on the initiator side file_transfer_handle.unbounded_send(status).map_err(|err| { diff --git a/citadel_sdk/src/builder/node_builder.rs b/citadel_sdk/src/builder/node_builder.rs index 839a59fd3..d2d8a0b80 100644 --- a/citadel_sdk/src/builder/node_builder.rs +++ b/citadel_sdk/src/builder/node_builder.rs @@ -190,7 +190,7 @@ impl NodeBuilder { self } - /// Attaches miscellaneous server settings (e.g., passwordless mode, credential requirements) + /// Attaches miscellaneous server settings (e.g., transient mode, credential requirements) pub fn with_server_misc_settings(&mut self, misc_settings: ServerMiscSettings) -> &mut Self { self.server_misc_settings = Some(misc_settings); self @@ -354,11 +354,14 @@ mod tests { #[timeout(std::time::Duration::from_secs(60))] #[allow(clippy::let_underscore_must_use)] async fn test_options( - #[values(ServerUnderlyingProtocol::new_quic_self_signed(), ServerUnderlyingProtocol::new_tls_self_signed().unwrap())] + #[values(ServerUnderlyingProtocol::new_quic_self_signed(), ServerUnderlyingProtocol::new_tls_self_signed().unwrap() + )] underlying_protocol: ServerUnderlyingProtocol, - #[values(NodeType::Peer, NodeType::Server(std::net::SocketAddr::from_str("127.0.0.1:9999").unwrap()))] + #[values(NodeType::Peer, NodeType::Server(std::net::SocketAddr::from_str("127.0.0.1:9999").unwrap() + ))] node_type: NodeType, - #[values(KernelExecutorSettings::default(), KernelExecutorSettings::default().with_max_concurrency(2))] + #[values(KernelExecutorSettings::default(), KernelExecutorSettings::default().with_max_concurrency(2) + )] kernel_settings: KernelExecutorSettings, #[values(BackendType::InMemory, BackendType::new("file:/hello_world/path/").unwrap())] backend_type: BackendType, diff --git a/citadel_sdk/src/fs.rs b/citadel_sdk/src/fs.rs index e41cd8325..763549b17 100644 --- a/citadel_sdk/src/fs.rs +++ b/citadel_sdk/src/fs.rs @@ -78,15 +78,14 @@ pub async fn delete + Send>( #[cfg(test)] mod tests { use crate::prefabs::client::single_connection::SingleClientServerConnectionKernel; - use crate::prefabs::server::accept_file_transfer_kernel::{ - exhaust_file_transfer, AcceptFileTransferKernel, - }; + use crate::prefabs::server::accept_file_transfer_kernel::AcceptFileTransferKernel; use crate::prefabs::client::peer_connection::{FileTransferHandleRx, PeerConnectionKernel}; use crate::prefabs::client::ServerConnectionSettingsBuilder; use crate::prelude::*; use crate::test_common::wait_for_peers; use citadel_io::tokio; + use futures::StreamExt; use rstest::rstest; use std::net::SocketAddr; use std::path::PathBuf; @@ -109,7 +108,7 @@ mod tests { KemAlgorithm::Kyber, SigAlgorithm::Falcon1024 )] - #[timeout(std::time::Duration::from_secs(90))] + #[timeout(Duration::from_secs(90))] #[citadel_io::tokio::test] async fn test_c2s_file_transfer_revfs( #[case] enx: EncryptionAlgorithm, @@ -131,7 +130,7 @@ mod tests { .unwrap(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .disable_udp() .with_session_security_settings(session_security_settings) .build() @@ -204,7 +203,7 @@ mod tests { .unwrap(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .disable_udp() .with_session_security_settings(session_security_settings) .build() @@ -279,7 +278,7 @@ mod tests { .unwrap(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .disable_udp() .with_session_security_settings(session_security_settings) .build() @@ -327,15 +326,19 @@ mod tests { assert!(client_success.load(Ordering::Relaxed)); } + #[rstest] + #[case(SecrecyMode::BestEffort)] + #[timeout(Duration::from_secs(60))] #[citadel_io::tokio::test(flavor = "multi_thread")] - async fn test_p2p_file_transfer_revfs() { + async fn test_p2p_file_transfer_revfs( + #[case] secrecy_mode: SecrecyMode, + #[values(KemAlgorithm::Kyber)] kem: KemAlgorithm, + #[values(EncryptionAlgorithm::AES_GCM_256)] enx: EncryptionAlgorithm, + ) { citadel_logging::setup_log(); crate::test_common::TestBarrier::setup(2); let client0_success = &AtomicBool::new(false); let client1_success = &AtomicBool::new(false); - let enx = EncryptionAlgorithm::AES_GCM_256; - let secrecy_mode = SecrecyMode::BestEffort; - let kem = KemAlgorithm::Kyber; let (server, server_addr) = crate::test_common::server_info(); @@ -352,16 +355,23 @@ mod tests { let source_dir = &PathBuf::from("../resources/TheBridge.pdf"); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid0) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid0) .disable_udp() .with_session_security_settings(session_security) .build() .unwrap(); + let peer_conn_0 = PeerConnectionSetupAggregator::default() + .with_peer_custom(uuid1) + .ensure_registered() + .with_session_security_settings(session_security) + .enable_udp() + .add(); + // TODO: SinglePeerConnectionKernel let client_kernel0 = PeerConnectionKernel::new( server_connection_settings, - uuid1, + peer_conn_0, move |mut connection, remote_outer| async move { wait_for_peers().await; let mut connection = connection.recv().await.unwrap()?; @@ -400,15 +410,22 @@ mod tests { ); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid1) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid1) .disable_udp() .with_session_security_settings(session_security) .build() .unwrap(); + let peer_conn_1 = PeerConnectionSetupAggregator::default() + .with_peer_custom(uuid0) + .ensure_registered() + .with_session_security_settings(session_security) + .enable_udp() + .add(); + let client_kernel1 = PeerConnectionKernel::new( server_connection_settings, - uuid0, + peer_conn_1, move |mut connection, remote_outer| async move { wait_for_peers().await; let mut connection = connection.recv().await.unwrap()?; @@ -478,4 +495,22 @@ mod tests { drop(handle); } + + pub fn exhaust_file_transfer(mut handle: ObjectTransferHandler) { + // Exhaust the stream + let handle = citadel_io::tokio::task::spawn(async move { + while let Some(evt) = handle.next().await { + log::info!(target: "citadel", "File Transfer Event: {evt:?}"); + if let ObjectTransferStatus::Fail(err) = &evt { + log::error!(target: "citadel", "File Transfer Failed: {err:?}"); + } else if let ObjectTransferStatus::TransferComplete = &evt { + break; + } else if let ObjectTransferStatus::ReceptionComplete = &evt { + break; + } + } + }); + + drop(handle); + } } diff --git a/citadel_sdk/src/lib.rs b/citadel_sdk/src/lib.rs index 96b7a4a62..2d5aa2647 100644 --- a/citadel_sdk/src/lib.rs +++ b/citadel_sdk/src/lib.rs @@ -177,7 +177,7 @@ //! //! // await the server to execute //! # async move { -//! let result = server.await; +//! let result = server.await; //! # }; //! # Ok::<(), Box>(()) //! ``` @@ -202,7 +202,7 @@ //! //! let client = NodeBuilder::default().build(client_kernel)?; //! # async move { -//! let result = client.await; +//! let result = client.await; //! # }; //! # Ok::<(), Box>(()) //! ``` diff --git a/citadel_sdk/src/prefabs/client/broadcast.rs b/citadel_sdk/src/prefabs/client/broadcast.rs index 679220722..41e6d7bd3 100644 --- a/citadel_sdk/src/prefabs/client/broadcast.rs +++ b/citadel_sdk/src/prefabs/client/broadcast.rs @@ -423,7 +423,7 @@ mod tests { }; let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .build() .unwrap(); @@ -505,7 +505,7 @@ mod tests { .collect::>(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .build() .unwrap(); diff --git a/citadel_sdk/src/prefabs/client/mod.rs b/citadel_sdk/src/prefabs/client/mod.rs index 5692a6773..f453fce08 100644 --- a/citadel_sdk/src/prefabs/client/mod.rs +++ b/citadel_sdk/src/prefabs/client/mod.rs @@ -77,13 +77,20 @@ pub struct ServerConnectionSettingsBuilder { address: Option, udp_mode: Option, session_security_settings: Option, - authless_uuid: Option, + transient_uuid: Option, is_connect: bool, } impl ServerConnectionSettingsBuilder { - /// Creates a new connection to a central server that uses no credentialed authentication, only a secure channel - pub fn no_credentials(addr: T, id: Uuid) -> Self { + /// Creates a new connection to a central server that does not persist client metadata and account information + /// after the connection is dropped to the server. This is ideal for applications that do not require + /// persistence. + pub fn transient(addr: T) -> Self { + Self::transient_with_id(addr, Uuid::new_v4()) + } + + /// See docs for `transient`. This function allows you to specify a custom UUID for the transient connection. + pub fn transient_with_id(addr: T, id: impl Into) -> Self { Self { password: None, username: None, @@ -91,7 +98,7 @@ impl ServerConnectionSettingsBuilder { session_security_settings: None, name: None, psk: None, - authless_uuid: Some(id), + transient_uuid: Some(id.into()), address: Some(addr), is_connect: false, } @@ -110,7 +117,7 @@ impl ServerConnectionSettingsBuilder { username: Some(username.into()), name: Some(alias.into()), psk: None, - authless_uuid: None, + transient_uuid: None, address: Some(addr), udp_mode: None, session_security_settings: None, @@ -129,7 +136,7 @@ impl ServerConnectionSettingsBuilder { username: Some(username.into()), name: None, psk: None, - authless_uuid: None, + transient_uuid: None, address: Some(addr), udp_mode: None, session_security_settings: None, @@ -154,6 +161,10 @@ impl ServerConnectionSettingsBuilder { self.with_udp_mode(UdpMode::Disabled) } + pub fn enable_udp(self) -> Self { + self.with_udp_mode(UdpMode::Enabled) + } + /// Adds a session security settings to the client-to-server connection. This is necessary for the server to know how to handle the connection. pub fn with_session_security_settings>( mut self, @@ -176,8 +187,8 @@ impl ServerConnectionSettingsBuilder { None }; - if let Some(uuid) = self.authless_uuid { - Ok(ServerConnectionSettings::NoCredentials { + if let Some(uuid) = self.transient_uuid { + Ok(ServerConnectionSettings::Transient { server_addr: server_addr .ok_or(NetworkError::Generic("No address found".to_string()))?, uuid, @@ -220,7 +231,7 @@ impl ServerConnectionSettingsBuilder { /// The settings for a client-to-server connection pub enum ServerConnectionSettings { - NoCredentials { + Transient { server_addr: SocketAddr, uuid: Uuid, udp_mode: UdpMode, @@ -248,7 +259,7 @@ pub enum ServerConnectionSettings { impl ServerConnectionSettings { pub(crate) fn udp_mode(&self) -> UdpMode { match self { - Self::NoCredentials { udp_mode, .. } => *udp_mode, + Self::Transient { udp_mode, .. } => *udp_mode, Self::CredentialedRegister { udp_mode, .. } => *udp_mode, Self::CredentialedConnect { udp_mode, .. } => *udp_mode, } @@ -256,7 +267,7 @@ impl ServerConnectionSettings { pub(crate) fn session_security_settings(&self) -> SessionSecuritySettings { match self { - Self::NoCredentials { + Self::Transient { session_security_settings, .. } => *session_security_settings, @@ -273,7 +284,7 @@ impl ServerConnectionSettings { pub(crate) fn pre_shared_key(&self) -> Option<&PreSharedKey> { match self { - Self::NoCredentials { pre_shared_key, .. } => pre_shared_key.as_ref(), + Self::Transient { pre_shared_key, .. } => pre_shared_key.as_ref(), Self::CredentialedRegister { pre_shared_key, .. } => pre_shared_key.as_ref(), Self::CredentialedConnect { pre_shared_key, .. } => pre_shared_key.as_ref(), } diff --git a/citadel_sdk/src/prefabs/client/peer_connection.rs b/citadel_sdk/src/prefabs/client/peer_connection.rs index d8d0c7be6..6902c08e0 100644 --- a/citadel_sdk/src/prefabs/client/peer_connection.rs +++ b/citadel_sdk/src/prefabs/client/peer_connection.rs @@ -3,12 +3,13 @@ use crate::prelude::results::PeerConnectSuccess; use crate::prelude::*; use crate::test_common::wait_for_peers; use citadel_io::tokio::sync::mpsc::{Receiver, UnboundedSender}; -use citadel_io::Mutex; +use citadel_io::{tokio, Mutex}; use citadel_proto::re_imports::async_trait; use citadel_user::hypernode_account::UserIdentifierExt; use futures::stream::FuturesUnordered; -use futures::{Future, TryStreamExt}; +use futures::TryStreamExt; use std::collections::HashMap; +use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; use uuid::Uuid; @@ -40,6 +41,27 @@ pub struct FileTransferHandleRx { pub conn_type: VirtualTargetType, } +impl FileTransferHandleRx { + /// Accepts all incoming file transfer handles and processes them in the background + pub fn accept_all(mut self) { + let task = tokio::task::spawn(async move { + let rx = &mut self.inner; + while let Some(mut handle) = rx.recv().await { + let task = tokio::task::spawn(async move { + if let Err(err) = handle.exhaust_stream().await { + let orientation = handle.orientation; + log::warn!(target: "citadel", "Error background handling of file transfer for {orientation:?}: {err:?}"); + } + }); + + drop(task); + } + }); + + drop(task); + } +} + impl std::ops::Deref for FileTransferHandleRx { type Target = citadel_io::tokio::sync::mpsc::UnboundedReceiver; @@ -194,6 +216,16 @@ impl AddedPeer { self } + /// Disables the UDP mode for the client-to-server connection. The default setting is Disabled + pub fn disable_udp(self) -> Self { + self.with_udp_mode(UdpMode::Disabled) + } + + /// Enables the UDP mode for the client-to-server connection. The default setting is Disabled + pub fn enable_udp(self) -> Self { + self.with_udp_mode(UdpMode::Enabled) + } + /// Sets the [`SessionSecuritySettings`] for this peer to peer connection pub fn with_session_security_settings( mut self, @@ -359,7 +391,7 @@ where for (mutually_registered, peer_to_connect) in peers_already_registered.into_iter().zip(peers_to_connect) { - // each task will be responsible for possibly registering to and connecting + // Each task will be responsible for possibly registering to and connecting // with the desired peer let remote = remote.clone(); let PeerConnectionSettings { @@ -378,6 +410,7 @@ where remote.find_target(implicated_cid, id).await? } else { // TODO: optimize peer registration + connection in one go + log::info!(target: "citadel", "{implicated_cid} proposing target {id:?} to central node"); let handle = remote.propose_target(implicated_cid, id.clone()).await?; // if the peer is not yet registered to the central node, wait for it to become registered // this is useful especially for testing purposes @@ -393,8 +426,9 @@ where } } + log::info!(target: "citadel", "{implicated_cid} registering to peer {id:?}"); let _reg_success = handle.register_to_peer().await?; - log::info!(target: "citadel", "Peer {:?} registered || success -> now connecting", id); + log::info!(target: "citadel", "{implicated_cid} registered to peer {id:?} registered || success -> now connecting"); handle }; @@ -454,13 +488,16 @@ mod tests { use crate::prefabs::client::peer_connection::PeerConnectionKernel; use crate::prefabs::client::ServerConnectionSettingsBuilder; use crate::prelude::*; + use crate::remote_ext::results::PeerConnectSuccess; use crate::test_common::{server_info, wait_for_peers, TestBarrier}; use citadel_io::tokio; + use citadel_io::tokio::sync::mpsc::{Receiver, UnboundedSender}; use citadel_user::prelude::UserIdentifierExt; use futures::stream::FuturesUnordered; use futures::TryStreamExt; use rstest::rstest; use std::collections::HashMap; + use std::future::Future; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use uuid::Uuid; @@ -505,6 +542,7 @@ mod tests { for peer in peers { agg = agg .with_peer_custom(peer) + .ensure_registered() .with_udp_mode(udp_mode) .with_session_security_settings(SessionSecuritySettings::default()) .add(); @@ -520,31 +558,38 @@ mod tests { .build() .unwrap(); + let username = username.clone(); + let client_kernel = PeerConnectionKernel::new( server_connection_settings, agg.clone(), - move |mut results, mut remote| async move { - let mut success = 0; - let mut p2p_remotes = HashMap::new(); - - while let Some(conn) = results.recv().await { - log::trace!(target: "citadel", "User {} received {:?}", username, conn); - let mut conn = conn?; - crate::test_common::udp_mode_assertions( - udp_mode, - conn.udp_channel_rx.take(), - ) - .await; - success += 1; - let _ = - p2p_remotes.insert(conn.channel.get_peer_cid(), conn.remote.clone()); - if success == peer_count - 1 { - break; - } - } + move |results, mut remote| async move { + log::info!(target: "citadel", "***PEER {username} CONNECTED ***"); + let implicated_cid = remote.conn_type.get_implicated_cid(); + let check = move |conn: PeerConnectSuccess| async move { + let implicated_cid = conn.channel.get_implicated_cid(); + let _mutual_peers = conn + .remote + .remote() + .get_local_group_mutual_peers(implicated_cid) + .await + .unwrap(); + conn + }; + let p2p_remotes = handle_peer_connect_successes( + results, + implicated_cid, + peer_count, + udp_mode, + check, + ) + .await + .into_iter() + .map(|r| (r.channel.get_peer_cid(), r.remote)) + .collect::>(); - // by now, all the network peers have been registered to - // test that getting the peers (not necessarily mutual) + // By now, all the network peers have been registered to. + // Test that getting the peers (not necessarily mutual) // show up let network_peers = remote.get_peers(None).await.unwrap(); for user in agg.inner { @@ -563,7 +608,7 @@ mod tests { assert!(mutual_peers.iter().any(|r| r.cid == peer_cid)) } - log::trace!(target: "citadel", "***PEER {} CONNECT RESULT: {}***", username, success); + log::info!(target: "citadel", "***PEER {username} finished all checks***"); let _ = client_success.fetch_add(1, Ordering::Relaxed); wait_for_peers().await; remote.shutdown_kernel().await @@ -586,12 +631,13 @@ mod tests { #[case(3)] #[timeout(Duration::from_secs(90))] #[tokio::test(flavor = "multi_thread")] - async fn peer_to_peer_connect_passwordless( + async fn peer_to_peer_connect_transient( #[case] peer_count: usize, ) -> Result<(), Box> { assert!(peer_count > 1); citadel_logging::setup_log(); TestBarrier::setup(peer_count); + let udp_mode = UdpMode::Enabled; let do_deregister = peer_count == 2; @@ -612,33 +658,36 @@ mod tests { .map(UserIdentifier::from) .collect::>(); + let mut agg = PeerConnectionSetupAggregator::default(); + + for peer in peers { + agg = agg + .with_peer_custom(peer) + .with_udp_mode(udp_mode) + .ensure_registered() + .with_session_security_settings(SessionSecuritySettings::default()) + .add(); + } + let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .build() .unwrap(); let client_kernel = PeerConnectionKernel::new( server_connection_settings, - peers, - move |mut results, remote| async move { - let mut success = 0; + agg, + move |results, remote| async move { + log::info!(target: "citadel", "***PEER {uuid} CONNECTED***"); let implicated_cid = remote.conn_type.get_implicated_cid(); - while let Some(conn) = results.recv().await { - log::trace!(target: "citadel", "User {} received {:?}", uuid, conn); - let mut conn = conn?; + let check = move |conn: PeerConnectSuccess| async move { let peer_cid = conn.channel.get_peer_cid(); - - crate::test_common::p2p_assertions(implicated_cid, &conn).await; - - crate::test_common::udp_mode_assertions( - Default::default(), - conn.udp_channel_rx.take(), - ) - .await; - if do_deregister { - conn.remote.deregister().await?; + conn.remote + .deregister() + .await + .expect("Deregistration failed"); assert!(!conn .remote .inner @@ -648,21 +697,26 @@ mod tests { .await .unwrap()); } + conn + }; - success += 1; - if success == peer_count - 1 { - break; - } - } + let _ = handle_peer_connect_successes( + results, + implicated_cid, + peer_count, + udp_mode, + check, + ) + .await; - log::trace!(target: "citadel", "***PEER {} CONNECT RESULT: {}***", uuid, success); + log::info!(target: "citadel", "***PEER {uuid} finished all checks***"); let _ = client_success.fetch_add(1, Ordering::Relaxed); wait_for_peers().await; remote.shutdown_kernel().await }, ); - let client = NodeBuilder::default().build(client_kernel).unwrap(); + let client = NodeBuilder::default().build(client_kernel)?; client_kernels.push(async move { client.await.map(|_| ()) }); } @@ -681,16 +735,18 @@ mod tests { #[rstest] #[case(2)] + #[case(3)] #[timeout(std::time::Duration::from_secs(90))] - #[citadel_io::tokio::test(flavor = "multi_thread")] + #[tokio::test(flavor = "multi_thread")] async fn test_peer_to_peer_file_transfer( #[case] peer_count: usize, ) -> Result<(), Box> { assert!(peer_count > 1); citadel_logging::setup_log(); TestBarrier::setup(peer_count); + let udp_mode = UdpMode::Enabled; - let client_success = &AtomicBool::new(false); + let sender_success = &AtomicBool::new(false); let receiver_success = &AtomicBool::new(false); let (server, server_addr) = server_info(); @@ -700,46 +756,59 @@ mod tests { .map(|_| Uuid::new_v4()) .collect::>(); + let sender_uuid = total_peers[0]; + for idx in 0..peer_count { let uuid = total_peers.get(idx).cloned().unwrap(); - let peers = total_peers + let mut peers = total_peers .clone() .into_iter() .filter(|r| r != &uuid) .map(UserIdentifier::from) .collect::>(); + // 0: [1, 2] <-- At idx 0, we want the sender to connect to all other peers + // 1: [0] <-- At idx 1, we want the receiver to connect to the sender + // 2: [0] <-- At idx 2, we want the receiver to connect to the sender + // .. + // n: [0] <-- At idx n, we want the receiver to connect to the sender + if idx != 0 { + peers = vec![sender_uuid.into()]; + } + + let mut agg = PeerConnectionSetupAggregator::default(); + + for peer in peers { + agg = agg + .with_peer_custom(peer) + .ensure_registered() + .with_udp_mode(udp_mode) + .with_session_security_settings(SessionSecuritySettings::default()) + .add(); + } let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .build() .unwrap(); let client_kernel = PeerConnectionKernel::new( server_connection_settings, - peers, - move |mut results, remote| async move { - let mut success = 0; + agg, + move |results, remote| async move { + log::info!(target: "citadel", "***PEER {uuid} CONNECTED***"); + wait_for_peers().await; let implicated_cid = remote.conn_type.get_implicated_cid(); - - while let Some(conn) = results.recv().await { - log::trace!(target: "citadel", "User {} received {:?}", uuid, conn); - wait_for_peers().await; - let mut conn = conn?; - //let peer_cid = conn.channel.get_peer_cid(); - - crate::test_common::p2p_assertions(implicated_cid, &conn).await; - - // one user will send the file, the other will receive the file - if idx == 0 { + let is_sender = idx == 0; // the first peer is the sender, the rest are receivers + let check = move |mut conn: PeerConnectSuccess| async move { + if is_sender { conn.remote .send_file_with_custom_opts( "../resources/TheBridge.pdf", 32 * 1024, TransferType::FileTransfer, ) - .await?; - - client_success.store(true, Ordering::Relaxed); + .await + .expect("Failed to send file"); } else { // TODO: route file-transfer + other events to peer channel let mut handle = conn @@ -757,7 +826,6 @@ mod tests { while let Some(status) = handle.next().await { match status { ObjectTransferStatus::ReceptionComplete => { - log::trace!(target: "citadel", "Peer has finished receiving the file!"); let cmp = include_bytes!("../../../../resources/TheBridge.pdf"); let streamed_data = @@ -770,6 +838,7 @@ mod tests { "Original data and streamed data does not match" ); + log::info!(target: "citadel", "Peer has finished receiving and verifying the file!"); break; } @@ -781,18 +850,31 @@ mod tests { _ => {} } } - - receiver_success.store(true, Ordering::Relaxed); } - success += 1; - if success == peer_count - 1 { - break; - } + conn + }; + // Use a peer count of two since we only have one sender and one receiver per pair + // However, we need a way of ensuring we collect three results + let peer_count = if idx == 0 { peer_count } else { 2 }; + let _ = handle_peer_connect_successes( + results, + implicated_cid, + peer_count, + udp_mode, + check, + ) + .await; + + if is_sender { + sender_success.store(true, Ordering::Relaxed); + } else { + receiver_success.store(true, Ordering::Relaxed); } - log::trace!(target: "citadel", "***PEER {} CONNECT RESULT: {}***", uuid, success); + log::info!(target: "citadel", "***PEER {uuid} (is_sender: {is_sender}) finished all checks***"); wait_for_peers().await; + log::info!(target: "citadel", "***PEER {uuid} (is_sender: {is_sender}) shutting down***"); remote.shutdown_kernel().await }, ); @@ -810,7 +892,7 @@ mod tests { }; } - assert!(client_success.load(Ordering::Relaxed)); + assert!(sender_success.load(Ordering::Relaxed)); assert!(receiver_success.load(Ordering::Relaxed)); Ok(()) } @@ -818,13 +900,14 @@ mod tests { #[rstest] #[case(2)] #[timeout(std::time::Duration::from_secs(90))] - #[citadel_io::tokio::test(flavor = "multi_thread")] + #[tokio::test(flavor = "multi_thread")] async fn test_peer_to_peer_rekey( #[case] peer_count: usize, ) -> Result<(), Box> { assert!(peer_count > 1); citadel_logging::setup_log(); TestBarrier::setup(peer_count); + let udp_mode = UdpMode::Enabled; let client_success = &AtomicUsize::new(0); let (server, server_addr) = server_info(); @@ -843,36 +926,51 @@ mod tests { .map(UserIdentifier::from) .collect::>(); + let mut agg = PeerConnectionSetupAggregator::default(); + + for peer in peers { + agg = agg + .with_peer_custom(peer) + .ensure_registered() + .with_udp_mode(udp_mode) + .with_session_security_settings(SessionSecuritySettings::default()) + .add(); + } + let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .build() .unwrap(); let client_kernel = PeerConnectionKernel::new( server_connection_settings, - peers, - move |mut results, remote| async move { - let mut success = 0; + agg, + move |results, remote| async move { + log::info!(target: "citadel", "***PEER {uuid} CONNECTED***"); let implicated_cid = remote.conn_type.get_implicated_cid(); - while let Some(conn) = results.recv().await { - log::trace!(target: "citadel", "User {} received {:?}", uuid, conn); - let conn = conn?; - crate::test_common::p2p_assertions(implicated_cid, &conn).await; - + let check = move |conn: PeerConnectSuccess| async move { if idx == 0 { for x in 1..10 { - assert_eq!(conn.remote.rekey().await?, Some(x)); + assert_eq!( + conn.remote.rekey().await.expect("Failed to rekey"), + Some(x) + ); } } - success += 1; - if success == peer_count - 1 { - break; - } - } + conn + }; + let _ = handle_peer_connect_successes( + results, + implicated_cid, + peer_count, + udp_mode, + check, + ) + .await; - log::trace!(target: "citadel", "***PEER {} CONNECT RESULT: {}***", uuid, success); + log::info!(target: "citadel", "***PEER {uuid} finished all checks***"); let _ = client_success.fetch_add(1, Ordering::Relaxed); wait_for_peers().await; remote.shutdown_kernel().await @@ -899,13 +997,14 @@ mod tests { #[rstest] #[case(2)] #[timeout(std::time::Duration::from_secs(90))] - #[citadel_io::tokio::test(flavor = "multi_thread")] + #[tokio::test(flavor = "multi_thread")] async fn test_peer_to_peer_disconnect( #[case] peer_count: usize, ) -> Result<(), Box> { assert!(peer_count > 1); citadel_logging::setup_log(); TestBarrier::setup(peer_count); + let udp_mode = UdpMode::Enabled; let client_success = &AtomicUsize::new(0); let (server, server_addr) = server_info(); @@ -924,30 +1023,46 @@ mod tests { .map(UserIdentifier::from) .collect::>(); + let mut agg = PeerConnectionSetupAggregator::default(); + + for peer in peers { + agg = agg + .with_peer_custom(peer) + .ensure_registered() + .with_udp_mode(udp_mode) + .with_session_security_settings(SessionSecuritySettings::default()) + .add(); + } + let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .build() .unwrap(); let client_kernel = PeerConnectionKernel::new( server_connection_settings, - peers, - move |mut results, remote| async move { - let mut success = 0; + agg, + move |results, remote| async move { + log::info!(target: "citadel", "***PEER {uuid} CONNECTED***"); let implicated_cid = remote.conn_type.get_implicated_cid(); - while let Some(conn) = results.recv().await { - log::trace!(target: "citadel", "User {} received {:?}", uuid, conn); - let conn = conn?; - crate::test_common::p2p_assertions(implicated_cid, &conn).await; - conn.remote.disconnect().await?; - success += 1; - if success == peer_count - 1 { - break; - } - } + let check = move |conn: PeerConnectSuccess| async move { + conn.remote + .disconnect() + .await + .expect("Failed to p2p disconnect"); + conn + }; + let _ = handle_peer_connect_successes( + results, + implicated_cid, + peer_count, + udp_mode, + check, + ) + .await; + log::info!(target: "citadel", "***PEER {uuid} finished all checks***"); - log::trace!(target: "citadel", "***PEER {} CONNECT RESULT: {}***", uuid, success); let _ = client_success.fetch_add(1, Ordering::Relaxed); wait_for_peers().await; remote.shutdown_kernel().await @@ -997,6 +1112,7 @@ mod tests { let mut peer0_agg = PeerConnectionSetupAggregator::default() .with_peer_custom(uuid1) + .ensure_registered() .with_session_security_settings(session_security); if let Some(password) = p2p_password { @@ -1007,6 +1123,7 @@ mod tests { let mut peer1_agg = PeerConnectionSetupAggregator::default() .with_peer_custom(uuid0) + .ensure_registered() .with_session_security_settings(session_security); if let Some(_password) = p2p_password { @@ -1016,14 +1133,14 @@ mod tests { let peer1_connection = peer1_agg.add(); let server_connection_settings0 = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid0) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid0) .with_udp_mode(UdpMode::Enabled) .with_session_security_settings(session_security) .build() .unwrap(); let server_connection_settings1 = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid1) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid1) .with_udp_mode(UdpMode::Enabled) .with_session_security_settings(session_security) .build() @@ -1078,4 +1195,83 @@ mod tests { assert!(!peer_0_error_received.load(Ordering::SeqCst)); assert!(!peer_1_error_received.load(Ordering::SeqCst)); } + + async fn handle_peer_connect_successes( + mut conn_rx: Receiver>, + implicated_cid: u64, + peer_count: usize, + udp_mode: UdpMode, + checks: F, + ) -> Vec + where + F: for<'a> Fn(PeerConnectSuccess) -> Fut + Send + Clone + 'static, + Fut: Future + Send, + { + let (finished_tx, finished_rx) = tokio::sync::oneshot::channel(); + + let task = async move { + let (done_tx, mut done_rx) = tokio::sync::mpsc::unbounded_channel(); + let mut conns = vec![]; + while let Some(conn) = conn_rx.recv().await { + conns.push(conn); + if conns.len() == peer_count - 1 { + break; + } + } + + for conn in conns { + let conn = conn.expect("Error receiving peer connection"); + handle_peer_connect_success( + conn, + done_tx.clone(), + implicated_cid, + udp_mode, + checks.clone(), + ); + } + + // Now, wait for all to finish + let mut ret = vec![]; + while let Some(done) = done_rx.recv().await { + ret.push(done); + if ret.len() == peer_count - 1 { + break; + } + } + + finished_tx + .send(ret) + .expect("Error sending finished signal in handle_peer_connect_successes"); + }; + + drop(tokio::task::spawn(task)); + let ret = finished_rx + .await + .expect("Error receiving finished signal in handle_peer_connect_successes"); + + assert_eq!(ret.len(), peer_count - 1); + ret + } + + fn handle_peer_connect_success( + mut conn: PeerConnectSuccess, + done_tx: UnboundedSender, + implicated_cid: u64, + udp_mode: UdpMode, + checks: F, + ) where + F: Fn(PeerConnectSuccess) -> Fut + Send + Clone + 'static, + Fut: Future + Send, + { + let task = async move { + crate::test_common::p2p_assertions(implicated_cid, &conn).await; + crate::test_common::udp_mode_assertions(udp_mode, conn.udp_channel_rx.take()).await; + let conn = checks(conn).await; + done_tx + .send(conn) + .expect("Error sending done signal in handle_peer_connect_success"); + }; + + drop(tokio::task::spawn(task)); + } } diff --git a/citadel_sdk/src/prefabs/client/single_connection.rs b/citadel_sdk/src/prefabs/client/single_connection.rs index 70701bbd1..d2428e1d4 100644 --- a/citadel_sdk/src/prefabs/client/single_connection.rs +++ b/citadel_sdk/src/prefabs/client/single_connection.rs @@ -41,7 +41,7 @@ pub(crate) enum ConnectionType { username: String, password: SecBuffer, }, - Passwordless { + Transient { uuid: Uuid, server_addr: SocketAddr, }, @@ -78,11 +78,11 @@ where username, password, .. } => ConnectionType::Connect { username, password }, - ServerConnectionSettings::NoCredentials { + ServerConnectionSettings::Transient { server_addr: address, uuid, .. - } => ConnectionType::Passwordless { + } => ConnectionType::Transient { uuid, server_addr: address, }, @@ -174,8 +174,8 @@ where AuthenticationRequest::credentialed(username, password) } - ConnectionType::Passwordless { uuid, server_addr } => { - AuthenticationRequest::passwordless(uuid, server_addr) + ConnectionType::Transient { uuid, server_addr } => { + AuthenticationRequest::transient(uuid, server_addr) } }; @@ -289,7 +289,8 @@ mod tests { #[citadel_io::tokio::test(flavor = "multi_thread")] async fn test_single_connection_registered( #[values(UdpMode::Enabled, UdpMode::Disabled)] udp_mode: UdpMode, - #[values(ServerUnderlyingProtocol::new_quic_self_signed(), ServerUnderlyingProtocol::new_tls_self_signed().unwrap())] + #[values(ServerUnderlyingProtocol::new_quic_self_signed(), ServerUnderlyingProtocol::new_tls_self_signed().unwrap() + )] underlying_protocol: ServerUnderlyingProtocol, ) { citadel_logging::setup_log(); @@ -350,7 +351,7 @@ mod tests { #[case(UdpMode::Enabled, Some("test-password"))] #[timeout(std::time::Duration::from_secs(90))] #[citadel_io::tokio::test(flavor = "multi_thread")] - async fn test_single_connection_passwordless( + async fn test_single_connection_transient( #[case] udp_mode: UdpMode, #[case] server_password: Option<&'static str>, ) { @@ -373,7 +374,7 @@ mod tests { let uuid = Uuid::new_v4(); let mut server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .with_udp_mode(udp_mode); if let Some(server_password) = server_password { @@ -410,7 +411,7 @@ mod tests { #[case(UdpMode::Enabled, Some("test-password"))] #[timeout(std::time::Duration::from_secs(90))] #[tokio::test(flavor = "multi_thread")] - async fn test_single_connection_passwordless_wrong_password( + async fn test_single_connection_transient_wrong_password( #[case] udp_mode: UdpMode, #[case] server_password: Option<&'static str>, ) { @@ -429,7 +430,7 @@ mod tests { let uuid = Uuid::new_v4(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .with_udp_mode(udp_mode) .with_session_password("wrong-password") .build() @@ -461,7 +462,7 @@ mod tests { #[case(UdpMode::Disabled)] #[timeout(std::time::Duration::from_secs(90))] #[citadel_io::tokio::test(flavor = "multi_thread")] - async fn test_single_connection_passwordless_deregister(#[case] udp_mode: UdpMode) { + async fn test_single_connection_transient_deregister(#[case] udp_mode: UdpMode) { citadel_logging::setup_log(); TestBarrier::setup(2); @@ -478,7 +479,7 @@ mod tests { let uuid = Uuid::new_v4(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .with_udp_mode(udp_mode) .build() .unwrap(); @@ -527,7 +528,7 @@ mod tests { let uuid = Uuid::new_v4(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .with_udp_mode(udp_mode) .build() .unwrap(); @@ -605,7 +606,7 @@ mod tests { let uuid = Uuid::new_v4(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .with_udp_mode(udp_mode) .build() .unwrap(); diff --git a/citadel_sdk/src/prefabs/mod.rs b/citadel_sdk/src/prefabs/mod.rs index 39cfe120a..8bb285190 100644 --- a/citadel_sdk/src/prefabs/mod.rs +++ b/citadel_sdk/src/prefabs/mod.rs @@ -71,7 +71,7 @@ impl TargetLockedRemote for ClientServerRemote { fn remote(&self) -> &NodeRemote { &self.inner } - fn target_username(&self) -> Option<&String> { + fn target_username(&self) -> Option<&str> { None } fn user_mut(&mut self) -> &mut VirtualTargetType { diff --git a/citadel_sdk/src/prefabs/server/accept_file_transfer_kernel.rs b/citadel_sdk/src/prefabs/server/accept_file_transfer_kernel.rs index 2861fb195..02410bfc6 100644 --- a/citadel_sdk/src/prefabs/server/accept_file_transfer_kernel.rs +++ b/citadel_sdk/src/prefabs/server/accept_file_transfer_kernel.rs @@ -1,5 +1,4 @@ use crate::prelude::*; -use futures::StreamExt; #[derive(Default)] pub struct AcceptFileTransferKernel; @@ -16,11 +15,11 @@ impl NetKernel for AcceptFileTransferKernel { async fn on_node_event_received(&self, message: NodeResult) -> Result<(), NetworkError> { if let NodeResult::ObjectTransferHandle(mut handle) = message { - handle + let _ = handle .handle - .accept() + .exhaust_stream() + .await .map_err(|err| NetworkError::Generic(err.into_string()))?; - exhaust_file_transfer(handle.handle); } Ok(()) @@ -30,20 +29,3 @@ impl NetKernel for AcceptFileTransferKernel { Ok(()) } } - -pub fn exhaust_file_transfer(mut handle: ObjectTransferHandler) { - // Exhaust the stream - let handle = citadel_io::tokio::task::spawn(async move { - while let Some(evt) = handle.next().await { - log::info!(target: "citadel", "File Transfer Event: {evt:?}"); - if let ObjectTransferStatus::Fail(err) = &evt { - log::error!(target: "citadel", "File Transfer Failed: {err:?}"); - std::process::exit(1); - } else if let ObjectTransferStatus::TransferComplete = &evt { - break; - } - } - }); - - drop(handle); -} diff --git a/citadel_sdk/src/prefabs/server/internal_service.rs b/citadel_sdk/src/prefabs/server/internal_service.rs index 7f3064dbb..2f644e090 100644 --- a/citadel_sdk/src/prefabs/server/internal_service.rs +++ b/citadel_sdk/src/prefabs/server/internal_service.rs @@ -133,7 +133,7 @@ mod test { }); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_bind_addr, Uuid::new_v4()) + ServerConnectionSettingsBuilder::transient_with_id(server_bind_addr, Uuid::new_v4()) .build() .unwrap(); @@ -214,7 +214,7 @@ mod test { }); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_bind_addr, Uuid::new_v4()) + ServerConnectionSettingsBuilder::transient_with_id(server_bind_addr, Uuid::new_v4()) .build() .unwrap(); @@ -262,7 +262,7 @@ mod test { Ok(()) }, ) - .await + .await }, ); diff --git a/citadel_sdk/src/remote_ext.rs b/citadel_sdk/src/remote_ext.rs index b300dd9b2..d0dde6b76 100644 --- a/citadel_sdk/src/remote_ext.rs +++ b/citadel_sdk/src/remote_ext.rs @@ -41,7 +41,7 @@ pub(crate) mod user_ids { pub trait TargetLockedRemote: Send + Sync { fn user(&self) -> &VirtualTargetType; fn remote(&self) -> &NodeRemote; - fn target_username(&self) -> Option<&String>; + fn target_username(&self) -> Option<&str>; fn user_mut(&mut self) -> &mut VirtualTargetType; fn session_security_settings(&self) -> Option<&SessionSecuritySettings>; } @@ -53,8 +53,8 @@ pub(crate) mod user_ids { fn remote(&self) -> &NodeRemote { self.remote } - fn target_username(&self) -> Option<&String> { - self.target_username.as_ref() + fn target_username(&self) -> Option<&str> { + self.target_username.as_deref() } fn user_mut(&mut self) -> &mut VirtualTargetType { &mut self.user @@ -72,8 +72,8 @@ pub(crate) mod user_ids { fn remote(&self) -> &NodeRemote { &self.remote } - fn target_username(&self) -> Option<&String> { - self.target_username.as_ref() + fn target_username(&self) -> Option<&str> { + self.target_username.as_deref() } fn user_mut(&mut self) -> &mut VirtualTargetType { &mut self.user @@ -555,22 +555,10 @@ pub trait ProtocolRemoteTargetExt: TargetLockedRemote { while let Some(event) = stream.next().await { match map_errors(event)? { NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => { - while let Some(res) = handle.next().await { - log::trace!(target: "citadel", "Client received RES {res:?}"); - match res { - ObjectTransferStatus::TransferComplete => { - return Ok(()); - } - - ObjectTransferStatus::Fail(err) => { - return Err(NetworkError::Generic(format!( - "File transfer failed: {err:?}" - ))); - } - - _ => {} - } - } + return handle + .transfer_file() + .await + .map_err(|err| NetworkError::Generic(err.into_string())); } NodeResult::PeerEvent(PeerEvent { @@ -656,28 +644,10 @@ pub trait ProtocolRemoteTargetExt: TargetLockedRemote { while let Some(event) = stream.next().await { match map_errors(event)? { NodeResult::ObjectTransferHandle(ObjectTransferHandle { mut handle, .. }) => { - let mut local_path = None; - while let Some(res) = handle.next().await { - match res { - ObjectTransferStatus::ReceptionBeginning(path, _) => { - local_path = Some(path) - } - ObjectTransferStatus::TransferComplete => { - break; - } - - ObjectTransferStatus::Fail(err) => { - return Err(NetworkError::Generic(format!( - "File download failed: {err:?}" - ))); - } - - _ => {} - } - } - - return local_path - .ok_or(NetworkError::InternalError("Local path never loaded")); + return handle + .receive_file() + .await + .map_err(|err| NetworkError::Generic(err.into_string())); } NodeResult::PeerEvent(PeerEvent { @@ -717,10 +687,10 @@ pub trait ProtocolRemoteTargetExt: TargetLockedRemote { while let Some(event) = stream.next().await { match map_errors(event)? { NodeResult::ReVFS(result) => { - if let Some(error) = result.error_message { - return Err(NetworkError::Generic(error)); + return if let Some(error) = result.error_message { + Err(NetworkError::Generic(error)) } else { - return Ok(()); + Ok(()) } } @@ -765,7 +735,7 @@ pub trait ProtocolRemoteTargetExt: TargetLockedRemote { channel, udp_rx_opt, }) => { - let username = self.target_username().cloned(); + let username = self.target_username().map(ToString::to_string); let remote = PeerRemote { inner: self.remote().clone(), peer: peer_target.as_virtual_connection(), @@ -823,7 +793,7 @@ pub trait ProtocolRemoteTargetExt: TargetLockedRemote { .get_username_by_cid(implicated_cid) .await? .ok_or_else(|| NetworkError::msg("Unable to find username for local user"))?; - let peer_username_opt = self.target_username().cloned(); + let peer_username_opt = self.target_username().map(ToString::to_string); let mut stream = self .remote() @@ -1137,14 +1107,13 @@ pub trait ProtocolRemoteTargetExt: TargetLockedRemote { // where the username was provided, but the cid was 0 (unknown). let peer_username = self .target_username() - .ok_or_else(|| NetworkError::msg("target_cid=0, yet, no username was provided"))? - .clone(); + .ok_or_else(|| NetworkError::msg("target_cid=0, yet, no username was provided"))?; let implicated_cid = self.user().get_implicated_cid(); let expected_peer_cid = self .remote() .account_manager() .get_persistence_handler() - .get_cid_by_username(&peer_username); + .get_cid_by_username(peer_username); // get the peer cid from the account manager (implying the peers are already registered). // fallback to the mapped cid if the peer is not registered let peer_cid = self @@ -1253,8 +1222,8 @@ pub mod remote_specialization { fn remote(&self) -> &NodeRemote { &self.inner } - fn target_username(&self) -> Option<&String> { - self.username.as_ref() + fn target_username(&self) -> Option<&str> { + self.username.as_deref() } fn user_mut(&mut self) -> &mut VirtualTargetType { &mut self.peer @@ -1376,7 +1345,7 @@ mod tests { .unwrap(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .with_session_security_settings(session_security_settings) .disable_udp() .build() diff --git a/citadel_sdk/tests/stress_tests.rs b/citadel_sdk/tests/stress_tests.rs index 3a9416a61..28b955bf6 100644 --- a/citadel_sdk/tests/stress_tests.rs +++ b/citadel_sdk/tests/stress_tests.rs @@ -243,7 +243,7 @@ mod tests { .unwrap(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .with_udp_mode(UdpMode::Enabled) .with_session_security_settings(session_security) .build() @@ -316,7 +316,7 @@ mod tests { .unwrap(); let mut connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .with_udp_mode(UdpMode::Enabled) .with_session_security_settings(session_security); @@ -402,7 +402,7 @@ mod tests { let peer1_connection = peer1_agg.add(); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid0) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid0) .with_udp_mode(UdpMode::Enabled) .with_session_security_settings(session_security) .build() @@ -425,7 +425,7 @@ mod tests { ); let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid1) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid1) .with_udp_mode(UdpMode::Enabled) .with_session_security_settings(session_security) .build() @@ -506,7 +506,7 @@ mod tests { }; let server_connection_settings = - ServerConnectionSettingsBuilder::no_credentials(server_addr, uuid) + ServerConnectionSettingsBuilder::transient_with_id(server_addr, uuid) .build() .unwrap(); diff --git a/citadel_user/Cargo.toml b/citadel_user/Cargo.toml index abc7a156c..8cf6074c9 100644 --- a/citadel_user/Cargo.toml +++ b/citadel_user/Cargo.toml @@ -47,7 +47,7 @@ serde_json = { workspace = true, features = ["alloc"] } bytes = { workspace = true } bstr = { workspace = true, features = ["alloc", "unicode"] } sqlx = { workspace = true, features = ["all-databases", "runtime-tokio-native-tls"], optional = true } -redis-base = { package = "redis", workspace = true, features = ["tokio-comp", "tokio-native-tls-comp"], optional=true } +redis-base = { workspace = true, features = ["tokio-comp", "tokio-native-tls-comp"], optional=true } mobc = { workspace = true, optional = true, features = ["tokio"] } firebase-rtdb = { workspace = true, optional = true } jwt = { workspace = true, features = ["openssl"], optional = true } diff --git a/citadel_user/src/backend/utils/mod.rs b/citadel_user/src/backend/utils/mod.rs index 2574507ec..1a51184c7 100644 --- a/citadel_user/src/backend/utils/mod.rs +++ b/citadel_user/src/backend/utils/mod.rs @@ -1,5 +1,6 @@ use futures::Stream; use std::ops::{Deref, DerefMut}; +use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; @@ -100,6 +101,70 @@ impl ObjectTransferHandler { (this, tx) } + /// Exhausts the steam, independently of the orientation + /// If the orientation is Sender, this will return no path + /// If the orientation is Receiver, this will return the path of the received file + pub async fn exhaust_stream(&mut self) -> Result, AccountError> { + self.accept()?; + + let mut save_path = None; + while let Some(event) = self.inner.inner.recv().await { + match event { + ObjectTransferStatus::ReceptionBeginning(path, _) => { + save_path = Some(path); + } + ObjectTransferStatus::ReceptionComplete => { + return Ok(save_path); + } + + ObjectTransferStatus::TransferComplete => { + return Ok(None); + } + + ObjectTransferStatus::Fail(err) => { + return Err(AccountError::msg(err)); + } + + _ => {} + } + } + + Err(AccountError::msg("Failed to receive file: stream ended")) + } + + /// Receives the file, exhausting the underlying stream and returning the save path + /// after completion. + /// + /// If the orientation is Sender, this will return an error + pub async fn receive_file(&mut self) -> Result { + if !matches!(self.orientation, ObjectTransferOrientation::Receiver { .. }) { + return Err(AccountError::msg( + "Cannot receive file: orientation is not Receiver", + )); + } + + let file = self.exhaust_stream().await?; + file.ok_or_else(|| AccountError::msg("Failed to receive file: no file path")) + } + + /// Transfers the file, exhausting the underlying stream + /// + /// If the orientation is Receiver, this will return an error + pub async fn transfer_file(&mut self) -> Result<(), AccountError> { + if !matches!(self.orientation, ObjectTransferOrientation::Sender { .. }) { + return Err(AccountError::msg( + "Cannot transfer file: orientation is not Sender", + )); + } + + let file = self.exhaust_stream().await?; + if file.is_some() { + Err(AccountError::msg("An unexpected error occurred: file transfer occurred, yet, returned a save path. Please report to developers")) + } else { + Ok(()) + } + } + /// When the local handle type is for a Receiver, /// the receiver must accept the transfer before /// receiving the data diff --git a/citadel_user/src/external_services/google_auth.rs b/citadel_user/src/external_services/google_auth.rs index 7eb684ba6..7b6679496 100644 --- a/citadel_user/src/external_services/google_auth.rs +++ b/citadel_user/src/external_services/google_auth.rs @@ -21,7 +21,7 @@ impl GoogleAuth { pub async fn load_from_google_services_file>( path: P, ) -> Result { - let string = tokio::fs::read_to_string(path) + let string = citadel_io::tokio::fs::read_to_string(path) .await .map_err(|err| AccountError::Generic(err.to_string()))?; let mut map: HashMap = serde_json::from_str(string.as_str()) diff --git a/citadel_user/tests/crypto.rs b/citadel_user/tests/crypto.rs index 6d975ae95..bb08bfb6a 100644 --- a/citadel_user/tests/crypto.rs +++ b/citadel_user/tests/crypto.rs @@ -8,7 +8,7 @@ mod tests { use citadel_pqcrypto::constructor_opts::ConstructorOpts; use std::collections::HashMap; - #[tokio::test] + #[citadel_io::tokio::test] async fn jwt() { citadel_logging::setup_log(); const USER: u64 = 999; @@ -17,8 +17,8 @@ mod tests { citadel_user::external_services::google_auth::GoogleAuth::load_from_google_services_file( "/Users/nologik/googlesvc.json", ) - .await - .unwrap(); + .await + .unwrap(); let jwt = auth.sign_new_custom_jwt_auth(USER).unwrap(); log::trace!(target: "citadel", "JWT: {}", jwt); diff --git a/example-library/Cargo.toml b/example-library/Cargo.toml new file mode 100644 index 000000000..b2685befd --- /dev/null +++ b/example-library/Cargo.toml @@ -0,0 +1,61 @@ +[package] +name = "citadel-examples" +description = "Example library for educational purposes" +authors = ["Thomas Braun "] +edition = "2021" +workspace = ".." +homepage = "https://avarok.net/" +repository = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol" +readme = "../README.md" +categories = ["cryptography", "network-programming", "asynchronous"] +license = "MIT OR Apache-2.0" +publish = false + +[[example]] +name = "server_echo" +path = "examples/c2s/server_echo.rs" + +[[example]] +name = "client_echo" +path = "examples/c2s/client_echo.rs" + +[[example]] +name = "server_basic" +path = "examples/c2s/server_basic.rs" + +[[example]] +name = "server_basic_with_password" +path = "examples/c2s/server_basic_with_password.rs" + +[[example]] +name = "client_basic_with_password" +path = "examples/c2s/client_basic_with_server_password.rs" + +[[example]] +name = "client_basic_transient_connection" +path = "examples/c2s/client_basic_transient_connection.rs" + +[[example]] +name = "p2p_chat" +path = "examples/p2p/chat.rs" + +[[example]] +name = "file_transfer" +path = "examples/p2p/file_transfer.rs" + +[[example]] +name = "p2p_refvs_read_write" +path = "examples/p2p/revfs_read_write.rs" + +[[example]] +name = "p2p_refvs_take" +path = "examples/p2p/revfs_take.rs" + +[[example]] +name = "p2p_refvs_delete" +path = "examples/p2p/revfs_delete.rs" + +[dependencies] +citadel_sdk = { path = "../citadel_sdk" } +tokio = { version = "1", features = ["full"] } +futures = "0.3" \ No newline at end of file diff --git a/example-library/README.md b/example-library/README.md new file mode 100644 index 000000000..4fc868c92 --- /dev/null +++ b/example-library/README.md @@ -0,0 +1,103 @@ +# Citadel Protocol Examples + +This directory contains examples demonstrating how to use the Citadel Protocol for various networking scenarios, including client-server (C2S) and peer-to-peer (P2P) communication patterns. + +## Important First Step: Running a Server + +**Before running any example, you must have a Citadel server running!** + +1. First, set the server address: +```bash +export CITADEL_SERVER_ADDR="127.0.0.1:25000" +``` + +2. Start a basic server: +```bash +cargo run --example server_basic +``` + +3. Keep this server running while you try other examples in separate terminals. + +## Prerequisites + +Additional environment variables needed for specific examples: + +```bash +# For P2P examples +export CITADEL_MY_USER="user1" # Your username +export CITADEL_OTHER_USER="user2" # Peer's username you want to connect to +``` + +## Available Examples + +### Client-Server (C2S) Examples + +1. **Basic Client Examples** + - `client_basic_transient_connection.rs`: Demonstrates temporary connections without persistent user accounts + - `client_basic_with_server_password.rs`: Shows how to connect to password-protected servers + - `client_echo.rs`: A simple echo client demonstrating basic message exchange using a credentialed, persistent account + +2. **Server Examples** + - `server_basic.rs`: A basic Citadel server implementation + - `server_basic_with_password.rs`: Server with password protection enabled + - `server_echo.rs`: Echo server implementation responding to client messages + +### Peer-to-Peer (P2P) Examples + +1. **Chat Application** + - `chat.rs`: Interactive P2P chat application using standard input/output + +2. **File Operations** + - `file_transfer.rs`: Secure file transfer between peers + +3. **Remote Encrypted Virtual Filesystem (RE-VFS)** + - `revfs_read_write.rs`: Basic read/write operations using RE-VFS + - `revfs_delete.rs`: File deletion operations + - `revfs_take.rs`: Takes a file from the RE-VFS, deleting it from the RE-VFS in the process + +## Running the Examples + +### For Client-Server Examples: + +1. Make sure you have a server running (see "Important First Step" above) + +2. Then run a client: +```bash +# Basic client +cargo run --example client_basic_transient_connection + +# Or with server password +cargo run --example client_basic_with_server_password +``` + +### For P2P Examples: + +1. Make sure you have a server running (see "Important First Step" above) + +2. For the chat example, run two instances: +```bash +# First peer +export CITADEL_MY_USER="user1" +export CITADEL_OTHER_USER="user2" +cargo run --example chat + +# Second peer (in another terminal) +export CITADEL_MY_USER="user2" +export CITADEL_OTHER_USER="user1" +cargo run --example chat +``` + +3. For file transfer: +```bash +# Sender +export CITADEL_MY_USER="sender" +export CITADEL_OTHER_USER="receiver" +cargo run --example file_transfer + +# Receiver (in another terminal) +export CITADEL_MY_USER="receiver" +export CITADEL_OTHER_USER="sender" +cargo run --example file_transfer +``` + +Each example contains detailed documentation at the top of its source file explaining specific usage and features. For more detailed information about each example, see the [examples/README.md](examples/README.md) file. diff --git a/example-library/examples/c2s/client_basic_transient_connection.rs b/example-library/examples/c2s/client_basic_transient_connection.rs new file mode 100644 index 000000000..89fa36b7e --- /dev/null +++ b/example-library/examples/c2s/client_basic_transient_connection.rs @@ -0,0 +1,75 @@ +//! # Basic Transient Connection Example +//! +//! This example demonstrates how to create a temporary, non-persistent connection +//! to a Citadel server. A transient connection exists only for the duration of +//! the session and does not maintain any state between connections. +//! +//! ## Features Demonstrated +//! - Transient (temporary) connections +//! - Basic client-server communication +//! - Session-only state management +//! - Connection cleanup handling +//! +//! ## Usage +//! ```bash +//! export CITADEL_SERVER_ADDR="127.0.0.1:25000" +//! cargo run --example client_basic_transient_connection +//! ``` +//! +//! ## How it Works +//! 1. Client establishes temporary connection to server +//! 2. No persistent account or credentials are created +//! 3. Connection remains active for the session duration +//! 4. All state is cleared when connection closes +//! +//! ## Use Cases +//! Transient connections are ideal for: +//! - Temporary or one-time connections +//! - Applications without need for persistence +//! - Testing and development +//! - Scenarios where state persistence isn't required +//! +//! ## Security Note +//! While transient connections don't maintain persistent identity, the server +//! can still enforce security through client certificates at the transport +//! layer to ensure only authorized clients can connect. + +use citadel_sdk::{ + prefabs::client::single_connection::SingleClientServerConnectionKernel, prelude::*, +}; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + println!("Connecting to server at {}", server_addr); + + // Set up session security + let session_security = SessionSecuritySettingsBuilder::default() + .with_secrecy_mode(SecrecyMode::Perfect) + .with_crypto_params(KemAlgorithm::Kyber + EncryptionAlgorithm::AES_GCM_256) + .build()?; + + // Create server connection settings. If a custom transient ID is required, use `transient_with_id` over `transient`. + let server_connection_settings = ServerConnectionSettingsBuilder::transient(server_addr) + .with_session_security_settings(session_security) + .disable_udp() + .build()?; + + // Create client kernel + let kernel = SingleClientServerConnectionKernel::new( + server_connection_settings, + |connect_success, remote| async move { + println!("Connected to server! CID: {}", connect_success.cid); + remote.shutdown_kernel().await + }, + ); + + // Build the node + let client = NodeBuilder::default().build(kernel)?; + + // Run the node + client.await?; + + Ok(()) +} diff --git a/example-library/examples/c2s/client_basic_with_server_password.rs b/example-library/examples/c2s/client_basic_with_server_password.rs new file mode 100644 index 000000000..ab7154fcf --- /dev/null +++ b/example-library/examples/c2s/client_basic_with_server_password.rs @@ -0,0 +1,85 @@ +//! # Password-Protected Server Connection Example +//! +//! This example demonstrates how to connect to a Citadel server that requires +//! a password for authentication. It shows proper password handling and secure +//! connection establishment. +//! +//! ## Features Demonstrated +//! - Server password authentication +//! - Perfect Forward Secrecy mode +//! - Secure password handling +//! - Connection establishment with protected server +//! - Error handling for authentication failures +//! +//! ## Prerequisites +//! - A running Citadel server with password protection enabled +//! (use `server_basic_with_password.rs`) +//! +//! ## Usage +//! ```bash +//! export CITADEL_SERVER_ADDR="127.0.0.1:25000" +//! export CITADEL_SERVER_PASSWORD="your_secure_password" +//! cargo run --example client_basic_with_server_password +//! ``` +//! +//! ## Security Notes +//! - Never hardcode passwords in production code +//! - Avoid storing passwords in environment variables in production +//! - Use secure password management systems for production deployments +//! - The server password is different from user account credentials +//! +//! ## How it Works +//! 1. Client reads server password from environment +//! 2. Establishes encrypted connection to server +//! 3. Performs password-based authentication +//! 4. Maintains secure session after authentication + +use citadel_sdk::{ + prefabs::client::single_connection::SingleClientServerConnectionKernel, prelude::*, +}; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + // Security note: Do not hardcode password in production into the environment. It exposes your application + // to local attacks resultant from rogue processes or users that scan the env for secrets. + let connect_password = + env::var("CITADEL_SERVER_PASSWORD").expect("CITADEL_SERVER_PASSWORD not set"); + println!("Connecting to server at {}", server_addr); + + // Set up session security + let session_security = SessionSecuritySettingsBuilder::default() + .with_secrecy_mode(SecrecyMode::Perfect) + .with_crypto_params(KemAlgorithm::Kyber + EncryptionAlgorithm::AES_GCM_256) + .build()?; + + // Create server connection settings + let server_connection_settings = ServerConnectionSettingsBuilder::credentialed_registration( + server_addr, + "my_username", + "My Name", + "notsecurepassword", + ) + .with_session_security_settings(session_security) + .with_session_password(connect_password) + .disable_udp() + .build()?; + + // Create client kernel + let kernel = SingleClientServerConnectionKernel::new( + server_connection_settings, + |connect_success, remote| async move { + println!("Connected to server! CID: {}", connect_success.cid); + remote.shutdown_kernel().await + }, + ); + + // Build the node + let client = NodeBuilder::default().build(kernel)?; + + // Run the node + client.await?; + + Ok(()) +} diff --git a/example-library/examples/c2s/client_echo.rs b/example-library/examples/c2s/client_echo.rs new file mode 100644 index 000000000..d4ca2d5e0 --- /dev/null +++ b/example-library/examples/c2s/client_echo.rs @@ -0,0 +1,96 @@ +//! # Echo Client Example with Persistent Account +//! +//! This example demonstrates how to create a client that establishes a persistent, +//! credentialed connection with a Citadel server and exchanges messages. Unlike the +//! transient connection example, this client creates a persistent account that can +//! be reused across sessions. +//! +//! ## Features Demonstrated +//! - Credentialed registration with persistent account +//! - Perfect Forward Secrecy mode for enhanced security +//! - Bidirectional message exchange with server +//! - Channel-based communication +//! - Error handling and connection management +//! +//! ## Prerequisites +//! - A running Citadel server (use `server_echo.rs` for full functionality) +//! - Server must be configured to accept credentialed connections +//! +//! ## Usage +//! ```bash +//! export CITADEL_SERVER_ADDR="127.0.0.1:25000" +//! cargo run --example client_echo +//! ``` +//! +//! ## How it Works +//! 1. Establishes a credentialed connection with username/password +//! 2. Creates a secure channel with the server +//! 3. Sends a message and waits for the server's echo response +//! 4. Demonstrates proper connection cleanup on exit + +use citadel_sdk::{ + prefabs::client::single_connection::SingleClientServerConnectionKernel, prelude::*, +}; +use futures::StreamExt; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + println!("Connecting to server at {}", server_addr); + + // Set up session security + let session_security = SessionSecuritySettingsBuilder::default() + .with_secrecy_mode(SecrecyMode::Perfect) + .with_crypto_params(KemAlgorithm::Kyber + EncryptionAlgorithm::AES_GCM_256) + .build()?; + + // Create server connection settings + let server_connection_settings = ServerConnectionSettingsBuilder::credentialed_registration( + server_addr, + "my_username", + "My Name", + "notsecurepassword", + ) + .with_session_security_settings(session_security) + .disable_udp() + .build()?; + + // Create client kernel + let kernel = SingleClientServerConnectionKernel::new( + server_connection_settings, + |connect_success, remote| async move { + println!("Connected to server! CID: {}", connect_success.cid); + let (tx, mut rx) = connect_success.channel.split(); + + let message = "Hello from client!"; + // Send initial message + let msg = SecBuffer::from(message); + if let Err(e) = tx.send_message(msg.into()).await { + println!("Error sending message: {}", e); + return Err(e); + } + + // Receive messages using Stream trait + if let Some(echo) = rx.next().await { + let response = String::from_utf8(echo.as_ref().to_vec()) + .expect("Failed to convert message to string"); + println!("Received echo from server: {response}",); + assert_eq!(&response, message); + } else { + println!("No message received from server"); + return Err(NetworkError::msg("No message received from server")); + } + + remote.shutdown_kernel().await + }, + ); + + // Build the node + let client = NodeBuilder::default().build(kernel)?; + + // Run the node + client.await?; + + Ok(()) +} diff --git a/example-library/examples/c2s/server_basic.rs b/example-library/examples/c2s/server_basic.rs new file mode 100644 index 000000000..14b751dd4 --- /dev/null +++ b/example-library/examples/c2s/server_basic.rs @@ -0,0 +1,49 @@ +//! # Basic Citadel Server Example +//! +//! This example demonstrates how to create a basic Citadel server that facilitates +//! peer-to-peer and group connections. This is the simplest form of a Citadel server, +//! which acts primarily as a connection broker. +//! +//! ## Features Demonstrated +//! - Basic server setup +//! - Connection brokering for P2P and group connections +//! - Server configuration using environment variables +//! - Use of the EmptyKernel for simple connection handling +//! +//! ## Usage +//! ```bash +//! export CITADEL_SERVER_ADDR="127.0.0.1:25000" +//! cargo run --example server_basic +//! ``` +//! +//! ## Note +//! This server only facilitates connections between peers. It does not handle direct +//! client-server communication. For bidirectional client-server communication, +//! see the `server_echo.rs` example which uses a `ClientConnectListenerKernel`. + +use citadel_sdk::prefabs::server; +use citadel_sdk::prelude::*; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + println!("Starting server on {}", server_addr); + + // This is a basic server. It will only help facilitate p2p and group connections. + // Clients will not be able to communicate directly with this server through channels. + // If post-connection client-server bidirectional communication is needed, use a + // `ClientConnectListenerKernel` instead which runs a closure each time a new connection is + // established with a client + let kernel = server::empty::EmptyKernel; + + // Build the server + let node = NodeBuilder::default() + .with_node_type(NodeType::server(server_addr)?) + .build(kernel)?; + + // Run the server + node.await?; + + Ok(()) +} diff --git a/example-library/examples/c2s/server_basic_with_password.rs b/example-library/examples/c2s/server_basic_with_password.rs new file mode 100644 index 000000000..42cca905c --- /dev/null +++ b/example-library/examples/c2s/server_basic_with_password.rs @@ -0,0 +1,70 @@ +//! # Password-Protected Citadel Server Example +//! +//! This example demonstrates how to create a Citadel server that requires password +//! authentication from clients. It provides an additional layer of security by +//! requiring clients to provide a valid password before establishing a connection. +//! +//! ## Features Demonstrated +//! - Server password protection +//! - Basic connection brokering +//! - Server configuration with environment variables +//! - Secure password handling +//! - Use of EmptyKernel for connection handling +//! +//! ## Usage +//! ```bash +//! export CITADEL_SERVER_ADDR="127.0.0.1:25000" +//! export CITADEL_SERVER_PASSWORD="your_secure_password" +//! cargo run --example server_basic_with_password +//! ``` +//! +//! ## Security Notes +//! - Never hardcode passwords in production code +//! - Avoid storing passwords in environment variables in production +//! - Use secure password management systems for production deployments +//! - Server password protects initial connection, different from user accounts +//! +//! ## How it Works +//! 1. Server starts with password protection enabled +//! 2. Clients must provide correct password to connect +//! 3. After authentication, server acts as connection broker +//! 4. Facilitates P2P and group connections between authenticated clients +//! +//! ## Note +//! This server only facilitates connections between peers. It does not handle +//! direct client-server communication. For bidirectional client-server +//! communication, use a `ClientConnectListenerKernel` instead. + +use citadel_sdk::prefabs::server; +use citadel_sdk::prelude::*; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + // Security note: Do not hardcode password in production into the environment. It exposes your application + // to local attacks resultant from rogue processes or users that scan the env for secrets. + let connect_password = + env::var("CITADEL_SERVER_PASSWORD").expect("CITADEL_SERVER_PASSWORD not set"); + println!("Starting server on {}", server_addr); + + // This is a basic server. It will only help facilitate p2p and group connections. + // Clients will not be able to communicate directly with this server through channels. + // If post-connection client-server bidirectional communication is needed, use a + // `ClientConnectListenerKernel` instead which runs each time a new connection is + // established with a client + let kernel = server::empty::EmptyKernel; + + // Build the server. It is password-protected, meaning that each time + // a client attempts to register or connect, they must provide the password. + // This "password" is effectively a pre-shared key (PSK) + let node = NodeBuilder::default() + .with_node_type(NodeType::server(server_addr)?) + .with_server_password(connect_password) + .build(kernel)?; + + // Run the server + node.await?; + + Ok(()) +} diff --git a/example-library/examples/c2s/server_echo.rs b/example-library/examples/c2s/server_echo.rs new file mode 100644 index 000000000..ab3f069f8 --- /dev/null +++ b/example-library/examples/c2s/server_echo.rs @@ -0,0 +1,81 @@ +//! # Echo Server Example +//! +//! This example demonstrates how to create a Citadel server that actively +//! communicates with clients. Unlike the basic server examples, this server +//! uses a `ClientConnectListenerKernel` to handle bidirectional communication +//! with connected clients. +//! +//! ## Features Demonstrated +//! - Active client-server communication +//! - ClientConnectListenerKernel usage +//! - Channel-based message handling +//! - Connection event handling +//! - Asynchronous message processing +//! +//! ## Usage +//! ```bash +//! export CITADEL_SERVER_ADDR="127.0.0.1:25000" +//! cargo run --example server_echo +//! ``` +//! +//! Then run the corresponding client: +//! ```bash +//! cargo run --example client_echo +//! ``` +//! +//! ## How it Works +//! 1. Server starts and listens for connections +//! 2. For each new client connection: +//! - Creates a new message channel +//! - Listens for incoming messages +//! - Echoes received messages back to the client +//! 3. Handles multiple clients concurrently +//! +//! ## Note +//! This server demonstrates active client-server communication using +//! the `ClientConnectListenerKernel`. It processes each message and +//! sends it back to the client, making it ideal for testing client +//! connectivity and message handling. + +use citadel_sdk::{ + prefabs::server::client_connect_listener::ClientConnectListenerKernel, prelude::*, +}; +use futures::StreamExt; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + println!("Starting server on {}", server_addr); + + // Set up the server kernel. The provided closure will be called every time a new client connects + let kernel = ClientConnectListenerKernel::new(move |conn_success, _remote| async move { + println!("New client connected! CID: {}", conn_success.cid); + + let (tx, mut rx) = conn_success.channel.split(); + while let Some(msg) = rx.next().await { + println!( + "Received message from client {}: {}", + conn_success.cid, + String::from_utf8_lossy(msg.as_ref()) + ); + + // Echo the message back + if let Err(e) = tx.send_message(msg.into()).await { + println!("Error sending response: {}", e); + } + } + + Ok(()) + }); + + // Build the server + let node = NodeBuilder::default() + .with_node_type(NodeType::server(server_addr)?) + .build(kernel)?; + + // Run the server + node.await?; + + Ok(()) +} diff --git a/example-library/examples/p2p/chat.rs b/example-library/examples/p2p/chat.rs new file mode 100644 index 000000000..dd2b119aa --- /dev/null +++ b/example-library/examples/p2p/chat.rs @@ -0,0 +1,127 @@ +//! # P2P Chat Example +//! +//! This example demonstrates how to implement a peer-to-peer chat application using the Citadel Protocol. +//! It shows how to: +//! - Set up secure P2P connections between two peers +//! - Handle real-time message exchange +//! - Use the Perfect Forward Secrecy mode for enhanced security +//! - Implement interactive input/output for chat functionality +//! +//! ## Usage +//! +//! Run two instances with different user identities: +//! ```bash +//! # First peer +//! export CITADEL_MY_USER="user1" +//! export CITADEL_OTHER_USER="user2" +//! cargo run --example chat +//! +//! # Second peer (in another terminal) +//! export CITADEL_MY_USER="user2" +//! export CITADEL_OTHER_USER="user1" +//! cargo run --example chat +//! ``` +//! +//! ## Features Demonstrated +//! - Secure P2P connection establishment +//! - Perfect Forward Secrecy mode +//! - Async message handling +//! - Interactive terminal I/O +//! - Error handling and connection management + +use citadel_sdk::{ + prefabs::client::peer_connection::{PeerConnectionKernel, PeerConnectionSetupAggregator}, + prelude::*, +}; +use futures::StreamExt; +use std::env; +use tokio::io::AsyncBufReadExt; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + let my_user = env::var("CITADEL_MY_USER").expect("MY_USER not set"); + let other_user = env::var("CITADEL_OTHER_USER").expect("OTHER_USER not set"); + + println!("Starting P2P chat as peer {my_user}"); + println!("Will connect to peer {other_user}"); + + // Set up session security + let session_security = SessionSecuritySettingsBuilder::default() + .with_secrecy_mode(SecrecyMode::Perfect) + .with_crypto_params(KemAlgorithm::Kyber + EncryptionAlgorithm::AES_GCM_256) + .build()?; + + // Create server connection settings + let server_connection_settings = ServerConnectionSettingsBuilder::credentialed_registration( + server_addr, + my_user, + "Name", + "notsecurepassword", + ) + .with_session_security_settings(session_security) + .disable_udp() + .build()?; + + // Create peer connection setup + let peer_connection = PeerConnectionSetupAggregator::default() + .with_peer_custom(other_user) + .with_session_security_settings(session_security) + .enable_udp() + .add(); + + // Set up the peer connection kernel + let kernel = PeerConnectionKernel::new( + server_connection_settings, + peer_connection, + move |mut connection, remote| async move { + println!("Connected to server successfully!"); + + // Wait for peer connection + let peer_conn = connection.recv().await.unwrap()?; + println!( + "Connected to peer {:?}!", + peer_conn.remote.target_username() + ); + + // Set up message handling + let (tx, mut message_stream) = peer_conn.channel.split(); + let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()).lines(); + + println!("Type your messages (press Enter to send, Ctrl+C to quit):"); + + loop { + tokio::select! { + msg = message_stream.next() => { + if let Some(msg) = msg { + println!("\rReceived: {}", String::from_utf8_lossy(msg.as_ref())); + } + } + line = stdin.next_line() => { + match line { + Ok(Some(msg)) if !msg.is_empty() => { + tx.send_message(msg.into_bytes().into()).await?; + } + Ok(None) => break, // EOF + _ => continue, + } + } + _ = tokio::signal::ctrl_c() => { + println!("\nReceived Ctrl+C, shutting down..."); + break; + } + } + } + + remote.shutdown_kernel().await + }, + ); + + // Build the peer + let node = NodeBuilder::default().build(kernel)?; + + // Run the peer + node.await?; + + Ok(()) +} diff --git a/example-library/examples/p2p/file_transfer.rs b/example-library/examples/p2p/file_transfer.rs new file mode 100644 index 000000000..70c82cf85 --- /dev/null +++ b/example-library/examples/p2p/file_transfer.rs @@ -0,0 +1,113 @@ +//! # P2P File Transfer Example +//! +//! This example demonstrates how to set up a peer connection between two peers and transfer files +//! directly between them. This example uses direct file transfer rather than the RE-VFS system. +//! +//! ## Features Demonstrated +//! - Direct P2P file transfer +//! - Binary data streaming +//! - Progress tracking +//! - Error handling for file operations +//! +//! ## Usage +//! +//! Run two instances - one sender and one receiver: +//! ```bash +//! # Sender +//! export CITADEL_MY_USER="sender" +//! export CITADEL_OTHER_USER="receiver" +//! export IS_SENDER="true" +//! cargo run --example file_transfer +//! +//! # Receiver (in another terminal) +//! export CITADEL_MY_USER="receiver" +//! export CITADEL_OTHER_USER="sender" +//! cargo run --example file_transfer +//! ``` +//! +//! ## Note +//! This example demonstrates basic file transfer. For more advanced file operations, +//! see the RE-VFS examples which provide a virtual filesystem interface. + +use citadel_sdk::{ + prefabs::client::peer_connection::{PeerConnectionKernel, PeerConnectionSetupAggregator}, + prelude::*, +}; +use std::env; +use std::path::PathBuf; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + let my_user = env::var("CITADEL_MY_USER").expect("MY_USER not set"); + let other_user = env::var("CITADEL_OTHER_USER").expect("OTHER_USER not set"); + let is_sender = env::var("IS_SENDER").unwrap_or_default().to_lowercase() == "true"; + + println!("Starting file transfer as peer {my_user}"); + println!("Will connect to peer {other_user}"); + + // Set up session security + let session_security = SessionSecuritySettingsBuilder::default() + .with_secrecy_mode(SecrecyMode::Perfect) + .with_crypto_params(KemAlgorithm::Kyber + EncryptionAlgorithm::AES_GCM_256) + .build()?; + + // Create server connection settings + let server_connection_settings = ServerConnectionSettingsBuilder::credentialed_registration( + server_addr, + my_user, + "Name", + "notsecurepassword", + ) + .with_session_security_settings(session_security) + .disable_udp() + .build()?; + + // Create peer connection setup + let peer_connection = PeerConnectionSetupAggregator::default() + .with_peer_custom(other_user) + .with_session_security_settings(session_security) + .enable_udp() + .add(); + + let file_path = PathBuf::from("test_file.txt"); + if !file_path.exists() { + tokio::fs::write(&file_path, "Hello, this is a test file!").await?; + } + + // Set up the peer connection kernel + let kernel = PeerConnectionKernel::new( + server_connection_settings, + peer_connection, + move |mut connection, remote| async move { + println!("Connected to server successfully!"); + + // Wait for peer connection + let peer_conn = connection.recv().await.unwrap()?; + let peer_remote = peer_conn.remote; + println!("Connected to peer {:?}!", peer_remote.target_username()); + if is_sender { + peer_remote.send_file(file_path).await?; + } else { + let mut incoming_file_requests = + remote.get_incoming_file_transfer_handle().unwrap(); + let mut file_handle = incoming_file_requests.recv().await.unwrap(); + let downloaded_file = file_handle.receive_file().await?; + // Compare the contents in "file_path" and "downloaded_file" + let file_contents = tokio::fs::read_to_string(file_path).await?; + let downloaded_file_contents = tokio::fs::read_to_string(downloaded_file).await?; + assert_eq!(file_contents, downloaded_file_contents); + } + + remote.shutdown_kernel().await + }, + ); + + // Build the peer + let node = NodeBuilder::default().build(kernel)?; + + // Run the peer + node.await?; + + Ok(()) +} diff --git a/example-library/examples/p2p/revfs_delete.rs b/example-library/examples/p2p/revfs_delete.rs new file mode 100644 index 000000000..925a5c3d0 --- /dev/null +++ b/example-library/examples/p2p/revfs_delete.rs @@ -0,0 +1,131 @@ +//! # RE-VFS Delete Operation Example +//! +//! This example demonstrates how to use the Remote Encrypted Virtual Filesystem (RE-VFS) +//! to store files and then delete them. It shows the proper way to manage file cleanup +//! in the RE-VFS system. +//! +//! ## Features Demonstrated +//! - RE-VFS file storage +//! - Secure file deletion +//! - Permission handling for delete operations +//! - P2P connection establishment +//! - Error handling for file operations +//! +//! ## Usage +//! Run two instances - one sender and one with delete permissions: +//! ```bash +//! # Sender (stores file in RE-VFS) +//! export CITADEL_MY_USER="sender" +//! export CITADEL_OTHER_USER="manager" +//! export IS_SENDER="true" +//! cargo run --example revfs_delete +//! +//! # Manager (deletes file from RE-VFS) +//! export CITADEL_MY_USER="manager" +//! export CITADEL_OTHER_USER="sender" +//! cargo run --example revfs_delete +//! ``` +//! +//! ## How it Works +//! 1. Sender stores a file in the RE-VFS +//! 2. Manager connects with appropriate permissions +//! 3. Manager issues delete command for the file +//! 4. File is permanently removed from RE-VFS +//! +//! ## Note +//! Delete operations are permanent and cannot be undone. Ensure proper +//! permissions and verification before deleting files from the RE-VFS. +//! Unlike the `revfs_take` example, this operation doesn't retrieve the +//! file before deletion. + +use citadel_sdk::{ + prefabs::client::peer_connection::{PeerConnectionKernel, PeerConnectionSetupAggregator}, + prelude::*, +}; +use futures::StreamExt; +use std::env; +use std::path::PathBuf; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + let my_user = env::var("CITADEL_MY_USER").expect("MY_USER not set"); + let other_user = env::var("CITADEL_OTHER_USER").expect("OTHER_USER not set"); + let is_sender = env::var("IS_SENDER").unwrap_or_default().to_lowercase() == "true"; + + println!("Starting file transfer as peer {my_user}"); + println!("Will connect to peer {other_user}"); + + // Set up session security + let session_security = SessionSecuritySettingsBuilder::default() + .with_secrecy_mode(SecrecyMode::Perfect) + .with_crypto_params(KemAlgorithm::Kyber + EncryptionAlgorithm::AES_GCM_256) + .build()?; + + // Create server connection settings + let server_connection_settings = ServerConnectionSettingsBuilder::credentialed_registration( + server_addr, + my_user, + "Name", + "notsecurepassword", + ) + .with_session_security_settings(session_security) + .disable_udp() + .build()?; + + // Create peer connection setup + let peer_connection = PeerConnectionSetupAggregator::default() + .with_peer_custom(other_user) + .with_session_security_settings(session_security) + .enable_udp() + .add(); + + let file_path = PathBuf::from("test_file.txt"); + if !file_path.exists() { + tokio::fs::write(&file_path, "Hello, this is a test file!").await?; + } + + // Set up the peer connection kernel + let kernel = PeerConnectionKernel::new( + server_connection_settings, + peer_connection, + move |mut connection, remote| async move { + println!("Connected to server successfully!"); + + // Wait for peer connection + let peer_conn = connection.recv().await.unwrap()?; + let (tx, mut rx) = peer_conn.channel.split(); + let peer_remote = peer_conn.remote; + println!("Connected to peer {:?}!", peer_remote.target_username()); + + let virtual_file_path = "/home/foo/bar/test_file.txt"; + + if is_sender { + // Securely store the file on the remote peer. The peer cannot read the file contents + citadel_sdk::fs::write(&peer_remote, file_path.clone(), virtual_file_path).await?; + // Now, delete the contents of the file from the remote peer + citadel_sdk::fs::delete(&peer_remote, virtual_file_path).await?; + // Alert the other side that the file has been successfully processed + tx.send_message(SecBuffer::from("success").into()).await?; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } else { + let incoming_file_requests = remote.get_incoming_file_transfer_handle().unwrap(); + // There will be a single file transfer handles sent to us: one for the file the peer sends to us + incoming_file_requests.accept_all(); + // Patiently wait for the "success" message from the sender + let msg = rx.next().await.unwrap(); + assert_eq!(msg.as_ref(), b"success"); + } + + remote.shutdown_kernel().await + }, + ); + + // Build the peer + let node = NodeBuilder::default().build(kernel)?; + + // Run the peer + node.await?; + + Ok(()) +} diff --git a/example-library/examples/p2p/revfs_read_write.rs b/example-library/examples/p2p/revfs_read_write.rs new file mode 100644 index 000000000..67d6380a5 --- /dev/null +++ b/example-library/examples/p2p/revfs_read_write.rs @@ -0,0 +1,136 @@ +//! # RE-VFS Read/Write Example +//! +//! This example demonstrates how to use the Remote Encrypted Virtual Filesystem (RE-VFS) +//! for basic file operations between peers. It shows how to store files in the RE-VFS +//! and retrieve them later, with all data being encrypted end-to-end. +//! +//! ## Features Demonstrated +//! - RE-VFS initialization and setup +//! - Secure file storage operations +//! - File retrieval from RE-VFS +//! - End-to-end encryption +//! - Error handling for file operations +//! +//! ## Usage +//! Run two instances - one sender and one receiver: +//! ```bash +//! # Sender (writes file to RE-VFS) +//! export CITADEL_MY_USER="sender" +//! export CITADEL_OTHER_USER="receiver" +//! export IS_SENDER="true" +//! cargo run --example revfs_read_write +//! +//! # Receiver (reads file from RE-VFS) +//! export CITADEL_MY_USER="receiver" +//! export CITADEL_OTHER_USER="sender" +//! cargo run --example revfs_read_write +//! ``` +//! +//! ## How it Works +//! 1. Sender stores a file in the RE-VFS using write operations +//! 2. File is encrypted and stored in the virtual filesystem +//! 3. Receiver connects and reads the file from RE-VFS +//! 4. File remains in RE-VFS for future access +//! +//! ## Note +//! Unlike the `revfs_take` example, reading a file does not remove it from +//! the RE-VFS. The file remains available for multiple reads by authorized peers. + +use citadel_sdk::{ + prefabs::client::peer_connection::{PeerConnectionKernel, PeerConnectionSetupAggregator}, + prelude::*, +}; +use futures::StreamExt; +use std::env; +use std::path::PathBuf; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + let my_user = env::var("CITADEL_MY_USER").expect("MY_USER not set"); + let other_user = env::var("CITADEL_OTHER_USER").expect("OTHER_USER not set"); + let is_sender = env::var("IS_SENDER").unwrap_or_default().to_lowercase() == "true"; + + println!("Starting file transfer as peer {my_user}"); + println!("Will connect to peer {other_user}"); + + // Set up session security + let session_security = SessionSecuritySettingsBuilder::default() + .with_secrecy_mode(SecrecyMode::Perfect) + .with_crypto_params(KemAlgorithm::Kyber + EncryptionAlgorithm::AES_GCM_256) + .build()?; + + // Create server connection settings + let server_connection_settings = ServerConnectionSettingsBuilder::credentialed_registration( + server_addr, + my_user, + "Name", + "notsecurepassword", + ) + .with_session_security_settings(session_security) + .disable_udp() + .build()?; + + // Create peer connection setup + let peer_connection = PeerConnectionSetupAggregator::default() + .with_peer_custom(other_user) + .with_session_security_settings(session_security) + .enable_udp() + .add(); + + let file_path = PathBuf::from("test_file.txt"); + if !file_path.exists() { + tokio::fs::write(&file_path, "Hello, this is a test file!").await?; + } + + // Set up the peer connection kernel + let kernel = PeerConnectionKernel::new( + server_connection_settings, + peer_connection, + move |mut connection, remote| async move { + println!("Connected to server successfully!"); + + // Wait for peer connection + let peer_conn = connection.recv().await.unwrap()?; + let (tx, mut rx) = peer_conn.channel.split(); + let peer_remote = peer_conn.remote; + println!("Connected to peer {:?}!", peer_remote.target_username()); + + let virtual_file_path = "/home/foo/bar/test_file.txt"; + + if is_sender { + // Securely store the file on the remote peer. The peer cannot read the file contents + citadel_sdk::fs::write(&peer_remote, file_path.clone(), virtual_file_path).await?; + // Now, download the contents of the file from the remote peer + let locally_downloaded_file = + citadel_sdk::fs::read(&peer_remote, virtual_file_path).await?; + // Compare the contents in "file_path" and "locally_downloaded_file" + let file_contents = tokio::fs::read_to_string(&file_path).await?; + let downloaded_file_contents = + tokio::fs::read_to_string(&locally_downloaded_file).await?; + assert_eq!(file_contents, downloaded_file_contents); + // Alert the other side that the file has been successfully stored + tx.send_message(SecBuffer::from("success").into()).await?; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } else { + let incoming_file_requests = remote.get_incoming_file_transfer_handle().unwrap(); + // There will be two file transfer handles sent to us: one for the file the peer sends to us, + // and another for when the peer requests a file from us. We will accept both. + incoming_file_requests.accept_all(); + // Patiently wait for the "success" message from the sender + let msg = rx.next().await.unwrap(); + assert_eq!(msg.as_ref(), b"success"); + } + + remote.shutdown_kernel().await + }, + ); + + // Build the peer + let node = NodeBuilder::default().build(kernel)?; + + // Run the peer + node.await?; + + Ok(()) +} diff --git a/example-library/examples/p2p/revfs_take.rs b/example-library/examples/p2p/revfs_take.rs new file mode 100644 index 000000000..cfa94a931 --- /dev/null +++ b/example-library/examples/p2p/revfs_take.rs @@ -0,0 +1,140 @@ +//! # RE-VFS Take Operation Example +//! +//! This example demonstrates how to use the Remote Encrypted Virtual Filesystem (RE-VFS) +//! to store and then "take" a file. The take operation removes the file from the RE-VFS +//! while retrieving it, effectively performing an atomic move operation. +//! +//! ## Features Demonstrated +//! - RE-VFS file storage +//! - Atomic take operation (retrieve and delete) +//! - P2P connection establishment +//! - Secure file transfer +//! - Error handling for file operations +//! +//! ## Usage +//! Run two instances - one sender and one receiver: +//! ```bash +//! # Sender (stores file in RE-VFS) +//! export CITADEL_MY_USER="sender" +//! export CITADEL_OTHER_USER="receiver" +//! export IS_SENDER="true" +//! cargo run --example revfs_take +//! +//! # Receiver (takes file from RE-VFS) +//! export CITADEL_MY_USER="receiver" +//! export CITADEL_OTHER_USER="sender" +//! cargo run --example revfs_take +//! ``` +//! +//! ## How it Works +//! 1. Sender stores a file in the RE-VFS +//! 2. Receiver connects and performs a take operation +//! 3. File is atomically moved from RE-VFS to receiver's local storage +//! 4. Original file in RE-VFS is automatically deleted +//! +//! ## Note +//! The take operation is useful when you want to ensure that a file can only be +//! retrieved once, or when you want to automatically clean up the RE-VFS after +//! file retrieval. + +//! This example demonstrates how to set up a peer connection between two peers and use REVFS to store then take a file + +use citadel_sdk::{ + prefabs::client::peer_connection::{PeerConnectionKernel, PeerConnectionSetupAggregator}, + prelude::*, +}; +use futures::StreamExt; +use std::env; +use std::path::PathBuf; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_addr = env::var("CITADEL_SERVER_ADDR").expect("CITADEL_SERVER_ADDR not set"); + let my_user = env::var("CITADEL_MY_USER").expect("MY_USER not set"); + let other_user = env::var("CITADEL_OTHER_USER").expect("OTHER_USER not set"); + let is_sender = env::var("IS_SENDER").unwrap_or_default().to_lowercase() == "true"; + + println!("Starting file transfer as peer {my_user}"); + println!("Will connect to peer {other_user}"); + + // Set up session security + let session_security = SessionSecuritySettingsBuilder::default() + .with_secrecy_mode(SecrecyMode::Perfect) + .with_crypto_params(KemAlgorithm::Kyber + EncryptionAlgorithm::AES_GCM_256) + .build()?; + + // Create server connection settings + let server_connection_settings = ServerConnectionSettingsBuilder::credentialed_registration( + server_addr, + my_user, + "Name", + "notsecurepassword", + ) + .with_session_security_settings(session_security) + .disable_udp() + .build()?; + + // Create peer connection setup + let peer_connection = PeerConnectionSetupAggregator::default() + .with_peer_custom(other_user) + .with_session_security_settings(session_security) + .enable_udp() + .add(); + + let file_path = PathBuf::from("test_file.txt"); + if !file_path.exists() { + tokio::fs::write(&file_path, "Hello, this is a test file!").await?; + } + + // Set up the peer connection kernel + let kernel = PeerConnectionKernel::new( + server_connection_settings, + peer_connection, + move |mut connection, remote| async move { + println!("Connected to server successfully!"); + + // Wait for peer connection + let peer_conn = connection.recv().await.unwrap()?; + let (tx, mut rx) = peer_conn.channel.split(); + let peer_remote = peer_conn.remote; + println!("Connected to peer {:?}!", peer_remote.target_username()); + + let virtual_file_path = "/home/foo/bar/test_file.txt"; + + if is_sender { + // Securely store the file on the remote peer. The peer cannot read the file contents + citadel_sdk::fs::write(&peer_remote, file_path.clone(), virtual_file_path).await?; + // Now, download the contents of the file from the remote peer, deleting the remote + // contents at the same time by taking it + let locally_downloaded_file = + citadel_sdk::fs::take(&peer_remote, virtual_file_path).await?; + // Compare the contents in "file_path" and "locally_downloaded_file" + let file_contents = tokio::fs::read_to_string(&file_path).await?; + let downloaded_file_contents = + tokio::fs::read_to_string(&locally_downloaded_file).await?; + assert_eq!(file_contents, downloaded_file_contents); + // Alert the other side that the file has been successfully processed + tx.send_message(SecBuffer::from("success").into()).await?; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } else { + let incoming_file_requests = remote.get_incoming_file_transfer_handle().unwrap(); + // There will be two file transfer handles sent to us: one for the file the peer sends to us, + // and another for when the peer requests a file from us. We will accept both. + incoming_file_requests.accept_all(); + // Patiently wait for the "success" message from the sender + let msg = rx.next().await.unwrap(); + assert_eq!(msg.as_ref(), b"success"); + } + + remote.shutdown_kernel().await + }, + ); + + // Build the peer + let node = NodeBuilder::default().build(kernel)?; + + // Run the peer + node.await?; + + Ok(()) +} diff --git a/keys/testing.p12 b/keys/testing.p12 deleted file mode 100644 index 18e6d7566..000000000 Binary files a/keys/testing.p12 and /dev/null differ diff --git a/mac_m1_README.txt b/mac_m1_README.txt index d9ff5a646..323bfdf56 100644 --- a/mac_m1_README.txt +++ b/mac_m1_README.txt @@ -1,4 +1,4 @@ -Clang 13 is required to compile oqs + liboqs +Clang 13+ is required to compile oqs + liboqs Install [rust](https://www.rust-lang.org/tools/install). diff --git a/tutorials/argon2id-params.txt b/notes/argon2id-params.txt similarity index 100% rename from tutorials/argon2id-params.txt rename to notes/argon2id-params.txt diff --git a/tutorials/lets-encrypt-to-pkcs12.txt b/notes/lets-encrypt-to-pkcs12.txt similarity index 100% rename from tutorials/lets-encrypt-to-pkcs12.txt rename to notes/lets-encrypt-to-pkcs12.txt