diff --git a/libpcap-analyzer/Cargo.toml b/libpcap-analyzer/Cargo.toml index 3b718b6..f11270a 100644 --- a/libpcap-analyzer/Cargo.toml +++ b/libpcap-analyzer/Cargo.toml @@ -53,6 +53,7 @@ serde_json = "1.0" sha1 = { version="0.10", features=["std"], optional=true } tls-parser = { version="0.12", optional=true } tracing = { version="0.1", features=["log"] } +twox-hash = { version="2.0", default-features=false, features=["xxhash32"] } [dependencies.rusticata] # path = "../../rusticata" diff --git a/libpcap-analyzer/src/threaded_analyzer.rs b/libpcap-analyzer/src/threaded_analyzer.rs index 2256b8d..487457e 100644 --- a/libpcap-analyzer/src/threaded_analyzer.rs +++ b/libpcap-analyzer/src/threaded_analyzer.rs @@ -1,7 +1,6 @@ use crate::analyzer::{handle_l3, run_plugins_v2_link, run_plugins_v2_physical, Analyzer}; use crate::layers::LinkLayerType; use crate::plugin_registry::PluginRegistry; -use crate::toeplitz; use crossbeam_channel::{unbounded, Receiver, Sender}; use libpcap_tools::*; use pcap_parser::data::PacketData; @@ -209,7 +208,7 @@ pub(crate) fn extern_dispatch_l3<'a>( ethertype: EtherType, ) -> Result<(), Error> { let n_workers = jobs.len(); - let i = fan_out(data, ethertype, n_workers); + let i = softrss_xor_xxhash32(data, ethertype, n_workers); debug_assert!(i < n_workers); // trace!("sending job to worker {i}"); jobs[i] @@ -217,11 +216,14 @@ pub(crate) fn extern_dispatch_l3<'a>( .or(Err(Error::Generic("Error while sending job"))) } -fn fan_out(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize { +#[allow(dead_code)] +fn softrss_toeplitz(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize { + use crate::toeplitz::{toeplitz_hash_aligned32_v2, AlignedU8, SYMMETRIC_KEY_U32BE}; match ethertype { EtherTypes::Ipv4 => { if data.len() >= 20 { - let mut buf: [u8; 20] = [0; 20]; + let mut aligned = AlignedU8([0; 20]); + let buf = &mut aligned.0; let sz = 8; let src_dst_addrs = &data[12..20]; buf[0..sz].copy_from_slice(src_dst_addrs); @@ -238,7 +240,7 @@ fn fan_out(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize { // sz = 12; // } // } - let hash = toeplitz::toeplitz_hash(toeplitz::SYMMETRIC_KEY, &buf[..sz]); + let hash = toeplitz_hash_aligned32_v2(SYMMETRIC_KEY_U32BE, &buf[..sz]); // debug!("{:?} -- hash --> 0x{:x}", buf, hash); // ((hash >> 24) ^ (hash & 0xff)) as usize % n_workers @@ -249,7 +251,8 @@ fn fan_out(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize { } EtherTypes::Ipv6 => { if data.len() >= 40 { - let mut buf: [u8; 40] = [0; 40]; + let mut aligned = AlignedU8([0; 40]); + let buf = &mut aligned.0; let sz = 40; let src_dst_addrs = &data[8..40]; buf[0..sz].copy_from_slice(src_dst_addrs); @@ -266,7 +269,82 @@ fn fan_out(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize { // sz += 4; // } // } - let hash = toeplitz::toeplitz_hash(toeplitz::SYMMETRIC_KEY, &buf[..sz]); + let hash = toeplitz_hash_aligned32_v2(SYMMETRIC_KEY_U32BE, &buf[..sz]); + + // debug!("{:?} -- hash --> 0x{:x}", buf, hash); + // ((hash >> 24) ^ (hash & 0xff)) as usize % n_workers + hash as usize % n_workers + } else { + n_workers - 1 + } + } + _ => 0, + } +} + +// Receive-Side Scaling (RSS) based on XOR function and XxHash32 hash function +// NOTE: This seems to be balanced less evenly than toeplitz hash, but still good +#[allow(dead_code)] +fn softrss_xor_xxhash32(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize { + // This seed is just random data. It has no cryptographic value, it is just used to + // seed the XxHash32::oneshot function + const SEED: u32 = 1; + + match ethertype { + EtherTypes::Ipv4 => { + if data.len() >= 20 { + let mut buf: [u8; 20] = [0; 20]; + let sz = 4; + buf[0] = data[12] ^ data[16]; + buf[1] = data[13] ^ data[17]; + buf[2] = data[14] ^ data[18]; + buf[3] = data[15] ^ data[19]; + // we may append source and destination ports + // XXX breaks fragmentation + // if data[9] == crate::plugin::TRANSPORT_TCP || data[9] == crate::plugin::TRANSPORT_UDP { + // if data.len() >= 24 { + // // source port, in network-order + // buf[8] = data[20]; + // buf[9] = data[21]; + // // destination port, in network-order + // buf[10] = data[22]; + // buf[11] = data[23]; + // sz = 12; + // } + // } + let hash = twox_hash::XxHash32::oneshot(SEED, &buf[..sz]); + + // debug!("{:?} -- hash --> 0x{:x}", buf, hash); + // ((hash >> 24) ^ (hash & 0xff)) as usize % n_workers + hash as usize % n_workers + } else { + n_workers - 1 + } + } + EtherTypes::Ipv6 => { + if data.len() >= 40 { + let mut buf: [u8; 40] = [0; 40]; + // let sz = 32; + // source IP + destination IP, in network-order + // buf[0..32].copy_from_slice(&data[8..40]); + let sz = 16; + for i in 0..16 { + buf[i] = data[8 + i] ^ data[24 + i]; + } + // we may append source and destination ports + // XXX breaks fragmentation + // if data[6] == crate::plugin::TRANSPORT_TCP || data[6] == crate::plugin::TRANSPORT_UDP { + // if data.len() >= 44 { + // // source port, in network-order + // buf[33] = data[40]; + // buf[34] = data[41]; + // // destination port, in network-order + // buf[35] = data[42]; + // buf[36] = data[43]; + // sz += 4; + // } + // } + let hash = twox_hash::XxHash32::oneshot(SEED, &buf[..sz]); // debug!("{:?} -- hash --> 0x{:x}", buf, hash); // ((hash >> 24) ^ (hash & 0xff)) as usize % n_workers diff --git a/libpcap-analyzer/src/toeplitz.rs b/libpcap-analyzer/src/toeplitz.rs index c3477e9..c51a27c 100644 --- a/libpcap-analyzer/src/toeplitz.rs +++ b/libpcap-analyzer/src/toeplitz.rs @@ -1,10 +1,46 @@ +use std::ops::Deref; + /// Maximum key size used throughout. It's OK for hardware to use only the /// first 16 bytes, which is all that's required for IPv4. pub const RSS_KEYSIZE: usize = 40; +#[repr(align(8))] +pub struct AlignedU8(pub [u8; SZ]); + +pub fn try_align32_slice_u8(input: &[u8]) -> Option<&[u32]> { + let (_prefix, data, _suffix) = unsafe { input.align_to::() }; + if _prefix.is_empty() && _suffix.is_empty() { + Some(data) + } else { + None + } +} + +impl AlignedU8 { + pub fn align32(&self) -> &[u32] { + // this will always succeed since the structure is annotated with `repr(align(4))` + let (_prefix, data, _suffix) = unsafe { self.0.align_to::() }; + data + } +} + +impl AsRef<[u8]> for AlignedU8 { + fn as_ref(&self) -> &[u8] { + self.0.as_slice() + } +} + +impl Deref for AlignedU8 { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.0.as_slice() + } +} + // original Microsoft's key #[rustfmt::skip] -pub const DEFAULT_KEY : &[u8] = &[ +pub const DEFAULT_KEY : AlignedU8<52> = AlignedU8([ 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2, 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0, 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4, @@ -12,6 +48,16 @@ pub const DEFAULT_KEY : &[u8] = &[ 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 +]); + +pub const DEFAULT_KEY_U32: &[u32] = &[ + 0xda565a6d, 0xc20e5b25, 0x3d256741, 0xb08fa343, 0xcb2bcad0, 0xb4307bae, 0xa32dcb77, 0xcf23080, + 0x3bb7426a, 0xfa01acbe, 0x0, 0x0, 0x0, +]; + +pub const DEFAULT_KEY_U32BE: &[u32] = &[ + 0x6d5a56da, 0x255b0ec2, 0x4167253d, 0x43a38fb0, 0xd0ca2bcb, 0xae7b30b4, 0x77cb2da3, 0x8030f20c, + 0x6a42b73b, 0xbeac01fa, 0x0, 0x0, 0x0, ]; // key from http://www.ndsl.kaist.edu/~shinae/papers/TR-symRSS.pdf @@ -22,7 +68,7 @@ pub const DEFAULT_KEY : &[u8] = &[ // support the same hash value for these two inputs, the first 32bit of the key need to be // identical to the second 32bit, and the 16bit afterwards should be identical to the next 16bit. #[rustfmt::skip] -pub const SYMMETRIC_KEY : &[u8] = &[ +pub const SYMMETRIC_KEY : AlignedU8<52> = AlignedU8([ 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, @@ -30,6 +76,11 @@ pub const SYMMETRIC_KEY : &[u8] = &[ 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 +]); + +pub const SYMMETRIC_KEY_U32BE: &[u32] = &[ + 0x6d5a6d5a, 0x6d5a6d5a, 0x6d5a6d5a, 0x6d5a6d5a, 0x6d5a6d5a, 0x6d5a6d5a, 0x6d5a6d5a, 0x6d5a6d5a, + 0x6d5a6d5a, 0x6d5a6d5a, 0x0, 0x0, 0x0, ]; /// Toeplitz (RSS) hash algorithm @@ -53,6 +104,65 @@ pub fn toeplitz_hash(key: &[u8], data: &[u8]) -> u32 { hash } +/// Toeplitz (RSS) hash algorithm, optimized if key and buffer and 32-bits aligned +pub fn toeplitz_hash_aligned32(key: &[u8], data: &[u8]) -> u32 { + let (_prefix, data32, _suffix) = unsafe { data.align_to::() }; + debug_assert_eq!(_prefix.len(), 0, "data is not aligned properly"); + debug_assert_eq!(_suffix.len(), 0, "input data length not a multiple of 4"); + let (_prefix, key32, _suffix) = unsafe { key.align_to::() }; + debug_assert_eq!(_prefix.len(), 0, "key is not aligned properly"); + debug_assert_eq!(_suffix.len(), 0, "key length not a multiple of 4"); + let mut hash: u32 = 0; + for j in 0..data32.len() { + let mut map = data32[j].to_be(); + //eprintln!("{map:x}"); + while map != 0 { + let i = map.trailing_zeros(); + hash ^= (key32[j].to_be() << (31 - i)) + | (u64::from(key32[j + 1].to_be()) >> (i + 1)) as u32; + // remove the least significant bit + map &= map - 1; + } + } + hash +} + +/// Toeplitz (RSS) hash algorithm, optimized for 32-bits aligned data and *big-endian* key +pub fn toeplitz_hash_aligned32_v2(key: &[u32], data: &[u8]) -> u32 { + let (_prefix, data32, _suffix) = unsafe { data.align_to::() }; + debug_assert_eq!(_prefix.len(), 0, "data is not aligned properly"); + debug_assert_eq!(_suffix.len(), 0, "input data length not a multiple of 4"); + debug_assert!(data32.len() < key.len()); + let mut hash: u32 = 0; + for j in 0..data32.len() { + let mut map = data32[j].to_be(); + //eprintln!("{map:x}"); + while map != 0 { + let i = map.trailing_zeros(); + hash ^= (key[j] << (31 - i)) | (u64::from(key[j + 1]) >> (i + 1)) as u32; + // remove the least significant bit + map &= map - 1; + } + } + hash +} + +/// Toeplitz (RSS) hash algorithm, optimized for 32-bits *big-endian* data and *big-endian* key +pub fn toeplitz_hash_u32be(key: &[u32], data: &[u32]) -> u32 { + let mut hash: u32 = 0; + for j in 0..data.len() { + let mut map = data[j]; + //eprintln!("{map:x}"); + while map != 0 { + let i = map.trailing_zeros(); + hash ^= (key[j] << (31 - i)) | (u64::from(key[j + 1]) >> (i + 1)) as u32; + // remove the least significant bit + map &= map - 1; + } + } + hash +} + #[cfg(test)] mod tests { use super::*; @@ -64,10 +174,10 @@ mod tests { #[test] fn toeplitz_hash_test() { const DATA1: &[u8] = &[66, 9, 149, 187, 161, 142, 100, 80, 10, 234, 6, 230]; - let res = toeplitz_hash(DEFAULT_KEY, DATA1); + let res = toeplitz_hash(&DEFAULT_KEY, DATA1); assert_eq!(res, 0x51cc_c178); const DATA2: &[u8] = &[199, 92, 111, 2, 65, 69, 140, 83, 55, 150, 18, 131]; - let res = toeplitz_hash(DEFAULT_KEY, DATA2); + let res = toeplitz_hash(&DEFAULT_KEY, DATA2); assert_eq!(res, 0xc626_b0ea); } @@ -110,7 +220,11 @@ mod tests { #[rustfmt::skip] fn prepare_buffer(src_addr: IpAddr, dst_addr: IpAddr, src_port: u16, dst_port: u16) -> ([u8;40], usize) { - let mut buf = [0u8; 40]; + #[repr(C, align(4))] + struct AlignedBuffer(pub [u8; 40]); + + let mut aligned = AlignedBuffer([0; 40]); + let buf = &mut aligned.0; let sz = match src_addr { IpAddr::V4(v4) => { buf[..4].copy_from_slice(&v4.octets()); 4 }, IpAddr::V6(v6) => { buf[..16].copy_from_slice(&v6.octets()); 16 }, @@ -124,7 +238,7 @@ mod tests { buf[sz + 2] = ((dst_port & 0xff00) >> 8) as u8; buf[sz + 3] = (dst_port & 0x00ff) as u8; - (buf, sz + 4) + (aligned.0, sz + 4) } #[test] @@ -133,11 +247,11 @@ mod tests { for v in &test_vectors { // println!("{:?}", v); let (buf, sz) = prepare_buffer(v.src_addr, v.dst_addr, v.src_port, v.dst_port); - let without_tcp_hash = toeplitz_hash(DEFAULT_KEY, &buf[..sz - 4]); + let without_tcp_hash = toeplitz_hash(&DEFAULT_KEY, &buf[..sz - 4]); // println!("{:02x?}", without_tcp_hash); assert_eq!(without_tcp_hash, v.without_tcp_hash); - let with_tcp_hash = toeplitz_hash(DEFAULT_KEY, &buf[..sz]); + let with_tcp_hash = toeplitz_hash(&DEFAULT_KEY, &buf[..sz]); // println!("{:02x?}", with_tcp_hash); assert_eq!(with_tcp_hash, v.with_tcp_hash); } @@ -160,24 +274,45 @@ mod tests { }; let (buf, sz) = prepare_buffer(v.src_addr, v.dst_addr, v.src_port, v.dst_port); - let without_tcp_hash = toeplitz_hash(SYMMETRIC_KEY, &buf[..sz - 4]); + let without_tcp_hash = toeplitz_hash(&SYMMETRIC_KEY, &buf[..sz - 4]); let (buf2, sz2) = prepare_buffer( v_sym.src_addr, v_sym.dst_addr, v_sym.src_port, v_sym.dst_port, ); - let without_tcp_hash_sym = toeplitz_hash(SYMMETRIC_KEY, &buf2[..sz2 - 4]); + let without_tcp_hash_sym = toeplitz_hash(&SYMMETRIC_KEY, &buf2[..sz2 - 4]); // println!("{:02x?}", without_tcp_hash); assert_eq!( without_tcp_hash, without_tcp_hash_sym, "Symmetry without ports" ); - let with_tcp_hash = toeplitz_hash(SYMMETRIC_KEY, &buf[..sz]); - let with_tcp_hash_sym = toeplitz_hash(SYMMETRIC_KEY, &buf2[..sz2]); + let with_tcp_hash = toeplitz_hash(&SYMMETRIC_KEY, &buf[..sz]); + let with_tcp_hash_sym = toeplitz_hash(&SYMMETRIC_KEY, &buf2[..sz2]); // println!("{:02x?}", with_tcp_hash); assert_eq!(with_tcp_hash, with_tcp_hash_sym, "Symmetry with ports"); } } + + #[test] + fn toeplitz_test_optim() { + let test_vectors = create_test_vectors(); + for v in &test_vectors { + // println!("{:?}", v); + let (buf, sz) = prepare_buffer(v.src_addr, v.dst_addr, v.src_port, v.dst_port); + + let unopt = toeplitz_hash(&DEFAULT_KEY, &buf[..sz - 4]); + // println!("{:02x?}", without_tcp_hash); + assert_eq!(unopt, v.without_tcp_hash); + + let opt = toeplitz_hash_aligned32(&DEFAULT_KEY, &buf[..sz - 4]); + // println!("{:02x?}", with_tcp_hash); + assert_eq!(unopt, opt); + + let opt = toeplitz_hash_aligned32_v2(DEFAULT_KEY_U32BE, &buf[..sz - 4]); + // println!("{:02x?}", with_tcp_hash); + assert_eq!(unopt, opt); + } + } }