diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1619841..4f03904 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,10 @@ file. [UNRELEASED] - Under development ******************************** +Added +===== +- Added pacing for sending flow mods. Pacing can be configured in ``ACTION_PACES`` in the NApp's ``setings.py`` file. + General Information =================== - Added new script ``scripts/db/2024.1.0/000_hard_delete_old.py``, it's a general purpose script to hard delete flows that have been soft deleted before a given string UTC datetime. diff --git a/main.py b/main.py index cf78502..06a3b76 100644 --- a/main.py +++ b/main.py @@ -20,6 +20,7 @@ from kytos.core import KytosEvent, KytosNApp, log, rest from kytos.core.helpers import listen_to, now +from kytos.core.pacing import PacerWrapper from kytos.core.rest_api import ( HTTPException, JSONResponse, @@ -39,6 +40,7 @@ SwitchNotConnectedError, ) from .settings import ( + ACTION_PACES, CONN_ERR_MAX_RETRIES, CONN_ERR_MIN_WAIT, CONN_ERR_MULTIPLIER, @@ -100,6 +102,9 @@ def setup(self): self._flow_mods_retry_count_lock = Lock() self.resent_flows = set() + self.pacer = PacerWrapper("flow_manager", self.controller.pacer) + self.pacer.inject_config(ACTION_PACES) + @staticmethod def get_flow_controller() -> FlowController: """Get FlowController.""" @@ -204,7 +209,7 @@ def _on_ofpt_barrier_reply(self, event): flows = [] with self._flow_mods_sent_lock: for flow_xid in flow_xids: - flow, cmd = self._flow_mods_sent[flow_xid] + flow, cmd, _ = self._flow_mods_sent[flow_xid] if ( cmd != "add" or flow_xid not in self._flow_mods_sent @@ -288,7 +293,7 @@ def _retry_on_openflow_connection_error( try: xid = int(event.message.header.xid) - flow, command = self._flow_mods_sent[xid] + flow, command, owner = self._flow_mods_sent[xid] except KeyError: raise ValueError( f"Aborting retries, xid: {xid} not found on flow mods sent" @@ -324,7 +329,7 @@ def _retry_on_openflow_connection_error( ) flow_mod = build_flow_mod_from_command(flow, command) flow_mod.header.xid = xid - self._send_flow_mod(flow.switch, flow_mod) + self._send_flow_mod(flow.switch, flow_mod, owner) if send_barrier: self._send_barrier_request(flow.switch, [flow_mod]) return True @@ -721,7 +726,7 @@ def _install_flows( reraise_conn: True to reraise switch connection errors send_barrier: True to send barrier_request """ - flow_mods, flows, flow_dicts = [], [], [] + flow_mods, flows, flow_dicts, owners = [], [], [], [] for switch in switches: serializer = FlowFactory.get_class(switch, Flow04) flows_list = flows_dict.get("flows", []) @@ -738,8 +743,9 @@ def _install_flows( flow_mod.pack() flow_mods.append(flow_mod) flows.append(flow) + owners.append(flow_dict.get("owner", "no_owner")) flow_dicts.append( - {**{"flow": flow_dict}, **{"flow_id": flow.id, "switch": switch.id}} + {"flow": flow_dict, "flow_id": flow.id, "switch": switch.id} ) if save and command == "add": self.flow_controller.upsert_flows( @@ -749,21 +755,24 @@ def _install_flows( self.delete_matched_flows( flow_dicts, {switch.id: switch for switch in switches} ) - self._send_flow_mods(switches, flow_mods, flows, reraise_conn, send_barrier) + self._send_flow_mods( + switches, flow_mods, flows, owners, reraise_conn, send_barrier + ) def _send_flow_mods( self, switches, flow_mods, flows, + owners, reraise_conn=True, send_barrier=ENABLE_BARRIER_REQUEST, ): """Send FlowMod (and BarrierRequest) given a list of flow_dicts to switches.""" for switch in switches: - for i, (flow_mod, flow) in enumerate(zip(flow_mods, flows)): + for i, (flow_mod, flow, owner) in enumerate(zip(flow_mods, flows, owners)): try: - self._send_flow_mod(switch, flow_mod) + self._send_flow_mod(switch, flow_mod, owner) if send_barrier and i == len(flow_mods) - 1: self._send_barrier_request(switch, flow_mods) except SwitchNotConnectedError: @@ -774,15 +783,16 @@ def _send_flow_mods( flow_mod.header.xid, flow, build_command_from_flow_mod(flow_mod), + owner, ) self._send_napp_event(switch, flow, "pending") return flow_mods - def _add_flow_mod_sent(self, xid, flow, command): + def _add_flow_mod_sent(self, xid, flow, command, owner): """Add the flow mod to the list of flow mods sent.""" if len(self._flow_mods_sent) >= self._flow_mods_sent_max_size: self._flow_mods_sent.popitem(last=False) - self._flow_mods_sent[xid] = (flow, command) + self._flow_mods_sent[xid] = (flow, command, owner) def _add_barrier_request(self, dpid, barrier_xid, flow_mods): """Add a barrier request.""" @@ -812,7 +822,11 @@ def _send_barrier_request(self, switch, flow_mods): ) self.controller.buffers.msg_out.put(event) - def _send_flow_mod(self, switch, flow_mod): + def _send_flow_mod(self, switch, flow_mod, owner): + owner_pacer = f"send_flow_mod.{owner}" + if not self.pacer.is_configured(owner_pacer): + owner_pacer = "send_flow_mod.no_owner" + self.pacer.hit(owner_pacer, switch.dpid) if not switch.is_connected(): raise SwitchNotConnectedError( f"switch {switch.id} isn't connected", flow_mod @@ -859,7 +873,7 @@ def _send_openflow_connection_error(self, event): switch = event.content["destination"].switch flow = event.message try: - _, error_command = self._flow_mods_sent[event.message.header.xid] + _, error_command, _ = self._flow_mods_sent[event.message.header.xid] except KeyError: error_command = "unknown" error_kwargs = { @@ -892,7 +906,7 @@ def handle_errors(self, event): return xid = message.header.xid.value try: - flow, error_command = self._flow_mods_sent[xid] + flow, error_command, _ = self._flow_mods_sent[xid] except KeyError: pass else: diff --git a/settings.py b/settings.py index 67bf2e9..42a36fe 100644 --- a/settings.py +++ b/settings.py @@ -22,3 +22,33 @@ to be at least greater than FLOW_STATS and ideally it slightly greater than whichever longest network convergence FlowMods operations that your network has. """ + +# Rate limits for sending flow mods this can be set per NApp, +# and the pacing is per DPID when in the context of that NApp +# The NApp that the flow pertains to is determined via the `owner` attribute of the flow. +ACTION_PACES = { + "send_flow_mod.no_owner": { + "pace": "100/second", + "strategy": "fixed_window", + }, + "send_flow_mod.mef_eline": { + "pace": "100/second", + "strategy": "fixed_window", + }, + "send_flow_mod.of_multi_table": { + "pace": "100/second", + "strategy": "fixed_window", + }, + "send_flow_mod.telemetry_int": { + "pace": "100/second", + "strategy": "fixed_window", + }, + "send_flow_mod.of_lldp": { + "pace": "100/second", + "strategy": "fixed_window", + }, + "send_flow_mod.coloring": { + "pace": "100/second", + "strategy": "fixed_window", + }, +} diff --git a/tests/integration/test_main.py b/tests/integration/test_main.py index aeca7b9..94beb7d 100644 --- a/tests/integration/test_main.py +++ b/tests/integration/test_main.py @@ -29,10 +29,10 @@ def test_add_flow_mod_sent_ok(self): xid = "12345" command = "add" initial_len = len(self.napp._flow_mods_sent) - self.napp._add_flow_mod_sent(xid, flow, command) + self.napp._add_flow_mod_sent(xid, flow, command, "no_owner") assert len(self.napp._flow_mods_sent) == initial_len + 1 - assert self.napp._flow_mods_sent.get(xid, None) == (flow, command) + assert self.napp._flow_mods_sent.get(xid, None) == (flow, command, "no_owner") def test_add_flow_mod_sent_overlimit(self): self.napp._flow_mods_sent_max_size = 5 @@ -41,12 +41,12 @@ def test_add_flow_mod_sent_overlimit(self): while len(self.napp._flow_mods_sent) < 5: xid += "1" flow = Mock() - self.napp._add_flow_mod_sent(xid, flow, command) + self.napp._add_flow_mod_sent(xid, flow, command, "no_owner") xid = "90876" flow = Mock() initial_len = len(self.napp._flow_mods_sent) - self.napp._add_flow_mod_sent(xid, flow, command) + self.napp._add_flow_mod_sent(xid, flow, command, "no_owner") assert len(self.napp._flow_mods_sent) == initial_len - assert self.napp._flow_mods_sent.get(xid, None) == (flow, command) + assert self.napp._flow_mods_sent.get(xid, None) == (flow, command, "no_owner") diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index 45d02a1..16de038 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -485,14 +485,21 @@ def test_install_flows(self, *args): serializer.from_dict.return_value = flow mock_flow_factory.return_value = serializer - flows_dict = {"flows": [MagicMock(), MagicMock()]} + flows_dict = { + "flows": [ + MagicMock(get=lambda x, y=None: y), + MagicMock(get=lambda x, y=None: y), + ] + } switches = [self.switch_01] self.napp._install_flows("add", flows_dict, switches) - mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod) + mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod, "no_owner") assert mock_send_flow_mod.call_count == len(flows_dict["flows"]) assert mock_send_barrier_request.call_count == 1 - mock_add_flow_mod_sent.assert_called_with(flow_mod.header.xid, flow, "add") + mock_add_flow_mod_sent.assert_called_with( + flow_mod.header.xid, flow, "add", "no_owner" + ) mock_send_napp_event.assert_called_with(self.switch_01, flow, "pending") self.napp.flow_controller.upsert_flows.assert_called() @@ -519,13 +526,13 @@ def test_install_flows_with_delete_strict(self, *args): serializer.from_dict.return_value = flow mock_flow_factory.return_value = serializer - flows_dict = {"flows": [MagicMock()]} + flows_dict = {"flows": [MagicMock(get=lambda x, y=None: y)]} switches = [self.switch_01] self.napp._install_flows("delete_strict", flows_dict, switches) - mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod) + mock_send_flow_mod.assert_called_with(self.switch_01, flow_mod, "no_owner") mock_add_flow_mod_sent.assert_called_with( - flow_mod.header.xid, flow, "delete_strict" + flow_mod.header.xid, flow, "delete_strict", "no_owner" ) mock_send_napp_event.assert_called_with(self.switch_01, flow, "pending") mock_send_barrier_request.assert_called() @@ -676,9 +683,9 @@ def test_add_flow_mod_sent(self): xid = 0 flow = MagicMock() - self.napp._add_flow_mod_sent(xid, flow, "add") + self.napp._add_flow_mod_sent(xid, flow, "add", "no_owner") - assert self.napp._flow_mods_sent[xid] == (flow, "add") + assert self.napp._flow_mods_sent[xid] == (flow, "add", "no_owner") def test_send_flow_mod(self): """Test _send_flow_mod method.""" @@ -687,7 +694,7 @@ def test_send_flow_mod(self): switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04) flow_mod = MagicMock() - self.napp._send_flow_mod(switch, flow_mod) + self.napp._send_flow_mod(switch, flow_mod, "no_owner") mock_buffers_put.assert_called() @@ -699,7 +706,7 @@ def test_send_flow_mod_error(self, mock_buffers_put): flow_mod = MagicMock() with pytest.raises(SwitchNotConnectedError): - self.napp._send_flow_mod(switch, flow_mod) + self.napp._send_flow_mod(switch, flow_mod, "no_owner") mock_buffers_put.assert_not_called() @@ -722,7 +729,7 @@ def test_handle_errors(self, mock_send_napp_event): flow.id = "1" flow.as_dict.return_value = {} flow.cookie = 0 - self.napp._flow_mods_sent[0] = (flow, "add") + self.napp._flow_mods_sent[0] = (flow, "add", "no_owner") switch = get_switch_mock("00:00:00:00:00:00:00:01") switch.connection = get_connection_mock( @@ -1086,7 +1093,9 @@ def test_on_ofpt_barrier_reply(self, mock_publish): barrier_xid = list(self.napp._pending_barrier_reply[switch.id].keys())[-1] for flow_mod in flow_mods: - self.napp._add_flow_mod_sent(flow_mod.header.xid, flow_mod, "add") + self.napp._add_flow_mod_sent( + flow_mod.header.xid, flow_mod, "add", "no_owner" + ) event = MagicMock() event.message.header.xid = barrier_xid @@ -1157,7 +1166,7 @@ def test_retry_on_openflow_connection_error(self, mock_barrier): flow.as_dict.return_value = {} flow.header.message_type = Type.OFPT_FLOW_MOD flow.xid = 1 - self.napp._flow_mods_sent[flow.xid] = (flow, "add") + self.napp._flow_mods_sent[flow.xid] = (flow, "add", "no_owner") mock_ev = MagicMock() mock_ev.message = flow @@ -1187,7 +1196,7 @@ def test_retry_on_openflow_connection_error_send_event(self, mock_send): flow.as_dict.return_value = {} flow.header.message_type = Type.OFPT_FLOW_MOD flow.xid = 1 - self.napp._flow_mods_sent[flow.xid] = (flow, "add") + self.napp._flow_mods_sent[flow.xid] = (flow, "add", "no_owner") # make sure a previous retry has stored executed self.napp._flow_mods_retry_count[flow.xid] = (3, now(), 10) @@ -1241,7 +1250,7 @@ def test_send_openflow_connection_error(self, mock_send): flow = MagicMock() flow.as_dict.return_value = {} flow.xid = 1 - self.napp._flow_mods_sent[flow.xid] = (flow, "add") + self.napp._flow_mods_sent[flow.xid] = (flow, "add", "no_owner") mock_ev = MagicMock() mock_ev.event.content = {"destination": switch}