//! A self-learning ethernet switch with spanning-tree (STP) support.
use super::bpdu::{Bpdu, BpduBuf};
use anyhow::bail;
use pnet::{
datalink::{
self, Channel::Ethernet, Config, DataLinkReceiver, DataLinkSender, NetworkInterface,
},
packet::{ethernet::EthernetPacket, Packet},
util::MacAddr,
};
use std::{
cmp::Ordering,
collections::HashMap,
io::ErrorKind,
mem,
time::{Duration, Instant},
};
/// Spanning-tree role of a single switch port.
///
/// A port starts out in [`PortState::Learning`] and moves between the other
/// states as BPDUs arrive and as the startup phase ends.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PortState {
    /// Initial state: client packets are not forwarded yet, but their
    /// source addresses are still recorded in the forwarding table.
    Learning,
    /// The port through which this switch reaches the root. All traffic is served.
    Root,
    /// The port closes a loop. Client packets are refused; only BPDUs are handled.
    Block,
    /// The port gives other nodes access to the root. All traffic is served.
    Forward,
}
/// An ethernet port being served IO by the switch controller.
///
/// Only the transmit half of the port's channel is stored here;
/// `EthPort::build` hands the matching receiver back to the caller separately.
struct EthPort {
/// Hardware address reported by the network interface this port was built from.
mac: MacAddr,
/// Transmit half of the datalink channel opened on the interface.
tx: Box<dyn DataLinkSender>,
/// Current spanning-tree state of the port; `EthPort::build` starts it
/// in `PortState::Learning`.
state: PortState,
}
impl EthPort {
/// Builds a port handle that supports sending and receiving network
/// packets from an ethernet port. Every attempt at receiving a
/// packet blocks until a packet arries or `poll_timeout` has elapsed.
fn build(
intf: &NetworkInterface,
poll_timeout: Option<Duration>,
) -> anyhow::Result<(Self, Box<dyn DataLinkReceiver>)> {
let port_cfg = Config {
read_timeout: poll_timeout,
..Config::default()
};
let Ok(Ethernet(tx, rx)) = datalink::channel(&intf, port_cfg) else {
bail!("Failed to initialize an ethernet channel on interface: {intf:#?}");
};
let Some(mac) = intf.mac else {
bail!("Cannot create an eth port without a mac address");
};
Ok((
Self {
mac,
state: PortState::Learning,
tx,
},
rx,
))
}
/// If the given packet is for ethernet routing controls between switches,
/// returns the unpacked routing packet. Otherwise None.
/// Panics if the packet matches the BPDU mac address but cannot be serialized.
/// Such a case indicates a bug or some serious misunderstanding of the network.
fn try_routing<'a>(pkt: &'a EthernetPacket) -> Option<&'a Bpdu> {
if Bpdu::BPDU_MAC != pkt.get_destination() {
return None;
};
Some(bytemuck::from_bytes(pkt.payload()))
}
}
/// A configurable self-learning ethernet switch that supports STP.
pub struct EthSwitch {
/// Transmit side and spanning-tree state of every managed port. A port's
/// index in this vector is its port number.
ports: Vec<EthPort>,
/// Receive halves of the ports' channels, pushed in the same order as
/// `ports`. `run` moves this vector out of `self` before polling.
inbound: Vec<Box<dyn DataLinkReceiver>>,
/// Identifier for the whole switch: the smallest MAC among its ports.
switch_id: MacAddr,
/// The BPDU this switch currently sends to its neighbors.
curr_bpdu: Bpdu,
/// Buffer passed to `Bpdu::make_packet` when building outgoing BPDU frames.
bpdu_buf: BpduBuf,
/// Interval after which `run` re-broadcasts the current BPDU.
bpdu_resend_timeout: Duration,
/// When the periodic BPDU re-broadcast in `run` last fired.
last_resent_bpdu: Instant,
/// MacAddr is the destination mac, and the value usize is the port
/// number (an index into `ports`) to send packets for that mac out of.
fwd_table: HashMap<MacAddr, usize>,
}
impl EthSwitch {
/// Queries ethernet interfaces and opens read/write connections with all
/// mininet ports. Assigns a mac address to represent the whole switch and
/// establishes an initial Bpdu for this switch.
pub fn build(
switch_name: &str,
bpdu_resend_timeout: Duration,
eth_poll_timeout: Option<Duration>,
) -> anyhow::Result<Self> {
let interfaces = datalink::interfaces();
let mut ports = Vec::with_capacity(interfaces.len());
let mut inbound = Vec::with_capacity(interfaces.len());
let mut switch_id = MacAddr::broadcast();
// Note: Port egress and ingress are separated because simultanous mutable
// borrows to both the tx and rx are often needed. This enables single-copy
// routing from the inflow to the outflow ethernet buffer.
// Only watch ethernet ports that follow the mininet naming convention.
let mn_name = format!("{switch_name}-eth");
for intf in datalink::interfaces()
.iter()
.filter(|intf| intf.name.contains(&mn_name))
{
let (port, port_rx) = EthPort::build(intf, eth_poll_timeout)?;
switch_id = switch_id.min(port.mac);
ports.push(port);
inbound.push(port_rx);
}
if switch_id == MacAddr::broadcast() {
bail!("Failed to identify any viable interfaces for this switch");
}
Ok(EthSwitch {
ports,
inbound,
switch_id,
curr_bpdu: Bpdu::new(0, switch_id, switch_id),
bpdu_buf: Bpdu::make_buf(),
bpdu_resend_timeout,
last_resent_bpdu: Instant::now()
.checked_sub(bpdu_resend_timeout)
.unwrap_or_else(|| Instant::now()),
fwd_table: HashMap::new(),
})
}
/// Runs packet control and forwarding as long as the network is live.
/// Startup duration is the amount of time switches spend learning the
/// topology and negotiating the spanning tree before beginning to route
/// host packets. Recommended between 500 ms and 2 seconds.
pub fn run(mut self, startup_duration: Duration) -> anyhow::Result<()> {
// Separating inbound tx from self allows self to be mutably borrowed
// within the polling loop.
let mut inbound = mem::take(&mut self.inbound);
assert_eq!(inbound.len(), self.ports.len());
let time_entered = Instant::now();
let mut init_phase = true;
loop {
if init_phase && time_entered.elapsed() > startup_duration {
for port in &mut self.ports {
// Assume by now that all ports that aren't otherwise assigned
// are either silent or hosts.
if port.state == PortState::Learning {
port.state = PortState::Forward;
}
}
init_phase = false;
}
if self.bpdu_resend_timeout < self.last_resent_bpdu.elapsed() {
self.broadcast_bpdu();
self.last_resent_bpdu = Instant::now();
}
for (portnum_in, rx) in inbound.iter_mut().enumerate() {
let bytes = match rx.next() {
Ok(p) => p,
Err(e) => {
if e.kind() == ErrorKind::TimedOut {
continue;
}
bail!("Exiting on io error: {e:#?}");
}
};
let Some(eth_pkt) = EthernetPacket::new(bytes) else {
eprintln!("Failed to parse packet: {bytes:#?}");
continue;
};
let Some(neighbor) = EthPort::try_routing(ð_pkt) else {
self.fwd_client(portnum_in, ð_pkt);
continue;
};
// first take the smaller root id
// then take the shortest path to the smallest root id
let agree_on_root = match neighbor.root_id().cmp(&self.curr_bpdu.root_id()) {
Ordering::Less => {
self.reset_root(portnum_in, neighbor, ð_pkt);
self.broadcast_bpdu();
continue;
}
Ordering::Greater => {
self.broadcast_bpdu();
continue;
}
Ordering::Equal => true,
};
assert!(
agree_on_root,
"The code below only applies to switches that already agree on the root"
);
match (neighbor.cost() + 1).cmp(&self.curr_bpdu.cost()) {
Ordering::Less => {
self.reset_root(portnum_in, neighbor, ð_pkt);
self.broadcast_bpdu();
}
Ordering::Equal => {
let port = &mut self.ports[portnum_in];
if port.state != PortState::Root {
port.state = PortState::Block;
}
}
Ordering::Greater => {
self.ports[portnum_in].state = if neighbor.bridge_id() == self.switch_id {
PortState::Forward
} else {
PortState::Block
};
}
};
}
}
}
/// Sends the packet to the given outbound transmitter.
///
/// The packet is copied directly into the send buffer. Sending is
/// best-effort: a failure is reported on stderr and otherwise ignored so
/// that one bad port cannot take the whole switch down.
fn send(tx: &mut Box<dyn DataLinkSender>, pkt: &EthernetPacket) {
    let frame = pkt.packet();
    let outcome = tx.build_and_send(1, frame.len(), &mut |outbound| {
        outbound.copy_from_slice(frame);
    });
    match outcome {
        Some(Ok(())) => {}
        Some(Err(e)) => eprintln!("Failed to send packet: {e}"),
        // The sender could not provide a buffer for a frame of this size.
        None => eprintln!(
            "Failed to send packet: no send buffer available for {} bytes",
            frame.len()
        ),
    }
}
/// Forwards client packets (not bpdu) using the forwarding table.
/// Learns source/port pairs when possible.
///
/// * Packets arriving on a blocked port are refused.
/// * Packets arriving on a learning port are learned from but not forwarded.
/// * A known destination is sent out of its recorded port only. If that
///   port is the one the packet arrived on, the packet is dropped.
/// * An unknown destination is flooded to every other port that is in the
///   `Root` or `Forward` state.
///
/// # Panics
///
/// Panics if the packet is addressed to `Bpdu::BPDU_MAC`, or if
/// `portnum_in` is not a valid port number.
fn fwd_client(&mut self, portnum_in: usize, eth_pkt: &EthernetPacket) {
    let dst = eth_pkt.get_destination();
    assert_ne!(
        dst,
        Bpdu::BPDU_MAC,
        "These should only be host to host packets"
    );

    let inbound_state = self.ports[portnum_in].state;
    if inbound_state == PortState::Block {
        eprintln!("Denied client packet on a blocked port: {eth_pkt:#?}");
        return;
    }

    // self learning
    self.fwd_table.insert(eth_pkt.get_source(), portnum_in);
    if inbound_state == PortState::Learning {
        // No forwarding during learning
        return;
    }

    // forward to known destination
    if let Some(&next_hop) = self.fwd_table.get(&dst) {
        if next_hop == portnum_in {
            // The destination lives on the segment the packet came from;
            // sending it back out would only duplicate it.
            return;
        }
        let port = &mut self.ports[next_hop];
        if port.state != PortState::Block {
            Self::send(&mut port.tx, eth_pkt);
            return;
        }
        // The entry was learned before spanning-tree updates blocked that
        // port. Drop the stale entry and fall back to flooding.
        self.fwd_table.remove(&dst);
    }

    // flood to unknown destination
    for (portnum_out, port) in self.ports.iter_mut().enumerate() {
        let serves_traffic = matches!(port.state, PortState::Root | PortState::Forward);
        if portnum_out != portnum_in && serves_traffic {
            Self::send(&mut port.tx, eth_pkt);
        }
    }
}
/// Wraps the switch's current bpdu in a control packet and transmits it on
/// every port, whatever the port's state (blocked ports included).
fn broadcast_bpdu(&mut self) {
    let bpdu_frame = self.curr_bpdu.make_packet(&mut self.bpdu_buf, self.switch_id);
    self.ports
        .iter_mut()
        .for_each(|port| Self::send(&mut port.tx, &bpdu_frame));
}
/// Makes port `new_root` the root port and blocks whichever other port held
/// that role before. The current bpdu is then replaced by the neighbor's
/// bpdu with the cost raised by one hop and the packet's source as bridge.
fn reset_root(&mut self, new_root: usize, neighbor: &Bpdu, pkt: &EthernetPacket) {
    self.ports
        .iter_mut()
        .enumerate()
        .for_each(|(idx, port)| {
            if idx == new_root {
                port.state = PortState::Root;
            } else if port.state == PortState::Root {
                // Demote the previous root port.
                port.state = PortState::Block;
            }
        });

    let cost_via_neighbor = neighbor.cost() + 1;
    self.curr_bpdu = Bpdu::new(cost_via_neighbor, neighbor.root_id(), pkt.get_source());
}
}