Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Paced Flow Manager #187

Merged
merged 13 commits into from
Jul 31, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ file.
[UNRELEASED] - Under development
********************************

Added
=====
- Added pacing for sending flow mods. Pacing can be configured in ``ACTION_PACES`` in the NApp's ``setings.py`` file.

General Information
===================
- Added new script ``scripts/db/2024.1.0/000_hard_delete_old.py``, it's a general purpose script to hard delete flows that have been soft deleted before a given string UTC datetime.
Expand Down
40 changes: 27 additions & 13 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from kytos.core import KytosEvent, KytosNApp, log, rest
from kytos.core.helpers import listen_to, now
from kytos.core.pacing import PacerWrapper
from kytos.core.rest_api import (
HTTPException,
JSONResponse,
Expand All @@ -39,6 +40,7 @@
SwitchNotConnectedError,
)
from .settings import (
ACTION_PACES,
CONN_ERR_MAX_RETRIES,
CONN_ERR_MIN_WAIT,
CONN_ERR_MULTIPLIER,
Expand Down Expand Up @@ -100,6 +102,9 @@ def setup(self):
self._flow_mods_retry_count_lock = Lock()
self.resent_flows = set()

self.pacer = PacerWrapper("flow_manager", self.controller.pacer)
self.pacer.inject_config(ACTION_PACES)

@staticmethod
def get_flow_controller() -> FlowController:
"""Get FlowController."""
Expand Down Expand Up @@ -204,7 +209,7 @@ def _on_ofpt_barrier_reply(self, event):
flows = []
with self._flow_mods_sent_lock:
for flow_xid in flow_xids:
flow, cmd = self._flow_mods_sent[flow_xid]
flow, cmd, _ = self._flow_mods_sent[flow_xid]
if (
cmd != "add"
or flow_xid not in self._flow_mods_sent
Expand Down Expand Up @@ -288,7 +293,7 @@ def _retry_on_openflow_connection_error(

try:
xid = int(event.message.header.xid)
flow, command = self._flow_mods_sent[xid]
flow, command, owner = self._flow_mods_sent[xid]
except KeyError:
raise ValueError(
f"Aborting retries, xid: {xid} not found on flow mods sent"
Expand Down Expand Up @@ -324,7 +329,7 @@ def _retry_on_openflow_connection_error(
)
flow_mod = build_flow_mod_from_command(flow, command)
flow_mod.header.xid = xid
self._send_flow_mod(flow.switch, flow_mod)
self._send_flow_mod(flow.switch, flow_mod, owner)
if send_barrier:
self._send_barrier_request(flow.switch, [flow_mod])
return True
Expand Down Expand Up @@ -721,7 +726,7 @@ def _install_flows(
reraise_conn: True to reraise switch connection errors
send_barrier: True to send barrier_request
"""
flow_mods, flows, flow_dicts = [], [], []
flow_mods, flows, flow_dicts, owners = [], [], [], []
for switch in switches:
serializer = FlowFactory.get_class(switch, Flow04)
flows_list = flows_dict.get("flows", [])
Expand All @@ -738,8 +743,9 @@ def _install_flows(
flow_mod.pack()
flow_mods.append(flow_mod)
flows.append(flow)
owners.append(flow_dict.get("owner", "no_owner"))
flow_dicts.append(
{**{"flow": flow_dict}, **{"flow_id": flow.id, "switch": switch.id}}
{"flow": flow_dict, "flow_id": flow.id, "switch": switch.id}
)
if save and command == "add":
self.flow_controller.upsert_flows(
Expand All @@ -749,21 +755,24 @@ def _install_flows(
self.delete_matched_flows(
flow_dicts, {switch.id: switch for switch in switches}
)
self._send_flow_mods(switches, flow_mods, flows, reraise_conn, send_barrier)
self._send_flow_mods(
switches, flow_mods, flows, owners, reraise_conn, send_barrier
)

def _send_flow_mods(
self,
switches,
flow_mods,
flows,
owners,
reraise_conn=True,
send_barrier=ENABLE_BARRIER_REQUEST,
):
"""Send FlowMod (and BarrierRequest) given a list of flow_dicts to switches."""
for switch in switches:
for i, (flow_mod, flow) in enumerate(zip(flow_mods, flows)):
for i, (flow_mod, flow, owner) in enumerate(zip(flow_mods, flows, owners)):
try:
self._send_flow_mod(switch, flow_mod)
self._send_flow_mod(switch, flow_mod, owner)
if send_barrier and i == len(flow_mods) - 1:
self._send_barrier_request(switch, flow_mods)
except SwitchNotConnectedError:
Expand All @@ -774,15 +783,16 @@ def _send_flow_mods(
flow_mod.header.xid,
flow,
build_command_from_flow_mod(flow_mod),
owner,
)
self._send_napp_event(switch, flow, "pending")
return flow_mods

def _add_flow_mod_sent(self, xid, flow, command):
def _add_flow_mod_sent(self, xid, flow, command, owner):
"""Add the flow mod to the list of flow mods sent."""
if len(self._flow_mods_sent) >= self._flow_mods_sent_max_size:
self._flow_mods_sent.popitem(last=False)
self._flow_mods_sent[xid] = (flow, command)
self._flow_mods_sent[xid] = (flow, command, owner)
viniarck marked this conversation as resolved.
Show resolved Hide resolved

def _add_barrier_request(self, dpid, barrier_xid, flow_mods):
"""Add a barrier request."""
Expand Down Expand Up @@ -812,7 +822,11 @@ def _send_barrier_request(self, switch, flow_mods):
)
self.controller.buffers.msg_out.put(event)

def _send_flow_mod(self, switch, flow_mod):
def _send_flow_mod(self, switch, flow_mod, owner):
owner_pacer = f"send_flow_mod.{owner}"
if not self.pacer.is_configured(owner_pacer):
owner_pacer = "send_flow_mod.no_owner"
self.pacer.hit(owner_pacer, switch.dpid)
if not switch.is_connected():
raise SwitchNotConnectedError(
f"switch {switch.id} isn't connected", flow_mod
Expand Down Expand Up @@ -859,7 +873,7 @@ def _send_openflow_connection_error(self, event):
switch = event.content["destination"].switch
flow = event.message
try:
_, error_command = self._flow_mods_sent[event.message.header.xid]
_, error_command, _ = self._flow_mods_sent[event.message.header.xid]
except KeyError:
error_command = "unknown"
error_kwargs = {
Expand Down Expand Up @@ -892,7 +906,7 @@ def handle_errors(self, event):
return
xid = message.header.xid.value
try:
flow, error_command = self._flow_mods_sent[xid]
flow, error_command, _ = self._flow_mods_sent[xid]
except KeyError:
pass
else:
Expand Down
30 changes: 30 additions & 0 deletions settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,33 @@
to be at least greater than FLOW_STATS and ideally it slightly greater than
whichever longest network convergence FlowMods operations that your network has.
"""

viniarck marked this conversation as resolved.
Show resolved Hide resolved
# Rate limits for sending flow mods this can be set per NApp,
# and the pacing is per DPID when in the context of that NApp
# The NApp that the flow pertains to is determined via the `owner` attribute of the flow.
ACTION_PACES = {
viniarck marked this conversation as resolved.
Show resolved Hide resolved
"send_flow_mod.no_owner": {
"pace": "100/second",
"strategy": "fixed_window",
},
"send_flow_mod.mef_eline": {
"pace": "100/second",
"strategy": "fixed_window",
},
"send_flow_mod.of_multi_table": {
"pace": "100/second",
"strategy": "fixed_window",
},
"send_flow_mod.telemetry_int": {
"pace": "100/second",
viniarck marked this conversation as resolved.
Show resolved Hide resolved
"strategy": "fixed_window",
},
"send_flow_mod.of_lldp": {
viniarck marked this conversation as resolved.
Show resolved Hide resolved
"pace": "100/second",
"strategy": "fixed_window",
},
"send_flow_mod.coloring": {
"pace": "100/second",
"strategy": "fixed_window",
},
}
10 changes: 5 additions & 5 deletions tests/integration/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ def test_add_flow_mod_sent_ok(self):
xid = "12345"
command = "add"
initial_len = len(self.napp._flow_mods_sent)
self.napp._add_flow_mod_sent(xid, flow, command)
self.napp._add_flow_mod_sent(xid, flow, command, "no_owner")

assert len(self.napp._flow_mods_sent) == initial_len + 1
assert self.napp._flow_mods_sent.get(xid, None) == (flow, command)
assert self.napp._flow_mods_sent.get(xid, None) == (flow, command, "no_owner")

def test_add_flow_mod_sent_overlimit(self):
self.napp._flow_mods_sent_max_size = 5
Expand All @@ -41,12 +41,12 @@ def test_add_flow_mod_sent_overlimit(self):
while len(self.napp._flow_mods_sent) < 5:
xid += "1"
flow = Mock()
self.napp._add_flow_mod_sent(xid, flow, command)
self.napp._add_flow_mod_sent(xid, flow, command, "no_owner")

xid = "90876"
flow = Mock()
initial_len = len(self.napp._flow_mods_sent)
self.napp._add_flow_mod_sent(xid, flow, command)
self.napp._add_flow_mod_sent(xid, flow, command, "no_owner")

assert len(self.napp._flow_mods_sent) == initial_len
assert self.napp._flow_mods_sent.get(xid, None) == (flow, command)
assert self.napp._flow_mods_sent.get(xid, None) == (flow, command, "no_owner")
39 changes: 24 additions & 15 deletions tests/unit/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,14 +485,21 @@ def test_install_flows(self, *args):
serializer.from_dict.return_value = flow
mock_flow_factory.return_value = serializer

flows_dict = {"flows": [MagicMock(), MagicMock()]}
flows_dict = {
"flows": [
MagicMock(get=lambda x, y=None: y),
MagicMock(get=lambda x, y=None: y),
]
}
switches = [self.switch_01]
self.napp._install_flows("add", flows_dict, switches)

mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod)
mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod, "no_owner")
assert mock_send_flow_mod.call_count == len(flows_dict["flows"])
assert mock_send_barrier_request.call_count == 1
mock_add_flow_mod_sent.assert_called_with(flow_mod.header.xid, flow, "add")
mock_add_flow_mod_sent.assert_called_with(
flow_mod.header.xid, flow, "add", "no_owner"
)
mock_send_napp_event.assert_called_with(self.switch_01, flow, "pending")
self.napp.flow_controller.upsert_flows.assert_called()

Expand All @@ -519,13 +526,13 @@ def test_install_flows_with_delete_strict(self, *args):
serializer.from_dict.return_value = flow
mock_flow_factory.return_value = serializer

flows_dict = {"flows": [MagicMock()]}
flows_dict = {"flows": [MagicMock(get=lambda x, y=None: y)]}
switches = [self.switch_01]
self.napp._install_flows("delete_strict", flows_dict, switches)

mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod)
mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod, "no_owner")
mock_add_flow_mod_sent.assert_called_with(
flow_mod.header.xid, flow, "delete_strict"
flow_mod.header.xid, flow, "delete_strict", "no_owner"
)
mock_send_napp_event.assert_called_with(self.switch_01, flow, "pending")
mock_send_barrier_request.assert_called()
Expand Down Expand Up @@ -676,9 +683,9 @@ def test_add_flow_mod_sent(self):
xid = 0
flow = MagicMock()

self.napp._add_flow_mod_sent(xid, flow, "add")
self.napp._add_flow_mod_sent(xid, flow, "add", "no_owner")

assert self.napp._flow_mods_sent[xid] == (flow, "add")
assert self.napp._flow_mods_sent[xid] == (flow, "add", "no_owner")

def test_send_flow_mod(self):
"""Test _send_flow_mod method."""
Expand All @@ -687,7 +694,7 @@ def test_send_flow_mod(self):
switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
flow_mod = MagicMock()

self.napp._send_flow_mod(switch, flow_mod)
self.napp._send_flow_mod(switch, flow_mod, "no_owner")

mock_buffers_put.assert_called()

Expand All @@ -699,7 +706,7 @@ def test_send_flow_mod_error(self, mock_buffers_put):
flow_mod = MagicMock()

with pytest.raises(SwitchNotConnectedError):
self.napp._send_flow_mod(switch, flow_mod)
self.napp._send_flow_mod(switch, flow_mod, "no_owner")

mock_buffers_put.assert_not_called()

Expand All @@ -722,7 +729,7 @@ def test_handle_errors(self, mock_send_napp_event):
flow.id = "1"
flow.as_dict.return_value = {}
flow.cookie = 0
self.napp._flow_mods_sent[0] = (flow, "add")
self.napp._flow_mods_sent[0] = (flow, "add", "no_owner")

switch = get_switch_mock("00:00:00:00:00:00:00:01")
switch.connection = get_connection_mock(
Expand Down Expand Up @@ -1086,7 +1093,9 @@ def test_on_ofpt_barrier_reply(self, mock_publish):

barrier_xid = list(self.napp._pending_barrier_reply[switch.id].keys())[-1]
for flow_mod in flow_mods:
self.napp._add_flow_mod_sent(flow_mod.header.xid, flow_mod, "add")
self.napp._add_flow_mod_sent(
flow_mod.header.xid, flow_mod, "add", "no_owner"
)

event = MagicMock()
event.message.header.xid = barrier_xid
Expand Down Expand Up @@ -1157,7 +1166,7 @@ def test_retry_on_openflow_connection_error(self, mock_barrier):
flow.as_dict.return_value = {}
flow.header.message_type = Type.OFPT_FLOW_MOD
flow.xid = 1
self.napp._flow_mods_sent[flow.xid] = (flow, "add")
self.napp._flow_mods_sent[flow.xid] = (flow, "add", "no_owner")

mock_ev = MagicMock()
mock_ev.message = flow
Expand Down Expand Up @@ -1187,7 +1196,7 @@ def test_retry_on_openflow_connection_error_send_event(self, mock_send):
flow.as_dict.return_value = {}
flow.header.message_type = Type.OFPT_FLOW_MOD
flow.xid = 1
self.napp._flow_mods_sent[flow.xid] = (flow, "add")
self.napp._flow_mods_sent[flow.xid] = (flow, "add", "no_owner")

# make sure a previous retry has stored executed
self.napp._flow_mods_retry_count[flow.xid] = (3, now(), 10)
Expand Down Expand Up @@ -1241,7 +1250,7 @@ def test_send_openflow_connection_error(self, mock_send):
flow = MagicMock()
flow.as_dict.return_value = {}
flow.xid = 1
self.napp._flow_mods_sent[flow.xid] = (flow, "add")
self.napp._flow_mods_sent[flow.xid] = (flow, "add", "no_owner")

mock_ev = MagicMock()
mock_ev.event.content = {"destination": switch}
Expand Down