Skip to content

Commit

Permalink
fix: race conditions in simultaneous connect, NAT ID, Hole Punching, …
Browse files Browse the repository at this point in the history
…NetMutex. Fix FileKey Collisions (#221)
  • Loading branch information
tbraun96 authored Nov 24, 2024
1 parent f510cb7 commit 10c517b
Show file tree
Hide file tree
Showing 90 changed files with 1,340 additions and 1,056 deletions.
6 changes: 4 additions & 2 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ retries = { backoff = "exponential", count = 2, delay = "5s" }
# the string "num-cpus". Can be overridden through the `--test-threads` option.
test-threads = 1

slow-timeout = { period = "80s", terminate-after = 3, grace-period = "0s" }

# Show these test statuses in the output.
#
# The possible values this can take are:
Expand All @@ -25,10 +27,10 @@ test-threads = 1
# failed and retried tests.
#
# Can be overridden through the `--status-level` flag.
status-level = "pass"
status-level = "all"

# Similar to status-level, show these test statuses at the end of the run.
final-status-level = "none"
final-status-level = "all"

# "failure-output" defines when standard output and standard error for failing tests are produced.
# Accepted values are
Expand Down
16 changes: 10 additions & 6 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ env:
# 40 MiB stack
RUST_MIN_STACK: 40971520
RUST_LOG: "citadel=warn"
IN_CI: "true"

jobs:
core_libs:
Expand Down Expand Up @@ -95,14 +96,17 @@ jobs:
- uses: Avarok-Cybersecurity/gh-actions-deps@master
- uses: taiki-e/install-action@nextest
- name: Single-threaded testing
run: cargo nextest run --package citadel_sdk --features=localhost-testing,localhost-testing-loopback-only
run: cargo nextest run --package citadel_sdk --features=localhost-testing
if: ${{ !startsWith(matrix.os, 'windows') }}
- name: Single-threaded testing (windows only)
run: cargo nextest run --package citadel_sdk --features=localhost-testing,localhost-testing-loopback-only,vendored
run: cargo nextest run --package citadel_sdk --features=localhost-testing,vendored
if: startsWith(matrix.os, 'windows')
- name: Multi-threaded testing (windows only)
run: cargo nextest run --package citadel_sdk --features=multi-threaded,localhost-testing,vendored
if: startsWith(matrix.os, 'windows')
- name: Multi-threaded testing
if: startsWith(matrix.os, 'ubuntu')
run: cargo nextest run --package citadel_sdk --features=multi-threaded,localhost-testing,localhost-testing-loopback-only
if: ${{ !startsWith(matrix.os, 'windows') }}
run: cargo nextest run --package citadel_sdk --features=multi-threaded,localhost-testing

citadel_sdk_release:
strategy:
Expand All @@ -114,9 +118,9 @@ jobs:
- uses: Avarok-Cybersecurity/gh-actions-deps@master
- uses: taiki-e/install-action@nextest
- name: Single-threaded testing
run: cargo nextest run --package citadel_sdk --features=localhost-testing,localhost-testing-loopback-only --release
run: cargo nextest run --package citadel_sdk --features=localhost-testing --release
- name: Multi-threaded testing
run: cargo nextest run --package citadel_sdk --features=multi-threaded,localhost-testing,localhost-testing-loopback-only --release
run: cargo nextest run --package citadel_sdk --features=multi-threaded,localhost-testing --release

misc_checks:
name: miscellaneous
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ rand = { default-features = false, version = "0.8.5" }
async-stream = { default-features = false, version = "0.3.3" }
sync_wrapper = { default-features = false, version = "1.0.0" }
async-recursion = { version = "1.0.4" }
rstest = { version = "0.18.2" }
bincode2 = { default-features = false, version = "2.0.1" }
rstest = { version = "0.23.0" }
bincode = { default-features = false, version = "1.3.3" }
serde = { version="1.0.152", default-features = false }
futures = { version = "0.3.25", default-features = false }
byteorder = { version = "1.4.3", default-features=false }
Expand All @@ -82,7 +82,7 @@ async-trait-with-sync = { default-features = false, version = "0.1.36" }
uuid = { version = "1.2.2", default-features = false }
tracing = { version = "0.1.37", default-features = false }
lazy_static = { default-features = false, version = "1.4.0" }
socket2 = { version = "0.5.1", default-features = false }
socket2 = { version = "0.5.7", default-features = false }
rustls-native-certs = { version = "0.6.2", default-features = false }
igd = { version = "^0.12.0", default-features = false }
quinn = { version = "0.10.2", default-features = false }
Expand Down
12 changes: 1 addition & 11 deletions async_ip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,7 @@ pub async fn get_ip_from(client: Option<Client>, addr: &str) -> Result<IpAddr, I
.text()
.await
.map_err(|err| IpRetrieveError::Error(err.to_string()))?;
IpAddr::from_str(text.as_str())
.map_err(|err| IpRetrieveError::Error(err.to_string()))
.and_then(|res| {
if res.is_ipv4() {
Err(IpRetrieveError::Error(
"This node does not have an ipv6 addr".to_string(),
))
} else {
Ok(res)
}
})
IpAddr::from_str(text.as_str()).map_err(|err| IpRetrieveError::Error(err.to_string()))
}

/// Gets the internal IP address using DNS
Expand Down
3 changes: 2 additions & 1 deletion citadel_crypt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fcm = []

[dependencies]
async-trait = { workspace = true }
bincode2 = { workspace = true }
bincode = { workspace = true }
serde = { workspace = true, features=["rc", "derive"] }
futures = { workspace = true }
log = { workspace = true }
Expand All @@ -54,6 +54,7 @@ tokio-stream = { workspace = true }
auto_impl = { workspace = true }
zeroize = { workspace = true, features = ["zeroize_derive", "alloc", "serde"] }
citadel_types = { workspace = true }
uuid = { version = "1.8.0", features = ["v4"] }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
rayon = { workspace = true }
Expand Down
13 changes: 7 additions & 6 deletions citadel_crypt/src/endpoint_crypto_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use crate::toolset::{Toolset, UpdateStatus};
use citadel_pqcrypto::constructor_opts::ConstructorOpts;
use citadel_types::crypto::CryptoParameters;
use citadel_types::crypto::SecurityLevel;
use citadel_types::prelude::ObjectId;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use uuid::Uuid;

/// A container that holds the toolset as well as some boolean flags to ensure validity
/// in tight concurrency situations. It is up to the networking protocol to ensure
Expand All @@ -21,7 +23,7 @@ pub struct PeerSessionCrypto<R: Ratchet = StackedRatchet> {
pub update_in_progress: Arc<AtomicBool>,
// if local is initiator, then in the case both nodes send a FastMessage at the same time (causing an update to the keys), the initiator takes preference, and the non-initiator's upgrade attempt gets dropped (if update_in_progress)
pub local_is_initiator: bool,
pub rolling_object_id: u64,
pub rolling_object_id: ObjectId,
pub rolling_group_id: u64,
pub lock_set_by_alice: Option<bool>,
/// Alice sends to Bob, then bob updates internally the toolset. However. Bob can't send packets to Alice quite yet using that newest version. He must first wait from Alice to commit on her end and wait for an ACK.
Expand All @@ -39,7 +41,7 @@ impl<R: Ratchet> PeerSessionCrypto<R> {
toolset,
update_in_progress: Arc::new(AtomicBool::new(false)),
local_is_initiator,
rolling_object_id: 1,
rolling_object_id: ObjectId::random(),
rolling_group_id: 0,
lock_set_by_alice: None,
latest_usable_version: 0,
Expand Down Expand Up @@ -200,9 +202,8 @@ impl<R: Ratchet> PeerSessionCrypto<R> {
self.rolling_group_id.wrapping_sub(1)
}

pub fn get_and_increment_object_id(&mut self) -> u64 {
self.rolling_object_id = self.rolling_object_id.wrapping_add(1);
self.rolling_object_id.wrapping_sub(1)
pub fn get_next_object_id(&mut self) -> ObjectId {
Uuid::new_v4().as_u128().into()
}

/// Returns a new constructor only if a concurrent update isn't occurring
Expand Down Expand Up @@ -230,7 +231,7 @@ impl<R: Ratchet> PeerSessionCrypto<R> {
self.update_in_progress = Arc::new(AtomicBool::new(false));
self.lock_set_by_alice = None;
self.rolling_group_id = 0;
self.rolling_object_id = 0;
self.rolling_object_id = ObjectId::random();
}

/// Gets the parameters used at registrations
Expand Down
4 changes: 2 additions & 2 deletions citadel_crypt/src/entropy_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,12 @@ impl EntropyBank {

/// Serializes self to a vector
pub fn serialize_to_vec(&self) -> Result<Vec<u8>, CryptError<String>> {
bincode2::serialize(self).map_err(|err| CryptError::DrillUpdateError(err.to_string()))
bincode::serialize(self).map_err(|err| CryptError::DrillUpdateError(err.to_string()))
}

/// Deserializes self from a set of bytes
pub fn deserialize_from<T: AsRef<[u8]>>(drill: T) -> Result<Self, CryptError<String>> {
bincode2::deserialize(drill.as_ref())
bincode::deserialize(drill.as_ref())
.map_err(|err| CryptError::DrillUpdateError(err.to_string()))
}
}
Expand Down
1 change: 1 addition & 0 deletions citadel_crypt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod prelude {
pub use crate::entropy_bank::EntropyBank;
pub use crate::misc::CryptError;
pub use crate::packet_vector::PacketVector;
pub use crate::streaming_crypt_scrambler::FixedSizedSource;
pub use crate::toolset::Toolset;
pub use citadel_types::crypto::SecBuffer;
pub use citadel_types::crypto::SecurityLevel;
Expand Down
22 changes: 11 additions & 11 deletions citadel_crypt/src/scramble/crypt_splitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::entropy_bank::EntropyBank;
use crate::packet_vector::{generate_packet_vector, PacketVector};
use crate::prelude::CryptError;
use crate::stacked_ratchet::Ratchet;
pub use citadel_types::prelude::ObjectId;
#[cfg(not(target_family = "wasm"))]
use rayon::prelude::*;

Expand Down Expand Up @@ -62,7 +63,7 @@ pub fn generate_scrambler_metadata<T: AsRef<[u8]>>(
header_size_bytes: usize,
security_level: SecurityLevel,
group_id: u64,
object_id: u64,
object_id: ObjectId,
enx: EncryptionAlgorithm,
sig_alg: SigAlgorithm,
transfer_type: &TransferType,
Expand Down Expand Up @@ -141,7 +142,7 @@ fn get_scramble_encrypt_config<'a, R: Ratchet>(
header_size_bytes: usize,
security_level: SecurityLevel,
group_id: u64,
object_id: u64,
object_id: ObjectId,
transfer_type: &TransferType,
empty_transfer: bool,
) -> Result<
Expand Down Expand Up @@ -190,13 +191,13 @@ pub fn par_scramble_encrypt_group<T: AsRef<[u8]>, R: Ratchet, F, const N: usize>
static_aux_ratchet: &R,
header_size_bytes: usize,
target_cid: u64,
object_id: u64,
object_id: ObjectId,
group_id: u64,
transfer_type: TransferType,
header_inscriber: F,
) -> Result<GroupSenderDevice<N>, CryptError<String>>
where
F: Fn(&PacketVector, &EntropyBank, u64, u64, &mut BytesMut) + Send + Sync,
F: Fn(&PacketVector, &EntropyBank, ObjectId, u64, &mut BytesMut) + Send + Sync,
{
let mut plain_text = Cow::Borrowed(plain_text.as_ref());

Expand All @@ -211,10 +212,9 @@ where
}

if let TransferType::RemoteEncryptedVirtualFilesystem { security_level, .. } = &transfer_type {
log::trace!(target: "citadel", "Detected REVFS. Locally encrypting w/level {security_level:?} | Ratchet used: {} w/version {}", static_aux_ratchet.get_cid(), static_aux_ratchet.version());
log::trace!(target: "citadel", "Detected REVFS. Locally encrypting object {object_id} w/level {security_level:?} | Ratchet used: {} w/version {}", static_aux_ratchet.get_cid(), static_aux_ratchet.version());
// pre-encrypt
let local_encrypted = static_aux_ratchet.local_encrypt(plain_text, *security_level)?;

plain_text = Cow::Owned(local_encrypted);
}

Expand Down Expand Up @@ -303,9 +303,9 @@ fn scramble_encrypt_wave(
msg_pqc: &PostQuantumContainer,
scramble_drill: &EntropyBank,
target_cid: u64,
object_id: u64,
object_id: ObjectId,
header_size_bytes: usize,
header_inscriber: impl Fn(&PacketVector, &EntropyBank, u64, u64, &mut BytesMut) + Send + Sync,
header_inscriber: impl Fn(&PacketVector, &EntropyBank, ObjectId, u64, &mut BytesMut) + Send + Sync,
) -> Vec<(usize, PacketCoordinate)> {
let ciphertext = msg_drill
.encrypt(msg_pqc, bytes_to_encrypt_for_this_wave)
Expand Down Expand Up @@ -336,7 +336,7 @@ pub fn oneshot_unencrypted_group_unified<const N: usize>(
plain_text: SecureMessagePacket<N>,
header_size_bytes: usize,
group_id: u64,
object_id: u64,
object_id: ObjectId,
empty_transfer: bool,
) -> Result<GroupSenderDevice<N>, CryptError<String>> {
let len = plain_text.message_len() as u64;
Expand Down Expand Up @@ -435,7 +435,7 @@ pub struct GroupReceiverConfig {
// this is NOT inscribed; only for transmission
pub header_size_bytes: u64,
pub group_id: u64,
pub object_id: u64,
pub object_id: ObjectId,
// only relevant for files. Note: if transfer type is RemoteVirtualFileystem, then,
// the receiving endpoint won't decrypt the first level of encryption since the goal
// is to keep the file remotely encrypted
Expand All @@ -450,7 +450,7 @@ impl GroupReceiverConfig {
#[allow(clippy::too_many_arguments)]
pub fn new_refresh(
group_id: u64,
object_id: u64,
object_id: ObjectId,
header_size_bytes: u64,
plaintext_length: u64,
max_packet_payload_size: u32,
Expand Down
10 changes: 5 additions & 5 deletions citadel_crypt/src/stacked_ratchet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,23 +586,23 @@ pub mod constructor {

impl BobToAliceTransfer {
pub fn serialize_into(&self, buf: &mut BytesMut) -> Option<()> {
let len = bincode2::serialized_size(self).ok()?;
let len = bincode::serialized_size(self).ok()?;
buf.reserve(len as usize);
bincode2::serialize_into(buf.writer(), self).ok()
bincode::serialize_into(buf.writer(), self).ok()
}

pub fn deserialize_from<T: AsRef<[u8]>>(source: T) -> Option<BobToAliceTransfer> {
bincode2::deserialize(source.as_ref()).ok()
bincode::deserialize(source.as_ref()).ok()
}
}

impl AliceToBobTransfer {
pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
bincode2::serialize(self).ok()
bincode::serialize(self).ok()
}

pub fn deserialize_from(source: &[u8]) -> Option<AliceToBobTransfer> {
bincode2::deserialize(source).ok()
bincode::deserialize(source).ok()
}

/// Gets the declared new version
Expand Down
12 changes: 8 additions & 4 deletions citadel_crypt/src/streaming_crypt_scrambler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::stacked_ratchet::StackedRatchet;
use citadel_io::Mutex;
use citadel_io::{BlockingSpawn, BlockingSpawnError};
use citadel_types::crypto::SecurityLevel;
use citadel_types::prelude::ObjectId;
use citadel_types::proto::TransferType;
use futures::Future;
use num_integer::Integer;
Expand All @@ -41,11 +42,14 @@ impl FixedSizedSource for std::fs::File {

/// Generic function for inscribing headers on packets
pub trait HeaderInscriberFn:
for<'a> Fn(&'a PacketVector, &'a EntropyBank, u64, u64, &'a mut BytesMut) + Send + Sync + 'static
for<'a> Fn(&'a PacketVector, &'a EntropyBank, ObjectId, u64, &'a mut BytesMut)
+ Send
+ Sync
+ 'static
{
}
impl<
T: for<'a> Fn(&'a PacketVector, &'a EntropyBank, u64, u64, &'a mut BytesMut)
T: for<'a> Fn(&'a PacketVector, &'a EntropyBank, ObjectId, u64, &'a mut BytesMut)
+ Send
+ Sync
+ 'static,
Expand Down Expand Up @@ -162,7 +166,7 @@ impl<T: Into<Vec<u8>>> From<T> for BytesSource {
pub fn scramble_encrypt_source<S: ObjectSource, F: HeaderInscriberFn, const N: usize>(
mut source: S,
max_group_size: Option<usize>,
object_id: u64,
object_id: ObjectId,
group_sender: GroupChanneler<Result<GroupSenderDevice<N>, CryptError>>,
stop: Receiver<()>,
security_level: SecurityLevel,
Expand Down Expand Up @@ -266,7 +270,7 @@ struct AsyncCryptScrambler<F: HeaderInscriberFn, R: Read, const N: usize> {
transfer_type: TransferType,
file_len: usize,
read_cursor: usize,
object_id: u64,
object_id: ObjectId,
header_size_bytes: usize,
target_cid: u64,
group_id: u64,
Expand Down
4 changes: 2 additions & 2 deletions citadel_crypt/src/toolset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ impl<R: Ratchet> Toolset<R> {

/// Serializes the toolset to a buffer
pub fn serialize_to_vec(&self) -> Result<Vec<u8>, CryptError<String>> {
bincode2::serialize(self).map_err(|err| CryptError::DrillUpdateError(err.to_string()))
bincode::serialize(self).map_err(|err| CryptError::DrillUpdateError(err.to_string()))
}

/// Deserializes from a slice of bytes
pub fn deserialize_from_bytes<T: AsRef<[u8]>>(input: T) -> Result<Self, CryptError<String>> {
bincode2::deserialize(input.as_ref())
bincode::deserialize(input.as_ref())
.map_err(|err| CryptError::DrillUpdateError(err.to_string()))
}

Expand Down
Loading

0 comments on commit 10c517b

Please sign in to comment.