Skip to content

Commit

Permalink
Fixed some issues in TCP. (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
cying17 authored Feb 3, 2024
1 parent c045e8a commit 8a52dbf
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 91 deletions.
41 changes: 23 additions & 18 deletions examples/fattree.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@

env = simpy.Environment()

n_flows = 512
k = 16
pir = 100000
n_flows = 200
finish_time = 10.0
k = 32
pir = 1000000000 # 1Gbps
buffer_size = 1000
mean_pkt_size = 1500.0


def size_dist():
return 1500
return 1024


def arrival_dist():
return 1.0
return 0.0008 # 10Mbps


ft = build_fattree(k)
Expand All @@ -37,15 +37,17 @@ def arrival_dist():
all_flows = generate_flows(ft, hosts, n_flows)

for fid in all_flows:
pg = DistPacketGenerator(env, f"Flow_{fid}", arrival_dist, size_dist, flow_id=fid)
pg = DistPacketGenerator(
env, f"Flow_{fid}", arrival_dist, size_dist, finish=finish_time, flow_id=fid
)
ps = PacketSink(env)

all_flows[fid].pkt_gen = pg
all_flows[fid].pkt_sink = ps

ft = generate_fib(ft, all_flows)

n_classes_per_port = 4
n_classes_per_port = n_flows
weights = {c: 1 for c in range(n_classes_per_port)}


Expand All @@ -56,12 +58,15 @@ def flow_to_classes(packet, n_id=0, fib=None):
for node_id in ft.nodes():
node = ft.nodes[node_id]
flow_classes = partial(flow_to_classes, n_id=node_id, fib=node["flow_to_port"])
# node["device"] = FairPacketSwitch(
# env, k, pir, buffer_size, weights, "DRR", flow_classes, element_id=f"{node_id}"
# )
node["device"] = SimplePacketSwitch(
env, k, pir, buffer_size, element_id=f"{node_id}"

node["device"] = FairPacketSwitch(
env, k, pir, buffer_size, weights, "DRR", flow_classes, element_id=f"{node_id}"
)

# node["device"] = SimplePacketSwitch(
# env, k, pir, buffer_size, element_id=f"{node_id}"
# )

node["device"].demux.fib = node["flow_to_port"]

for n in ft.nodes():
Expand All @@ -75,8 +80,8 @@ def flow_to_classes(packet, n_id=0, fib=None):

env.run(until=1000)

for flow_id in sample(sorted(all_flows.keys()), 5):
print(f"Flow {flow_id}")
print(all_flows[flow_id].pkt_sink.waits)
print(all_flows[flow_id].pkt_sink.arrivals)
print(all_flows[flow_id].pkt_sink.perhop_times)
# for flow_id in sample(sorted(all_flows.keys()), 5):
# print(f"Flow {flow_id}")
# print(all_flows[flow_id].pkt_sink.waits)
# print(all_flows[flow_id].pkt_sink.arrivals)
# print(all_flows[flow_id].pkt_sink.perhop_times)
10 changes: 5 additions & 5 deletions examples/fattree_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ def flow_to_classes(packet, n_id=0, fib=None):

env.run(until=100)

for flow_id in sample(sorted(all_flows.keys()), 5):
print(f"Flow {flow_id}")
# print(all_flows[flow_id].pkt_sink.waits)
# print(all_flows[flow_id].pkt_sink.arrivals)
# print(all_flows[flow_id].pkt_sink.perhop_times)
# for flow_id in sample(sorted(all_flows.keys()), 5):
# print(f"Flow {flow_id}")
# print(all_flows[flow_id].pkt_sink.waits)
# print(all_flows[flow_id].pkt_sink.arrivals)
# print(all_flows[flow_id].pkt_sink.perhop_times)
84 changes: 41 additions & 43 deletions ns/packet/tcp_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,9 @@ def put(self, ack):
self.congestion_control.dupack_over()
self.dupack = 0

if self.dupack == 3:
self.congestion_control.consecutive_dupacks_received()

print("!!!", self.element_id, self.sent_packets)
if self.dupack >= 3:
if self.dupack == 3:
self.congestion_control.consecutive_dupacks_received()

resent_pkt = self.sent_packets[ack.ack]
resent_pkt.time = self.env.now
Expand All @@ -196,50 +195,49 @@ def put(self, ack):

self.out.put(resent_pkt)

return
elif self.dupack > 3:
self.congestion_control.more_dupacks_received()

if self.last_ack + self.congestion_control.cwnd >= ack.ack:
packet = Packet(
self.env.now,
self.mss,
self.next_seq,
src=self.flow.src,
flow_id=self.flow.fid,
)
if self.dupack > 3:
self.congestion_control.more_dupacks_received()

self.sent_packets[packet.packet_id] = packet
if self.last_ack + self.congestion_control.cwnd >= ack.ack:
packet = Packet(
self.env.now,
self.mss,
self.next_seq,
src=self.flow.src,
flow_id=self.flow.fid,
)

if self.debug:
print(
"TCPPacketGenerator {:d} sent packet {:d} with size {:d}, "
"flow_id {:d} at time {:.4f} as dupack > 3.".format(
self.element_id,
packet.packet_id,
packet.size,
packet.flow_id,
self.env.now,
self.sent_packets[packet.packet_id] = packet

if self.debug:
print(
"TCPPacketGenerator {:d} sent packet {:d} with size {:d}, "
"flow_id {:d} at time {:.4f} as dupack > 3.".format(
self.element_id,
packet.packet_id,
packet.size,
packet.flow_id,
self.env.now,
)
)
)

self.out.put(packet)
self.out.put(packet)

self.next_seq += packet.size
self.timers[packet.packet_id] = Timer(
self.env,
timer_id=packet.packet_id,
timeout_callback=self.timeout_callback,
timeout=self.rto,
)
self.next_seq += packet.size
self.timers[packet.packet_id] = Timer(
self.env,
timer_id=packet.packet_id,
timeout_callback=self.timeout_callback,
timeout=self.rto,
)

if self.debug:
print(
"TCPPacketGenerator {:d} is setting a timer for packet {:d} with an RTO"
" of {:.4f}.".format(
self.element_id, packet.packet_id, self.rto
if self.debug:
print(
"TCPPacketGenerator {:d} is setting a timer for packet {:d} with an RTO"
" of {:.4f}.".format(
self.element_id, packet.packet_id, self.rto
)
)
)

return

Expand Down Expand Up @@ -273,8 +271,8 @@ def put(self, ack):
# first duplicate ACK, if any
acked_packets = [
packet_id
for packet_id, _ in self.timers.items()
if packet_id <= ack.packet_id
for packet_id, _ in self.sent_packets.items()
if packet_id < ack.ack
]
for packet_id in acked_packets:
if self.debug:
Expand Down
47 changes: 22 additions & 25 deletions ns/packet/tcp_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@


class TCPSink(PacketSink):
""" A TCPSink inherits from the basic PacketSink, and sends ack packets back to the
"""A TCPSink inherits from the basic PacketSink, and sends ack packets back to the
TCPPacketGenerator with advertised receive window sizes.
"""
def __init__(self,
env,
rec_arrivals: bool = True,
absolute_arrivals: bool = True,
rec_waits: bool = True,
rec_flow_ids: bool = True,
debug: bool = False,
element_id: int = 0):
super().__init__(env, rec_arrivals, absolute_arrivals, rec_waits,
rec_flow_ids, debug)

def __init__(
self,
env,
rec_arrivals: bool = True,
absolute_arrivals: bool = True,
rec_waits: bool = True,
rec_flow_ids: bool = True,
debug: bool = False,
element_id: int = 0,
):
super().__init__(
env, rec_arrivals, absolute_arrivals, rec_waits, rec_flow_ids, debug
)
self.recv_buffer = []
# the next sequence number expected to be received
self.next_seq_expected = 0
Expand All @@ -30,8 +34,8 @@ def packet_arrived(self, packet):
Insert the packet into the receive buffer, which is a priority queue
that is sorted based on the sequence number of the packet (packet_id).
"""
self.recv_buffer.append(
[packet.packet_id, packet.packet_id + packet.size])

self.recv_buffer.append([packet.packet_id, packet.packet_id + packet.size])

self.recv_buffer.sort()

Expand All @@ -44,20 +48,12 @@ def packet_arrived(self, packet):
self.recv_buffer = merged_stats

def put(self, packet):
""" Sends a packet to this element. """
"""Sends a packet to this element."""
super().put(packet)

self.packet_arrived(packet)

if len(self.recv_buffer) == 1:
# in-order delivery: all data up to but not including
# `next_seq_expected' have been received
self.next_seq_expected = max(packet.packet_id + packet.size, self.next_seq_expected)
else:
# out-of-order delivery or retransmissions: needs
# to go through the receive buffer and find out
# what the last in-order packet's sequence number is
self.next_seq_expected = max(self.recv_buffer[0][1], self.next_seq_expected)
self.next_seq_expected = self.recv_buffer[0][1]

# a TCP sink needs to send ack packets back to the TCP packet generator
assert self.out is not None
Expand All @@ -66,8 +62,9 @@ def put(self, packet):
packet.time, # used for calculating RTT at the sender
size=40, # default size of the ack packet
packet_id=packet.packet_id,
flow_id=packet.flow_id + 10000)

flow_id=packet.flow_id + 10000,
)

# assert packet.delivered_time > 0
acknowledgment.ack = self.next_seq_expected
acknowledgment.delivered_time = packet.delivered_time
Expand Down

0 comments on commit 8a52dbf

Please sign in to comment.