Skip to content

Commit

Permalink
libpcap-analyzer: implement Toeplitz and XOR+xxHash RSS functions
Browse files Browse the repository at this point in the history
Receive-Side Scaling (RSS) distributes packets across several queues (and workers).
Toeplitz-based hash used to be the default, but is much slower than XOR+xxHash.
However, it seems Toeplitz-based hash distributes data more evenly.

Default to XOR+xxHash
  • Loading branch information
pc-anssi authored and chifflier committed Nov 26, 2024
1 parent 5db221e commit 9530d3d
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 19 deletions.
1 change: 1 addition & 0 deletions libpcap-analyzer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ serde_json = "1.0"
sha1 = { version="0.10", features=["std"], optional=true }
tls-parser = { version="0.12", optional=true }
tracing = { version="0.1", features=["log"] }
twox-hash = { version="2.0", default-features=false, features=["xxhash32"] }

[dependencies.rusticata]
# path = "../../rusticata"
Expand Down
92 changes: 85 additions & 7 deletions libpcap-analyzer/src/threaded_analyzer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::analyzer::{handle_l3, run_plugins_v2_link, run_plugins_v2_physical, Analyzer};
use crate::layers::LinkLayerType;
use crate::plugin_registry::PluginRegistry;
use crate::toeplitz;
use crossbeam_channel::{unbounded, Receiver, Sender};
use libpcap_tools::*;
use pcap_parser::data::PacketData;
Expand Down Expand Up @@ -209,19 +208,22 @@ pub(crate) fn extern_dispatch_l3<'a>(
ethertype: EtherType,
) -> Result<(), Error> {
let n_workers = jobs.len();
let i = fan_out(data, ethertype, n_workers);
let i = softrss_xor_xxhash32(data, ethertype, n_workers);
debug_assert!(i < n_workers);
// trace!("sending job to worker {i}");
jobs[i]
.send(Job::New(packet, ctx.clone(), data, ethertype))
.or(Err(Error::Generic("Error while sending job")))
}

fn fan_out(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize {
#[allow(dead_code)]
fn softrss_toeplitz(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize {
use crate::toeplitz::{toeplitz_hash_aligned32_v2, AlignedU8, SYMMETRIC_KEY_U32BE};
match ethertype {
EtherTypes::Ipv4 => {
if data.len() >= 20 {
let mut buf: [u8; 20] = [0; 20];
let mut aligned = AlignedU8([0; 20]);
let buf = &mut aligned.0;
let sz = 8;
let src_dst_addrs = &data[12..20];
buf[0..sz].copy_from_slice(src_dst_addrs);
Expand All @@ -238,7 +240,7 @@ fn fan_out(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize {
// sz = 12;
// }
// }
let hash = toeplitz::toeplitz_hash(toeplitz::SYMMETRIC_KEY, &buf[..sz]);
let hash = toeplitz_hash_aligned32_v2(SYMMETRIC_KEY_U32BE, &buf[..sz]);

// debug!("{:?} -- hash --> 0x{:x}", buf, hash);
// ((hash >> 24) ^ (hash & 0xff)) as usize % n_workers
Expand All @@ -249,7 +251,8 @@ fn fan_out(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize {
}
EtherTypes::Ipv6 => {
if data.len() >= 40 {
let mut buf: [u8; 40] = [0; 40];
let mut aligned = AlignedU8([0; 40]);
let buf = &mut aligned.0;
let sz = 40;
let src_dst_addrs = &data[8..40];
buf[0..sz].copy_from_slice(src_dst_addrs);
Expand All @@ -266,7 +269,82 @@ fn fan_out(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize {
// sz += 4;
// }
// }
let hash = toeplitz::toeplitz_hash(toeplitz::SYMMETRIC_KEY, &buf[..sz]);
let hash = toeplitz_hash_aligned32_v2(SYMMETRIC_KEY_U32BE, &buf[..sz]);

// debug!("{:?} -- hash --> 0x{:x}", buf, hash);
// ((hash >> 24) ^ (hash & 0xff)) as usize % n_workers
hash as usize % n_workers
} else {
n_workers - 1
}
}
_ => 0,
}
}

// Receive-Side Scaling (RSS) based on XOR function and XxHash32 hash function
// NOTE: This seems to be balanced less evenly than toeplitz hash, but still good
#[allow(dead_code)]
fn softrss_xor_xxhash32(data: &[u8], ethertype: EtherType, n_workers: usize) -> usize {
// This seed is just random data. It has no cryptographic value, it is just used to
// seed the XxHash32::oneshot function
const SEED: u32 = 1;

match ethertype {
EtherTypes::Ipv4 => {
if data.len() >= 20 {
let mut buf: [u8; 20] = [0; 20];
let sz = 4;
buf[0] = data[12] ^ data[16];
buf[1] = data[13] ^ data[17];
buf[2] = data[14] ^ data[18];
buf[3] = data[15] ^ data[19];
// we may append source and destination ports
// XXX breaks fragmentation
// if data[9] == crate::plugin::TRANSPORT_TCP || data[9] == crate::plugin::TRANSPORT_UDP {
// if data.len() >= 24 {
// // source port, in network-order
// buf[8] = data[20];
// buf[9] = data[21];
// // destination port, in network-order
// buf[10] = data[22];
// buf[11] = data[23];
// sz = 12;
// }
// }
let hash = twox_hash::XxHash32::oneshot(SEED, &buf[..sz]);

// debug!("{:?} -- hash --> 0x{:x}", buf, hash);
// ((hash >> 24) ^ (hash & 0xff)) as usize % n_workers
hash as usize % n_workers
} else {
n_workers - 1
}
}
EtherTypes::Ipv6 => {
if data.len() >= 40 {
let mut buf: [u8; 40] = [0; 40];
// let sz = 32;
// source IP + destination IP, in network-order
// buf[0..32].copy_from_slice(&data[8..40]);
let sz = 16;
for i in 0..16 {
buf[i] = data[8 + i] ^ data[24 + i];
}
// we may append source and destination ports
// XXX breaks fragmentation
// if data[6] == crate::plugin::TRANSPORT_TCP || data[6] == crate::plugin::TRANSPORT_UDP {
// if data.len() >= 44 {
// // source port, in network-order
// buf[33] = data[40];
// buf[34] = data[41];
// // destination port, in network-order
// buf[35] = data[42];
// buf[36] = data[43];
// sz += 4;
// }
// }
let hash = twox_hash::XxHash32::oneshot(SEED, &buf[..sz]);

// debug!("{:?} -- hash --> 0x{:x}", buf, hash);
// ((hash >> 24) ^ (hash & 0xff)) as usize % n_workers
Expand Down
159 changes: 147 additions & 12 deletions libpcap-analyzer/src/toeplitz.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,63 @@
use std::ops::Deref;

/// Maximum key size used throughout. It's OK for hardware to use only the
/// first 16 bytes, which is all that's required for IPv4.
///
/// The key tables declared in this file contain this many key bytes followed
/// by zero padding (52 bytes, or 13 `u32` words, in total).
pub const RSS_KEYSIZE: usize = 40;

/// Fixed-size byte array whose storage is aligned on an 8-byte boundary.
///
/// The alignment allows the bytes to be viewed as `u32` words (see `align32`)
/// and to be handed to the word-based Toeplitz functions, which expect their
/// input to start on a 4-byte boundary.
#[repr(align(8))]
pub struct AlignedU8<const SZ: usize>(pub [u8; SZ]);

/// Reinterprets a byte slice as a slice of native-endian `u32` words.
///
/// Returns `Some` only when the word view covers every byte of `input`, i.e.
/// nothing is left over before or after the aligned words (the slice starts on
/// a `u32` boundary and its length is a multiple of 4). Otherwise returns `None`.
pub fn try_align32_slice_u8(input: &[u8]) -> Option<&[u32]> {
    // SAFETY: every bit pattern is a valid `u32`, and `align_to` only places
    // correctly aligned elements in the middle slice.
    let (head, words, tail) = unsafe { input.align_to::<u32>() };
    match (head.is_empty(), tail.is_empty()) {
        (true, true) => Some(words),
        _ => None,
    }
}

impl<const SZ: usize> AlignedU8<SZ> {
    /// Views the buffer as native-endian `u32` words.
    ///
    /// The struct is declared `repr(align(8))`, so the buffer starts on a
    /// `u32` boundary and no leading bytes are expected to be skipped. If `SZ`
    /// is not a multiple of 4, the trailing `SZ % 4` bytes are *not* part of
    /// the returned slice.
    pub fn align32(&self) -> &[u32] {
        // SAFETY: every bit pattern is a valid `u32`, and `align_to` only
        // places correctly aligned elements in the middle slice.
        let (_head, words, _tail) = unsafe { self.0.align_to::<u32>() };
        words
    }
}

/// Borrows the wrapped array as a plain byte slice.
impl<const SZ: usize> AsRef<[u8]> for AlignedU8<SZ> {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

/// Lets an `AlignedU8` be passed wherever a `&[u8]` is expected.
impl<const SZ: usize> Deref for AlignedU8<SZ> {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        &self.0
    }
}

// original Microsoft's key
#[rustfmt::skip]
pub const DEFAULT_KEY : &[u8] = &[
pub const DEFAULT_KEY : AlignedU8<52> = AlignedU8([
0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00
]);

/// The bytes of `DEFAULT_KEY` packed into `u32` words, each word being the
/// *little-endian* reading of four consecutive key bytes. The ten key words
/// are followed by three zero words.
#[rustfmt::skip]
pub const DEFAULT_KEY_U32: &[u32] = &[
    0xda56_5a6d, 0xc20e_5b25, 0x3d25_6741, 0xb08f_a343,
    0xcb2b_cad0, 0xb430_7bae, 0xa32d_cb77, 0x0cf2_3080,
    0x3bb7_426a, 0xfa01_acbe, 0x0000_0000, 0x0000_0000,
    0x0000_0000,
];

/// The bytes of `DEFAULT_KEY` packed into `u32` words, each word being the
/// *big-endian* reading of four consecutive key bytes. The ten key words are
/// followed by three zero words.
#[rustfmt::skip]
pub const DEFAULT_KEY_U32BE: &[u32] = &[
    0x6d5a_56da, 0x255b_0ec2, 0x4167_253d, 0x43a3_8fb0,
    0xd0ca_2bcb, 0xae7b_30b4, 0x77cb_2da3, 0x8030_f20c,
    0x6a42_b73b, 0xbeac_01fa, 0x0000_0000, 0x0000_0000,
    0x0000_0000,
];

// key from http://www.ndsl.kaist.edu/~shinae/papers/TR-symRSS.pdf
Expand All @@ -22,14 +68,19 @@ pub const DEFAULT_KEY : &[u8] = &[
// support the same hash value for these two inputs, the first 32bit of the key need to be
// identical to the second 32bit, and the 16bit afterwards should be identical to the next 16bit.
#[rustfmt::skip]
pub const SYMMETRIC_KEY : &[u8] = &[
pub const SYMMETRIC_KEY : AlignedU8<52> = AlignedU8([
0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a,
0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a,
0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a,
0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a,
0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00
]);

/// The bytes of `SYMMETRIC_KEY` (the pattern `0x6d 0x5a` repeated) packed into
/// *big-endian* `u32` words. The ten key words are followed by three zero words.
#[rustfmt::skip]
pub const SYMMETRIC_KEY_U32BE: &[u32] = &[
    0x6d5a_6d5a, 0x6d5a_6d5a, 0x6d5a_6d5a, 0x6d5a_6d5a,
    0x6d5a_6d5a, 0x6d5a_6d5a, 0x6d5a_6d5a, 0x6d5a_6d5a,
    0x6d5a_6d5a, 0x6d5a_6d5a, 0x0000_0000, 0x0000_0000,
    0x0000_0000,
];

/// Toeplitz (RSS) hash algorithm
Expand All @@ -53,6 +104,65 @@ pub fn toeplitz_hash(key: &[u8], data: &[u8]) -> u32 {
hash
}

/// Toeplitz (RSS) hash algorithm, optimized for a key and a buffer that are
/// both 32-bit aligned.
///
/// `key` and `data` are viewed as `u32` words and each word is read as a
/// big-endian value. For every non-zero data word at index `j`, key words `j`
/// and `j + 1` are read, so the key must be at least one word longer than the
/// data (indexing panics otherwise).
///
/// Alignment and "length is a multiple of 4" are only verified by debug
/// assertions: in release builds, bytes that fall outside the aligned word
/// view are ignored.
pub fn toeplitz_hash_aligned32(key: &[u8], data: &[u8]) -> u32 {
    // SAFETY: every bit pattern is a valid `u32`, and `align_to` only places
    // correctly aligned elements in the middle slice.
    let (data_head, data_words, data_tail) = unsafe { data.align_to::<u32>() };
    debug_assert_eq!(data_head.len(), 0, "data is not aligned properly");
    debug_assert_eq!(data_tail.len(), 0, "input data length not a multiple of 4");
    // SAFETY: same reasoning as for `data` above.
    let (key_head, key_words, key_tail) = unsafe { key.align_to::<u32>() };
    debug_assert_eq!(key_head.len(), 0, "key is not aligned properly");
    debug_assert_eq!(key_tail.len(), 0, "key length not a multiple of 4");

    let mut hash = 0u32;
    for (idx, word) in data_words.iter().enumerate() {
        let mut pending = word.to_be();
        if pending == 0 {
            // no set bit: this word contributes nothing and the key is not read
            continue;
        }
        // 64-bit window over the key: `hi` is aligned with this data word,
        // `lo` supplies the bits shifted in from the next key word.
        let hi = key_words[idx].to_be();
        let lo = u64::from(key_words[idx + 1].to_be());
        while pending != 0 {
            let bit = pending.trailing_zeros();
            hash ^= (hi << (31 - bit)) | (lo >> (bit + 1)) as u32;
            // clear the lowest set bit
            pending &= pending - 1;
        }
    }
    hash
}

/// Toeplitz (RSS) hash algorithm, optimized for 32-bit aligned data and a key
/// already expressed as *big-endian* `u32` words.
///
/// `data` is viewed as `u32` words, each read as a big-endian value. For every
/// non-zero data word at index `j`, `key[j]` and `key[j + 1]` are read, so the
/// key must be at least one word longer than the data.
///
/// Alignment, length and key-size requirements are only verified by debug
/// assertions: in release builds, data bytes outside the aligned word view are
/// ignored, and a key that is too short triggers an indexing panic.
pub fn toeplitz_hash_aligned32_v2(key: &[u32], data: &[u8]) -> u32 {
    // SAFETY: every bit pattern is a valid `u32`, and `align_to` only places
    // correctly aligned elements in the middle slice.
    let (head, words, tail) = unsafe { data.align_to::<u32>() };
    debug_assert_eq!(head.len(), 0, "data is not aligned properly");
    debug_assert_eq!(tail.len(), 0, "input data length not a multiple of 4");
    debug_assert!(words.len() < key.len());

    let mut hash = 0u32;
    for (idx, word) in words.iter().enumerate() {
        let mut pending = word.to_be();
        if pending == 0 {
            // no set bit: this word contributes nothing and the key is not read
            continue;
        }
        let hi = key[idx];
        let lo = u64::from(key[idx + 1]);
        while pending != 0 {
            let bit = pending.trailing_zeros();
            hash ^= (hi << (31 - bit)) | (lo >> (bit + 1)) as u32;
            // clear the lowest set bit
            pending &= pending - 1;
        }
    }
    hash
}

/// Toeplitz (RSS) hash algorithm for data and key that are both already
/// expressed as *big-endian* `u32` words.
///
/// For every non-zero data word at index `j`, `key[j]` and `key[j + 1]` are
/// read, so the key must be at least one word longer than the data (indexing
/// panics otherwise).
pub fn toeplitz_hash_u32be(key: &[u32], data: &[u32]) -> u32 {
    let mut hash = 0u32;
    for (idx, &word) in data.iter().enumerate() {
        if word == 0 {
            // no set bit: this word contributes nothing and the key is not read
            continue;
        }
        let hi = key[idx];
        let lo = u64::from(key[idx + 1]);
        let mut pending = word;
        while pending != 0 {
            let bit = pending.trailing_zeros();
            hash ^= (hi << (31 - bit)) | (lo >> (bit + 1)) as u32;
            // clear the lowest set bit
            pending &= pending - 1;
        }
    }
    hash
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -64,10 +174,10 @@ mod tests {
#[test]
fn toeplitz_hash_test() {
const DATA1: &[u8] = &[66, 9, 149, 187, 161, 142, 100, 80, 10, 234, 6, 230];
let res = toeplitz_hash(DEFAULT_KEY, DATA1);
let res = toeplitz_hash(&DEFAULT_KEY, DATA1);
assert_eq!(res, 0x51cc_c178);
const DATA2: &[u8] = &[199, 92, 111, 2, 65, 69, 140, 83, 55, 150, 18, 131];
let res = toeplitz_hash(DEFAULT_KEY, DATA2);
let res = toeplitz_hash(&DEFAULT_KEY, DATA2);
assert_eq!(res, 0xc626_b0ea);
}

Expand Down Expand Up @@ -110,7 +220,11 @@ mod tests {

#[rustfmt::skip]
fn prepare_buffer(src_addr: IpAddr, dst_addr: IpAddr, src_port: u16, dst_port: u16) -> ([u8;40], usize) {
let mut buf = [0u8; 40];
#[repr(C, align(4))]
struct AlignedBuffer(pub [u8; 40]);

let mut aligned = AlignedBuffer([0; 40]);
let buf = &mut aligned.0;
let sz = match src_addr {
IpAddr::V4(v4) => { buf[..4].copy_from_slice(&v4.octets()); 4 },
IpAddr::V6(v6) => { buf[..16].copy_from_slice(&v6.octets()); 16 },
Expand All @@ -124,7 +238,7 @@ mod tests {
buf[sz + 2] = ((dst_port & 0xff00) >> 8) as u8;
buf[sz + 3] = (dst_port & 0x00ff) as u8;

(buf, sz + 4)
(aligned.0, sz + 4)
}

#[test]
Expand All @@ -133,11 +247,11 @@ mod tests {
for v in &test_vectors {
// println!("{:?}", v);
let (buf, sz) = prepare_buffer(v.src_addr, v.dst_addr, v.src_port, v.dst_port);
let without_tcp_hash = toeplitz_hash(DEFAULT_KEY, &buf[..sz - 4]);
let without_tcp_hash = toeplitz_hash(&DEFAULT_KEY, &buf[..sz - 4]);
// println!("{:02x?}", without_tcp_hash);
assert_eq!(without_tcp_hash, v.without_tcp_hash);

let with_tcp_hash = toeplitz_hash(DEFAULT_KEY, &buf[..sz]);
let with_tcp_hash = toeplitz_hash(&DEFAULT_KEY, &buf[..sz]);
// println!("{:02x?}", with_tcp_hash);
assert_eq!(with_tcp_hash, v.with_tcp_hash);
}
Expand All @@ -160,24 +274,45 @@ mod tests {
};

let (buf, sz) = prepare_buffer(v.src_addr, v.dst_addr, v.src_port, v.dst_port);
let without_tcp_hash = toeplitz_hash(SYMMETRIC_KEY, &buf[..sz - 4]);
let without_tcp_hash = toeplitz_hash(&SYMMETRIC_KEY, &buf[..sz - 4]);
let (buf2, sz2) = prepare_buffer(
v_sym.src_addr,
v_sym.dst_addr,
v_sym.src_port,
v_sym.dst_port,
);
let without_tcp_hash_sym = toeplitz_hash(SYMMETRIC_KEY, &buf2[..sz2 - 4]);
let without_tcp_hash_sym = toeplitz_hash(&SYMMETRIC_KEY, &buf2[..sz2 - 4]);
// println!("{:02x?}", without_tcp_hash);
assert_eq!(
without_tcp_hash, without_tcp_hash_sym,
"Symmetry without ports"
);

let with_tcp_hash = toeplitz_hash(SYMMETRIC_KEY, &buf[..sz]);
let with_tcp_hash_sym = toeplitz_hash(SYMMETRIC_KEY, &buf2[..sz2]);
let with_tcp_hash = toeplitz_hash(&SYMMETRIC_KEY, &buf[..sz]);
let with_tcp_hash_sym = toeplitz_hash(&SYMMETRIC_KEY, &buf2[..sz2]);
// println!("{:02x?}", with_tcp_hash);
assert_eq!(with_tcp_hash, with_tcp_hash_sym, "Symmetry with ports");
}
}

#[test]
fn toeplitz_test_optim() {
    // Checks that the word-based implementations agree with the reference
    // byte-based `toeplitz_hash` on every test vector (addresses only).
    let test_vectors = create_test_vectors();
    for v in &test_vectors {
        let (buf, sz) = prepare_buffer(v.src_addr, v.dst_addr, v.src_port, v.dst_port);
        // `prepare_buffer` returns a plain `[u8; 40]` by value, so `buf` carries
        // no alignment guarantee. Re-wrap it so that the aligned variants get
        // input that really starts on a `u32` boundary.
        let aligned = AlignedU8(buf);
        let input = &aligned.0[..sz - 4];

        let unopt = toeplitz_hash(&DEFAULT_KEY, input);
        assert_eq!(unopt, v.without_tcp_hash);

        let opt = toeplitz_hash_aligned32(&DEFAULT_KEY, input);
        assert_eq!(unopt, opt);

        let opt = toeplitz_hash_aligned32_v2(DEFAULT_KEY_U32BE, input);
        assert_eq!(unopt, opt);
    }
}
}

0 comments on commit 9530d3d

Please sign in to comment.