diff --git a/msim-tokio/src/sim/udp.rs b/msim-tokio/src/sim/udp.rs index 969e65b..e9c9022 100644 --- a/msim-tokio/src/sim/udp.rs +++ b/msim-tokio/src/sim/udp.rs @@ -83,8 +83,12 @@ impl UdpSocket { Ok(()) } - pub async fn ready(&self, _interest: Interest) -> io::Result { - todo!() + pub async fn ready(&self, interest: Interest) -> io::Result { + match interest { + Interest::READABLE => Ok(Ready::READABLE), + Interest::WRITABLE => Ok(Ready::WRITABLE), + _ => unimplemented!("unhandled interest flag {:?}", interest), + } } pub async fn writable(&self) -> io::Result<()> { diff --git a/msim/src/sim/net/mod.rs b/msim/src/sim/net/mod.rs index 24fd2da..18d29ff 100644 --- a/msim/src/sim/net/mod.rs +++ b/msim/src/sim/net/mod.rs @@ -36,11 +36,14 @@ //! ``` use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{HashMap, HashSet}, io, net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs}, os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}, - sync::{Arc, Mutex}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, + }, task::Context, }; use tap::TapFallible; @@ -70,7 +73,7 @@ pub struct NetSim { host_state: Mutex, rand: GlobalRng, time: TimeHandle, - next_tcp_id_map: Mutex>, + next_tcp_id: AtomicU32, // We always allocate new globally unique tcp id. } #[derive(Debug)] @@ -895,7 +898,8 @@ impl plugin::Simulator for NetSim { rand: rand.clone(), time: time.clone(), host_state: Default::default(), - next_tcp_id_map: Mutex::new(HashMap::new()), + // tcp ids start at 1, 0 is used for new connections (see poll_accept_internal) + next_tcp_id: AtomicU32::new(1), } } @@ -943,7 +947,7 @@ impl NetSim { let mut host_state = self.host_state.lock().unwrap(); host_state.delete_node(id); - // We do not reset self.next_tcp_id_map - we do not want to re-use tcp ids after a node is + // We do not reset self.next_tcp_id - we do not want to re-use tcp ids after a node is // restarted. } @@ -990,22 +994,9 @@ impl NetSim { self.time.sleep(delay).await; } - /// Get the next unused tcp id for this ip address. - pub fn next_tcp_id(&self, ip: IpAddr) -> u32 { - let mut map = self.next_tcp_id_map.lock().unwrap(); - match map.entry(ip) { - Entry::Occupied(mut cur) => { - let cur = cur.get_mut(); - // limited to 2^32 - 1 tcp sessions per ip per simulation run. - *cur = cur.checked_add(1).unwrap(); - *cur - } - Entry::Vacant(e) => { - // tcp ids start at 1, 0 is used for new connections (see poll_accept_internal) - e.insert(1); - 1 - } - } + /// Get the next unused tcp id. + pub fn next_tcp_id(&self) -> u32 { + self.next_tcp_id.fetch_add(1, Ordering::SeqCst) } } @@ -1106,7 +1097,13 @@ impl Endpoint { /// Allocate a new tcp id number for this node. Ids are never reused. pub fn allocate_local_tcp_id(&self) -> u32 { - let id = self.net.next_tcp_id(self.addr.ip()); + let id = self.net.next_tcp_id(); + trace!( + "Allocate local tcp id {} to node {} address {}", + id, + self.node, + self.addr.ip() + ); self.live_tcp_ids.lock().unwrap().insert(id); self.net .network diff --git a/msim/src/sim/task.rs b/msim/src/sim/task.rs index e24c1e1..07b2665 100644 --- a/msim/src/sim/task.rs +++ b/msim/src/sim/task.rs @@ -285,8 +285,13 @@ impl Executor { if let Some(restart_after) = panic_info.restart_after { let task = self.spawn_on_main_task(async move { crate::time::sleep(restart_after).await; - info!("restarting node {}", node_id); - runtime::Handle::current().restart(node_id); + let handle = runtime::Handle::current(); + // the node may have been deleted by the test harness + // before the restart timer fires. + if handle.task.get_node(node_id).is_some() { + info!("restarting node {}", node_id); + runtime::Handle::current().restart(node_id); + } }); task.fallible().detach();