Skip to content

Commit

Permalink
Merge pull request #187 from kytos-ng/paced/flow_manager
Browse files Browse the repository at this point in the history
Paced Flow Manager
  • Loading branch information
Ktmi authored Jul 31, 2024
2 parents 3c406c1 + a44a8d3 commit 2876238
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 33 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ file.
[UNRELEASED] - Under development
********************************

Added
=====
- Added pacing for sending flow mods. Pacing can be configured in ``ACTION_PACES`` in the NApp's ``setings.py`` file.

General Information
===================
- Added new script ``scripts/db/2024.1.0/000_hard_delete_old.py``, it's a general purpose script to hard delete flows that have been soft deleted before a given string UTC datetime.
Expand Down
40 changes: 27 additions & 13 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from kytos.core import KytosEvent, KytosNApp, log, rest
from kytos.core.helpers import listen_to, now
from kytos.core.pacing import PacerWrapper
from kytos.core.rest_api import (
HTTPException,
JSONResponse,
Expand All @@ -39,6 +40,7 @@
SwitchNotConnectedError,
)
from .settings import (
ACTION_PACES,
CONN_ERR_MAX_RETRIES,
CONN_ERR_MIN_WAIT,
CONN_ERR_MULTIPLIER,
Expand Down Expand Up @@ -100,6 +102,9 @@ def setup(self):
self._flow_mods_retry_count_lock = Lock()
self.resent_flows = set()

self.pacer = PacerWrapper("flow_manager", self.controller.pacer)
self.pacer.inject_config(ACTION_PACES)

@staticmethod
def get_flow_controller() -> FlowController:
"""Get FlowController."""
Expand Down Expand Up @@ -204,7 +209,7 @@ def _on_ofpt_barrier_reply(self, event):
flows = []
with self._flow_mods_sent_lock:
for flow_xid in flow_xids:
flow, cmd = self._flow_mods_sent[flow_xid]
flow, cmd, _ = self._flow_mods_sent[flow_xid]
if (
cmd != "add"
or flow_xid not in self._flow_mods_sent
Expand Down Expand Up @@ -288,7 +293,7 @@ def _retry_on_openflow_connection_error(

try:
xid = int(event.message.header.xid)
flow, command = self._flow_mods_sent[xid]
flow, command, owner = self._flow_mods_sent[xid]
except KeyError:
raise ValueError(
f"Aborting retries, xid: {xid} not found on flow mods sent"
Expand Down Expand Up @@ -324,7 +329,7 @@ def _retry_on_openflow_connection_error(
)
flow_mod = build_flow_mod_from_command(flow, command)
flow_mod.header.xid = xid
self._send_flow_mod(flow.switch, flow_mod)
self._send_flow_mod(flow.switch, flow_mod, owner)
if send_barrier:
self._send_barrier_request(flow.switch, [flow_mod])
return True
Expand Down Expand Up @@ -721,7 +726,7 @@ def _install_flows(
reraise_conn: True to reraise switch connection errors
send_barrier: True to send barrier_request
"""
flow_mods, flows, flow_dicts = [], [], []
flow_mods, flows, flow_dicts, owners = [], [], [], []
for switch in switches:
serializer = FlowFactory.get_class(switch, Flow04)
flows_list = flows_dict.get("flows", [])
Expand All @@ -738,8 +743,9 @@ def _install_flows(
flow_mod.pack()
flow_mods.append(flow_mod)
flows.append(flow)
owners.append(flow_dict.get("owner", "no_owner"))
flow_dicts.append(
{**{"flow": flow_dict}, **{"flow_id": flow.id, "switch": switch.id}}
{"flow": flow_dict, "flow_id": flow.id, "switch": switch.id}
)
if save and command == "add":
self.flow_controller.upsert_flows(
Expand All @@ -749,21 +755,24 @@ def _install_flows(
self.delete_matched_flows(
flow_dicts, {switch.id: switch for switch in switches}
)
self._send_flow_mods(switches, flow_mods, flows, reraise_conn, send_barrier)
self._send_flow_mods(
switches, flow_mods, flows, owners, reraise_conn, send_barrier
)

def _send_flow_mods(
self,
switches,
flow_mods,
flows,
owners,
reraise_conn=True,
send_barrier=ENABLE_BARRIER_REQUEST,
):
"""Send FlowMod (and BarrierRequest) given a list of flow_dicts to switches."""
for switch in switches:
for i, (flow_mod, flow) in enumerate(zip(flow_mods, flows)):
for i, (flow_mod, flow, owner) in enumerate(zip(flow_mods, flows, owners)):
try:
self._send_flow_mod(switch, flow_mod)
self._send_flow_mod(switch, flow_mod, owner)
if send_barrier and i == len(flow_mods) - 1:
self._send_barrier_request(switch, flow_mods)
except SwitchNotConnectedError:
Expand All @@ -774,15 +783,16 @@ def _send_flow_mods(
flow_mod.header.xid,
flow,
build_command_from_flow_mod(flow_mod),
owner,
)
self._send_napp_event(switch, flow, "pending")
return flow_mods

def _add_flow_mod_sent(self, xid, flow, command):
def _add_flow_mod_sent(self, xid, flow, command, owner):
"""Add the flow mod to the list of flow mods sent."""
if len(self._flow_mods_sent) >= self._flow_mods_sent_max_size:
self._flow_mods_sent.popitem(last=False)
self._flow_mods_sent[xid] = (flow, command)
self._flow_mods_sent[xid] = (flow, command, owner)

def _add_barrier_request(self, dpid, barrier_xid, flow_mods):
"""Add a barrier request."""
Expand Down Expand Up @@ -812,7 +822,11 @@ def _send_barrier_request(self, switch, flow_mods):
)
self.controller.buffers.msg_out.put(event)

def _send_flow_mod(self, switch, flow_mod):
def _send_flow_mod(self, switch, flow_mod, owner):
owner_pacer = f"send_flow_mod.{owner}"
if not self.pacer.is_configured(owner_pacer):
owner_pacer = "send_flow_mod.no_owner"
self.pacer.hit(owner_pacer, switch.dpid)
if not switch.is_connected():
raise SwitchNotConnectedError(
f"switch {switch.id} isn't connected", flow_mod
Expand Down Expand Up @@ -859,7 +873,7 @@ def _send_openflow_connection_error(self, event):
switch = event.content["destination"].switch
flow = event.message
try:
_, error_command = self._flow_mods_sent[event.message.header.xid]
_, error_command, _ = self._flow_mods_sent[event.message.header.xid]
except KeyError:
error_command = "unknown"
error_kwargs = {
Expand Down Expand Up @@ -892,7 +906,7 @@ def handle_errors(self, event):
return
xid = message.header.xid.value
try:
flow, error_command = self._flow_mods_sent[xid]
flow, error_command, _ = self._flow_mods_sent[xid]
except KeyError:
pass
else:
Expand Down
30 changes: 30 additions & 0 deletions settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,33 @@
to be at least greater than FLOW_STATS and ideally it slightly greater than
whichever longest network convergence FlowMods operations that your network has.
"""

# Rate limits for sending flow mods this can be set per NApp,
# and the pacing is per DPID when in the context of that NApp
# The NApp that the flow pertains to is determined via the `owner` attribute of the flow.
ACTION_PACES = {
"send_flow_mod.no_owner": {
"pace": "100/second",
"strategy": "fixed_window",
},
"send_flow_mod.mef_eline": {
"pace": "100/second",
"strategy": "fixed_window",
},
"send_flow_mod.of_multi_table": {
"pace": "100/second",
"strategy": "fixed_window",
},
"send_flow_mod.telemetry_int": {
"pace": "100/second",
"strategy": "fixed_window",
},
"send_flow_mod.of_lldp": {
"pace": "100/second",
"strategy": "fixed_window",
},
"send_flow_mod.coloring": {
"pace": "100/second",
"strategy": "fixed_window",
},
}
10 changes: 5 additions & 5 deletions tests/integration/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ def test_add_flow_mod_sent_ok(self):
xid = "12345"
command = "add"
initial_len = len(self.napp._flow_mods_sent)
self.napp._add_flow_mod_sent(xid, flow, command)
self.napp._add_flow_mod_sent(xid, flow, command, "no_owner")

assert len(self.napp._flow_mods_sent) == initial_len + 1
assert self.napp._flow_mods_sent.get(xid, None) == (flow, command)
assert self.napp._flow_mods_sent.get(xid, None) == (flow, command, "no_owner")

def test_add_flow_mod_sent_overlimit(self):
self.napp._flow_mods_sent_max_size = 5
Expand All @@ -41,12 +41,12 @@ def test_add_flow_mod_sent_overlimit(self):
while len(self.napp._flow_mods_sent) < 5:
xid += "1"
flow = Mock()
self.napp._add_flow_mod_sent(xid, flow, command)
self.napp._add_flow_mod_sent(xid, flow, command, "no_owner")

xid = "90876"
flow = Mock()
initial_len = len(self.napp._flow_mods_sent)
self.napp._add_flow_mod_sent(xid, flow, command)
self.napp._add_flow_mod_sent(xid, flow, command, "no_owner")

assert len(self.napp._flow_mods_sent) == initial_len
assert self.napp._flow_mods_sent.get(xid, None) == (flow, command)
assert self.napp._flow_mods_sent.get(xid, None) == (flow, command, "no_owner")
39 changes: 24 additions & 15 deletions tests/unit/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,14 +485,21 @@ def test_install_flows(self, *args):
serializer.from_dict.return_value = flow
mock_flow_factory.return_value = serializer

flows_dict = {"flows": [MagicMock(), MagicMock()]}
flows_dict = {
"flows": [
MagicMock(get=lambda x, y=None: y),
MagicMock(get=lambda x, y=None: y),
]
}
switches = [self.switch_01]
self.napp._install_flows("add", flows_dict, switches)

mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod)
mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod, "no_owner")
assert mock_send_flow_mod.call_count == len(flows_dict["flows"])
assert mock_send_barrier_request.call_count == 1
mock_add_flow_mod_sent.assert_called_with(flow_mod.header.xid, flow, "add")
mock_add_flow_mod_sent.assert_called_with(
flow_mod.header.xid, flow, "add", "no_owner"
)
mock_send_napp_event.assert_called_with(self.switch_01, flow, "pending")
self.napp.flow_controller.upsert_flows.assert_called()

Expand All @@ -519,13 +526,13 @@ def test_install_flows_with_delete_strict(self, *args):
serializer.from_dict.return_value = flow
mock_flow_factory.return_value = serializer

flows_dict = {"flows": [MagicMock()]}
flows_dict = {"flows": [MagicMock(get=lambda x, y=None: y)]}
switches = [self.switch_01]
self.napp._install_flows("delete_strict", flows_dict, switches)

mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod)
mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod, "no_owner")
mock_add_flow_mod_sent.assert_called_with(
flow_mod.header.xid, flow, "delete_strict"
flow_mod.header.xid, flow, "delete_strict", "no_owner"
)
mock_send_napp_event.assert_called_with(self.switch_01, flow, "pending")
mock_send_barrier_request.assert_called()
Expand Down Expand Up @@ -676,9 +683,9 @@ def test_add_flow_mod_sent(self):
xid = 0
flow = MagicMock()

self.napp._add_flow_mod_sent(xid, flow, "add")
self.napp._add_flow_mod_sent(xid, flow, "add", "no_owner")

assert self.napp._flow_mods_sent[xid] == (flow, "add")
assert self.napp._flow_mods_sent[xid] == (flow, "add", "no_owner")

def test_send_flow_mod(self):
"""Test _send_flow_mod method."""
Expand All @@ -687,7 +694,7 @@ def test_send_flow_mod(self):
switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
flow_mod = MagicMock()

self.napp._send_flow_mod(switch, flow_mod)
self.napp._send_flow_mod(switch, flow_mod, "no_owner")

mock_buffers_put.assert_called()

Expand All @@ -699,7 +706,7 @@ def test_send_flow_mod_error(self, mock_buffers_put):
flow_mod = MagicMock()

with pytest.raises(SwitchNotConnectedError):
self.napp._send_flow_mod(switch, flow_mod)
self.napp._send_flow_mod(switch, flow_mod, "no_owner")

mock_buffers_put.assert_not_called()

Expand All @@ -722,7 +729,7 @@ def test_handle_errors(self, mock_send_napp_event):
flow.id = "1"
flow.as_dict.return_value = {}
flow.cookie = 0
self.napp._flow_mods_sent[0] = (flow, "add")
self.napp._flow_mods_sent[0] = (flow, "add", "no_owner")

switch = get_switch_mock("00:00:00:00:00:00:00:01")
switch.connection = get_connection_mock(
Expand Down Expand Up @@ -1086,7 +1093,9 @@ def test_on_ofpt_barrier_reply(self, mock_publish):

barrier_xid = list(self.napp._pending_barrier_reply[switch.id].keys())[-1]
for flow_mod in flow_mods:
self.napp._add_flow_mod_sent(flow_mod.header.xid, flow_mod, "add")
self.napp._add_flow_mod_sent(
flow_mod.header.xid, flow_mod, "add", "no_owner"
)

event = MagicMock()
event.message.header.xid = barrier_xid
Expand Down Expand Up @@ -1157,7 +1166,7 @@ def test_retry_on_openflow_connection_error(self, mock_barrier):
flow.as_dict.return_value = {}
flow.header.message_type = Type.OFPT_FLOW_MOD
flow.xid = 1
self.napp._flow_mods_sent[flow.xid] = (flow, "add")
self.napp._flow_mods_sent[flow.xid] = (flow, "add", "no_owner")

mock_ev = MagicMock()
mock_ev.message = flow
Expand Down Expand Up @@ -1187,7 +1196,7 @@ def test_retry_on_openflow_connection_error_send_event(self, mock_send):
flow.as_dict.return_value = {}
flow.header.message_type = Type.OFPT_FLOW_MOD
flow.xid = 1
self.napp._flow_mods_sent[flow.xid] = (flow, "add")
self.napp._flow_mods_sent[flow.xid] = (flow, "add", "no_owner")

# make sure a previous retry has stored executed
self.napp._flow_mods_retry_count[flow.xid] = (3, now(), 10)
Expand Down Expand Up @@ -1241,7 +1250,7 @@ def test_send_openflow_connection_error(self, mock_send):
flow = MagicMock()
flow.as_dict.return_value = {}
flow.xid = 1
self.napp._flow_mods_sent[flow.xid] = (flow, "add")
self.napp._flow_mods_sent[flow.xid] = (flow, "add", "no_owner")

mock_ev = MagicMock()
mock_ev.event.content = {"destination": switch}
Expand Down

0 comments on commit 2876238

Please sign in to comment.