diff --git a/crates/illumos-sys-hdrs/src/lib.rs b/crates/illumos-sys-hdrs/src/lib.rs index 12bb8d1d..bf43dab3 100644 --- a/crates/illumos-sys-hdrs/src/lib.rs +++ b/crates/illumos-sys-hdrs/src/lib.rs @@ -12,7 +12,10 @@ pub mod kernel; #[cfg(feature = "kernel")] pub use kernel::*; +use core::mem::transmute; use core::ptr; +use core::sync::atomic::AtomicU64; +use core::sync::atomic::Ordering; // The following are "C type" aliases for native Rust types so that // the native illumos structures may be defined almost verbatim to the @@ -117,8 +120,9 @@ impl kstat_named_t { Self::default() } + #[inline] pub fn val_u64(&self) -> u64 { - unsafe { self.value._u64 } + self.value.as_u64().load(Ordering::Relaxed) } } @@ -144,20 +148,48 @@ pub union KStatNamedValue { impl core::ops::AddAssign for KStatNamedValue { #[inline] fn add_assign(&mut self, other: u64) { - unsafe { self._u64 += other }; + self.incr_u64(other); } } impl core::ops::SubAssign for KStatNamedValue { #[inline] fn sub_assign(&mut self, other: u64) { - unsafe { self._u64 -= other }; + self.decr_u64(other); } } impl KStatNamedValue { - pub fn set_u64(&mut self, val: u64) { - self._u64 = val; + /// Validates at compile time whether ._u64 can be safely used as + /// an AtomicU64. + const KSTAT_ATOMIC_U64_SAFE: () = if align_of::() % 8 == 0 + { + } else { + panic!("Platform does not meet u64 8B alignment for AtomicU64"); + }; + + #[inline(always)] + #[allow(clippy::let_unit_value)] + pub fn as_u64(&self) -> &AtomicU64 { + _ = Self::KSTAT_ATOMIC_U64_SAFE; + // SAFETY: KStatNamedValue must have 8B alignment on target platform. + // Validated by compile time check in `KSTAT_ATOMIC_U64_SAFE`. + unsafe { transmute::<&u64, &AtomicU64>(&self._u64) } + } + + #[inline] + pub fn set_u64(&self, val: u64) { + core::hint::black_box(self.as_u64().store(val, Ordering::Relaxed)); + } + + #[inline] + pub fn incr_u64(&self, val: u64) { + core::hint::black_box(self.as_u64().fetch_add(val, Ordering::Relaxed)); + } + + #[inline] + pub fn decr_u64(&self, val: u64) { + core::hint::black_box(self.as_u64().fetch_sub(val, Ordering::Relaxed)); } } diff --git a/lib/opte/src/ddi/kstat.rs b/lib/opte/src/ddi/kstat.rs index a6587356..4c4abf7b 100644 --- a/lib/opte/src/ddi/kstat.rs +++ b/lib/opte/src/ddi/kstat.rs @@ -22,6 +22,9 @@ cfg_if! { c_void, kstat_t, kstat_create, kstat_delete, kstat_install, kstat_named_init, kstat_named_t, KSTAT_STRLEN, KSTAT_TYPE_NAMED, }; + } else { + use core::sync::atomic::AtomicU64; + use core::sync::atomic::Ordering; } } @@ -221,35 +224,41 @@ impl KStatU64 { Self { inner: kstat_named_t::new() } } - pub fn set(&mut self, val: u64) { + pub fn set(&self, val: u64) { self.inner.value.set_u64(val); } pub fn val(&self) -> u64 { self.inner.val_u64() } + + pub fn incr(&self, val: u64) { + self.inner.value.incr_u64(val); + } + + pub fn decr(&self, val: u64) { + self.inner.value.decr_u64(val); + } } -#[cfg(all(not(feature = "std"), not(test)))] impl core::ops::AddAssign for KStatU64 { #[inline] fn add_assign(&mut self, other: u64) { - self.inner.value += other; + self.incr(other); } } -#[cfg(all(not(feature = "std"), not(test)))] impl core::ops::SubAssign for KStatU64 { #[inline] fn sub_assign(&mut self, other: u64) { - self.inner.value -= other; + self.decr(other); } } #[cfg(any(feature = "std", test))] #[derive(Default)] pub struct KStatU64 { - value: u64, + value: AtomicU64, } #[cfg(any(feature = "std", test))] @@ -262,26 +271,20 @@ impl KStatU64 { Self::default() } - pub fn set(&mut self, val: u64) { - self.value = val; + pub fn set(&self, val: u64) { + self.value.store(val, Ordering::Relaxed); } pub fn val(&self) -> u64 { - self.value + self.value.load(Ordering::Relaxed) } -} -#[cfg(any(feature = "std", test))] -impl core::ops::AddAssign for KStatU64 { - fn add_assign(&mut self, other: u64) { - self.value += other; + pub fn incr(&self, val: u64) { + self.value.fetch_add(val, Ordering::Relaxed); } -} -#[cfg(any(feature = "std", test))] -impl core::ops::SubAssign for KStatU64 { - fn sub_assign(&mut self, other: u64) { - self.value -= other; + pub fn decr(&self, val: u64) { + self.value.fetch_sub(val, Ordering::Relaxed); } } diff --git a/lib/opte/src/engine/layer.rs b/lib/opte/src/engine/layer.rs index 45b67557..d3c28a9f 100644 --- a/lib/opte/src/engine/layer.rs +++ b/lib/opte/src/engine/layer.rs @@ -751,7 +751,7 @@ impl Layer { // Unwrap: We know this is fine because the stat names are // generated from the LayerStats structure. - let mut stats = KStatNamed::new( + let stats = KStatNamed::new( "xde", &format!("{}_{}", port, name), LayerStats::new(), diff --git a/lib/opte/src/engine/port.rs b/lib/opte/src/engine/port.rs index c0c4f9b8..97dadc34 100644 --- a/lib/opte/src/engine/port.rs +++ b/lib/opte/src/engine/port.rs @@ -65,6 +65,8 @@ use crate::ddi::mblk::MsgBlk; use crate::ddi::mblk::MsgBlkIterMut; use crate::ddi::sync::KMutex; use crate::ddi::sync::KMutexType; +use crate::ddi::sync::KRwLock; +use crate::ddi::sync::KRwLockType; use crate::ddi::time::Moment; use crate::engine::flow_table::ExpiryPolicy; use crate::engine::packet::EmitSpec; @@ -326,7 +328,6 @@ impl PortBuilder { ) -> result::Result, PortCreateError> { let data = PortData { state: PortState::Ready, - stats: KStatNamed::new("xde", &self.name, PortStats::new())?, // At this point the layer pipeline is immutable, thus we // move the layers out of the mutex. layers: self.layers.into_inner(), @@ -340,14 +341,18 @@ impl PortBuilder { ), }; + let mut data = KRwLock::new(data); + data.init(KRwLockType::Driver); + Ok(Port { name: self.name.clone(), name_cstr: self.name_cstr, mac: self.mac, ectx: self.ectx, epoch: AtomicU64::new(1), + stats: KStatNamed::new("xde", &self.name, PortStats::new())?, net, - data: KMutex::new(data, KMutexType::Driver), + data, }) } @@ -669,7 +674,6 @@ struct PortStats { struct PortData { state: PortState, - stats: KStatNamed, layers: Vec, uft_in: FlowTable>, uft_out: FlowTable>, @@ -741,8 +745,9 @@ pub struct Port { // probes. name_cstr: CString, mac: MacAddr, + stats: KStatNamed, net: N, - data: KMutex, + data: KRwLock, } // Convert: @@ -799,7 +804,7 @@ impl Port { /// /// * [`PortState::Running`] pub fn pause(&self) -> Result<()> { - let mut data = self.data.lock(); + let mut data = self.data.write(); check_state!(data.state, [PortState::Running])?; data.state = PortState::Paused; Ok(()) @@ -814,7 +819,7 @@ impl Port { /// This command is valid for all states. If the port is already /// in the running state, this is a no op. pub fn start(&self) { - self.data.lock().state = PortState::Running; + self.data.write().state = PortState::Running; } /// Reset the port. @@ -830,7 +835,7 @@ impl Port { // It's imperative to hold the lock for the entire function so // that its side effects are atomic from the point of view of // other threads. - let mut data = self.data.lock(); + let mut data = self.data.write(); data.state = PortState::Ready; // Clear all dynamic state related to the creation of flows. @@ -845,7 +850,7 @@ impl Port { /// Get the current [`PortState`]. pub fn state(&self) -> PortState { - self.data.lock().state + self.data.read().state } /// Add a new `Rule` to the layer named by `layer`. @@ -870,7 +875,7 @@ impl Port { dir: Direction, rule: Rule, ) -> Result<()> { - let mut data = self.data.lock(); + let mut data = self.data.write(); check_state!(data.state, [PortState::Ready, PortState::Running])?; for layer in &mut data.layers { @@ -958,7 +963,7 @@ impl Port { /// /// This command is valid for any [`PortState`]. pub fn dump_layer(&self, name: &str) -> Result { - let data = self.data.lock(); + let data = self.data.read(); for l in &data.layers { if l.name() == name { @@ -979,7 +984,7 @@ impl Port { /// * [`PortState::Paused`] /// * [`PortState::Restored`] pub fn dump_tcp_flows(&self) -> Result { - let data = self.data.lock(); + let data = self.data.read(); check_state!( data.state, [PortState::Running, PortState::Paused, PortState::Restored] @@ -996,7 +1001,7 @@ impl Port { /// /// * [`PortState::Running`] pub fn clear_uft(&self) -> Result<()> { - let mut data = self.data.lock(); + let mut data = self.data.write(); check_state!(data.state, [PortState::Running])?; data.uft_in.clear(); data.uft_out.clear(); @@ -1012,7 +1017,7 @@ impl Port { /// /// * [`PortState::Running`] pub fn clear_lft(&self, layer: &str) -> Result<()> { - let mut data = self.data.lock(); + let mut data = self.data.write(); check_state!(data.state, [PortState::Running])?; data.layers .iter_mut() @@ -1032,7 +1037,7 @@ impl Port { /// * [`PortState::Paused`] /// * [`PortState::Restored`] pub fn dump_uft(&self) -> Result { - let data = self.data.lock(); + let data = self.data.read(); check_state!( data.state, @@ -1083,7 +1088,7 @@ impl Port { #[inline(always)] fn expire_flows_inner(&self, now: Option) -> Result<()> { - let mut data = self.data.lock(); + let mut data = self.data.write(); let now = now.unwrap_or_else(Moment::now); check_state!(data.state, [PortState::Running])?; @@ -1121,7 +1126,7 @@ impl Port { dir: Direction, rule: &Rule, ) -> Result> { - let data = self.data.lock(); + let data = self.data.read(); for layer in &data.layers { if layer.name() == layer_name { @@ -1140,7 +1145,7 @@ impl Port { /// /// This command is valid for any [`PortState`]. pub fn layer_action(&self, layer: &str, idx: usize) -> Option { - let data = self.data.lock(); + let data = self.data.read(); for l in &data.layers { if l.name() == layer { return l.action(idx); @@ -1156,7 +1161,7 @@ impl Port { /// /// This command is valid for any [`PortState`]. pub fn layer_stats_snap(&self, layer: &str) -> Option { - let data = self.data.lock(); + let data = self.data.read(); for l in &data.layers { if l.name() == layer { @@ -1173,7 +1178,7 @@ impl Port { /// /// This command is valid for any [`PortState`]. pub fn list_layers(&self) -> ioctl::ListLayersResp { - let data = self.data.lock(); + let data = self.data.read(); let mut tmp = vec![]; for layer in &data.layers { @@ -1250,23 +1255,50 @@ impl Port { // // In case 1, we can also cache and reuse the same EmitSpec for // all hit packets. + // + // Lock management here is generally optimistic. The strategy we employ + // is to attempt to grab a UFT entry from a read lock -- on a miss or + // an invalidated entry, we upgrade to a write lock (assuming we need to + // fallback to the slowpath), and attempt to read again. A second miss + // then leads to the slowpath. A successful fastpath hit may need to + // reacquire the write-lock if we end up closing out a TCP flow. // The lock needs to be optional here because there is one // case wherein we need to reacquire the lock -- invalidation // by TCP state. - let mut lock = Some(self.data.lock()); - let data = lock.as_mut().expect("lock should be held on this codepath"); + let data = self.data.read(); // (1) Check for UFT and precompiled. - let epoch = self.epoch(); + let mut epoch = self.epoch(); check_state!(data.state, [PortState::Running]) .map_err(|_| ProcessError::BadState(data.state))?; self.port_process_entry_probe(dir, &flow_before, epoch, mblk_addr); - let uft: Option<&Arc>>> = match dir { + let uft: Option>>> = (match dir { Direction::Out => data.uft_out.get(&flow_before), Direction::In => data.uft_in.get(&flow_before), + }) + .map(Arc::clone); + + drop(data); + + // If we have a UFT miss or invalid entry, upgrade to a write lock and + // retry. This lets us use an optimistic lookup more often. + let (uft, mut lock) = match uft { + Some(ref entry) if entry.state().epoch == epoch => (uft, None), + Some(_) | None => { + let data = self.data.write(); + epoch = self.epoch(); + ( + (match dir { + Direction::Out => data.uft_out.get(&flow_before), + Direction::In => data.uft_in.get(&flow_before), + }) + .map(Arc::clone), + Some(data), + ) + } }; enum FastPathDecision { @@ -1278,18 +1310,21 @@ impl Port { let decision = match uft { // We have a valid UFT entry of some kind -- clone out the // saved transforms so that we can drop the lock ASAP. + // Recheck epoch in case we took a write lock and re-read + // the UFT. Some(entry) if entry.state().epoch == epoch => { // The Fast Path. + drop(lock.take()); let xforms = &entry.state().xforms; let out = if xforms.compiled.is_some() { - FastPathDecision::CompiledUft(Arc::clone(entry)) + FastPathDecision::CompiledUft(entry) } else { - FastPathDecision::Uft(Arc::clone(entry)) + FastPathDecision::Uft(entry) }; match dir { - Direction::In => data.stats.vals.in_uft_hit += 1, - Direction::Out => data.stats.vals.out_uft_hit += 1, + Direction::In => self.stats.vals.in_uft_hit.incr(1), + Direction::Out => self.stats.vals.out_uft_hit.incr(1), } out @@ -1297,7 +1332,12 @@ impl Port { // The entry is from a previous epoch; invalidate its UFT // entries and proceed to rule processing. + // We will have been upgraded to a write lock if this was + // possible. Some(entry) => { + let data = lock + .as_mut() + .expect("lock should be held on this codepath"); let epoch = entry.state().epoch; let owned_pair = *entry.state().pair.lock(); let (ufid_in, ufid_out) = match dir { @@ -1324,25 +1364,16 @@ impl Port { match &decision { FastPathDecision::CompiledUft(entry) | FastPathDecision::Uft(entry) => { - // TODO: Ideally the Kstat should be holding AtomicU64s, then we get - // out of the lock sooner. Note that we don't need to *apply* a given - // set of transforms in order to know which stats we'll modify. let dummy_res = Ok(InternalProcessResult::Modified); match dir { Direction::In => { - Self::update_stats_in(&mut data.stats.vals, &dummy_res); + self.update_stats_in(&dummy_res); } Direction::Out => { - Self::update_stats_out( - &mut data.stats.vals, - &dummy_res, - ); + self.update_stats_out(&dummy_res); } } - let _ = data; - drop(lock.take()); - entry.hit_at(process_start); self.uft_hit_probe(dir, &flow_before, epoch, &process_start); @@ -1389,7 +1420,7 @@ impl Port { // We know the lock is dropped -- reacquire the lock to remove the flow. // Elevate lock to full scope, if we are reprocessing as well. if let Some(entry) = invalidated_tcp { - let mut local_lock = self.data.lock(); + let mut local_lock = self.data.write(); let flow_lock = entry.state().inner.lock(); let ufid_out = &flow_lock.outbound_ufid; @@ -1496,7 +1527,7 @@ impl Port { if !(reprocess && matches!(res, Ok(InternalProcessResult::Modified))) { - Self::update_stats_in(&mut data.stats.vals, &res); + self.update_stats_in(&res); } drop(lock); @@ -1515,7 +1546,7 @@ impl Port { if !(reprocess && matches!(res, Ok(InternalProcessResult::Modified))) { - Self::update_stats_out(&mut data.stats.vals, &res); + self.update_stats_out(&res); } drop(lock); @@ -1571,7 +1602,7 @@ impl Port { dir: Direction, id: RuleId, ) -> Result<()> { - let mut data = self.data.lock(); + let mut data = self.data.write(); check_state!(data.state, [PortState::Ready, PortState::Running])?; for layer in &mut data.layers { @@ -1624,7 +1655,7 @@ impl Port { in_rules: Vec>, out_rules: Vec>, ) -> Result<()> { - let mut data = self.data.lock(); + let mut data = self.data.write(); check_state!(data.state, [PortState::Ready, PortState::Running])?; for layer in &mut data.layers { @@ -1645,7 +1676,7 @@ impl Port { in_rules: Vec>, out_rules: Vec>, ) -> Result<()> { - let mut data = self.data.lock(); + let mut data = self.data.write(); check_state!(data.state, [PortState::Ready, PortState::Running])?; for layer in &mut data.layers { @@ -1661,14 +1692,14 @@ impl Port { /// Grab a snapshot of the port statistics. pub fn stats_snap(&self) -> PortStatsSnap { - self.data.lock().stats.vals.snapshot() + self.stats.vals.snapshot() } /// Return the [`TcpState`] of a given flow. #[cfg(any(feature = "test-help", test))] pub fn tcp_state(&self, flow: &InnerFlowId) -> Option { self.data - .lock() + .read() .tcp_flows .get(flow) .map(|entry| entry.state().tcp_state()) @@ -2286,7 +2317,7 @@ impl Port { ) -> result::Result { use Direction::In; - data.stats.vals.in_uft_miss += 1; + self.stats.vals.in_uft_miss.incr(1); let mut xforms = Transforms::new(); let res = self.layers_process(data, In, pkt, &mut xforms, ameta); match res { @@ -2485,7 +2516,7 @@ impl Port { ) -> result::Result { use Direction::Out; - data.stats.vals.out_uft_miss += 1; + self.stats.vals.out_uft_miss.incr(1); let mut tcp_closed = false; // For outbound traffic the TCP flow table must be checked @@ -2672,23 +2703,24 @@ impl Port { } fn update_stats_in( - stats: &mut PortStats, + &self, res: &result::Result, ) { + let stats = &self.stats.vals; match res { Ok(InternalProcessResult::Drop { reason }) => { - stats.in_drop += 1; + stats.in_drop.incr(1); match reason { - DropReason::HandlePkt => stats.in_drop_handle_pkt += 1, - DropReason::Layer { .. } => stats.in_drop_layer += 1, - DropReason::TcpErr => stats.in_drop_tcp_err += 1, + DropReason::HandlePkt => stats.in_drop_handle_pkt.incr(1), + DropReason::Layer { .. } => stats.in_drop_layer.incr(1), + DropReason::TcpErr => stats.in_drop_tcp_err.incr(1), } } - Ok(InternalProcessResult::Modified) => stats.in_modified += 1, + Ok(InternalProcessResult::Modified) => stats.in_modified.incr(1), - Ok(InternalProcessResult::Hairpin(_)) => stats.in_hairpin += 1, + Ok(InternalProcessResult::Hairpin(_)) => stats.in_hairpin.incr(1), // XXX We should split the different error types out into // individual stats. However, I'm not sure exactly how I @@ -2698,28 +2730,29 @@ impl Port { // to just have a top-level error counter in the // PortStats, and then also publisher LayerStats for each // layer along with the different error counts. - Err(_) => stats.in_process_err += 1, + Err(_) => stats.in_process_err.incr(1), } } fn update_stats_out( - stats: &mut PortStats, + &self, res: &result::Result, ) { + let stats = &self.stats.vals; match res { Ok(InternalProcessResult::Drop { reason }) => { - stats.out_drop += 1; + stats.out_drop.incr(1); match reason { - DropReason::HandlePkt => stats.out_drop_handle_pkt += 1, - DropReason::Layer { .. } => stats.out_drop_layer += 1, - DropReason::TcpErr => stats.out_drop_tcp_err += 1, + DropReason::HandlePkt => stats.out_drop_handle_pkt.incr(1), + DropReason::Layer { .. } => stats.out_drop_layer.incr(1), + DropReason::TcpErr => stats.out_drop_tcp_err.incr(1), } } - Ok(InternalProcessResult::Modified) => stats.out_modified += 1, + Ok(InternalProcessResult::Modified) => stats.out_modified.incr(1), - Ok(InternalProcessResult::Hairpin(_)) => stats.out_hairpin += 1, + Ok(InternalProcessResult::Hairpin(_)) => stats.out_hairpin.incr(1), // XXX We should split the different error types out into // individual stats. However, I'm not sure exactly how I @@ -2729,7 +2762,7 @@ impl Port { // to just have a top-level error counter in the // PortStats, and then also publisher LayerStats for each // layer along with the different error counts. - Err(_) => stats.out_process_err += 1, + Err(_) => stats.out_process_err.incr(1), } } } @@ -2749,14 +2782,14 @@ impl Port { /// Return the list of layer names. pub fn layers(&self) -> Vec { - self.data.lock().layers.iter().map(|l| l.name().to_string()).collect() + self.data.read().layers.iter().map(|l| l.name().to_string()).collect() } /// Get the number of flows currently in the layer and direction /// specified. The value `"uft"` can be used to get the number of /// UFT flows. pub fn num_flows(&self, layer: &str, dir: Direction) -> u32 { - let data = self.data.lock(); + let data = self.data.read(); use Direction::*; match (layer, dir) { @@ -2777,7 +2810,7 @@ impl Port { /// Return the number of rules registered for the given layer in /// the given direction. pub fn num_rules(&self, layer_name: &str, dir: Direction) -> usize { - let data = self.data.lock(); + let data = self.data.read(); data.layers .iter() .find(|layer| layer.name() == layer_name)