diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 340f83af..8c5b3686 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -11,6 +11,7 @@ Added - EVC list now utilizes ``localStorage`` to store ``search_cols`` and make them persistent throughout EVC list usage. - Added ``kytos/mef_eline.uni_active_updated`` event - Included "id" on EVC mapped content to normalize it with the other models +- Introduced ``failover_old_path``, ``failover_deployed``, and ``failover_link_down`` events, which will be primarily consumed by ``telemetry_int`` NApp Changed ======= diff --git a/README.rst b/README.rst index 1836aa21..9dfd8035 100644 --- a/README.rst +++ b/README.rst @@ -109,6 +109,73 @@ Event published when an EVC active state changes due to a UNI going up or down "uni_z": evc.uni_z.as_dict()} } +kytos/mef_eline.failover_deployed +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Event published when an EVC failover_path gets deployed. ``flows`` are the new deployed flows, and ``removed_flows`` are the removed ones. + +.. code-block:: python3 + + { + evc.id: { + "id", evc.id, + "evc_id": evc.id, + "name": evc.name, + "metadata": evc.metadata, + "active": evc._active, + "enabled": evc._enabled, + "uni_a": evc.uni_a.as_dict(), + "uni_z": evc.uni_z.as_dict(), + "flows": [], + "removed_flows": [], + "error_reason": string, + "current_path": evc.current_path.as_dict(), + } + } + +kytos/mef_eline.failover_link_down +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Event published when an EVC failover_path switches over. ``flows`` are the new deployed flows. + +.. code-block:: python3 + + { + evc.id: { + "id", evc.id, + "evc_id": evc.id, + "name": evc.name, + "metadata": evc.metadata, + "active": evc._active, + "enabled": evc._enabled, + "uni_a": evc.uni_a.as_dict(), + "uni_z": evc.uni_z.as_dict(), + "flows": [], + } + } + +kytos/mef_eline.failover_old_path +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Event published when an EVC failover related old path gets removed (cleaned up). ``removed_flows`` are the removed flows. + +.. code-block:: python3 + + { + evc.id: { + "id", evc.id, + "evc_id": evc.id, + "name": evc.name, + "metadata": evc.metadata, + "active": evc._active, + "enabled": evc._enabled, + "uni_a": evc.uni_a.as_dict(), + "uni_z": evc.uni_z.as_dict(), + "removed_flows": [], + "current_path": evc.current_path.as_dict(), + } + } + .. TAGs diff --git a/main.py b/main.py index 13c8d8fa..8e4f4adc 100755 --- a/main.py +++ b/main.py @@ -816,7 +816,9 @@ def on_link_down(self, event): """Change circuit when link is down or under_mantenance.""" self.handle_link_down(event) - def handle_link_down(self, event): # pylint: disable=too-many-branches + # pylint: disable=too-many-branches + # pylint: disable=too-many-locals + def handle_link_down(self, event): """Change circuit when link is down or under_mantenance.""" link = event.content["link"] log.info("Event handle_link_down %s", link) @@ -824,6 +826,8 @@ def handle_link_down(self, event): # pylint: disable=too-many-branches evcs_with_failover = [] evcs_normal = [] check_failover = [] + failover_event_contents = {} + for evc in self.get_evcs_by_svc_level(): with evc.lock: if evc.is_affected_by_link(link): @@ -854,11 +858,19 @@ def handle_link_down(self, event): # pylint: disable=too-many-branches switch_flows.setdefault(dpid, []) switch_flows[dpid].extend(flows) evcs_with_failover.append(evc) + failover_event_contents[evc.id] = map_evc_event_content( + evc, + flows={k: v.copy() for k, v in switch_flows.items()} + ) elif evc.is_failover_path_affected_by_link(link): evc.old_path = evc.failover_path evc.failover_path = Path([]) check_failover.append(evc) + if failover_event_contents: + emit_event(self.controller, "failover_link_down", + content=failover_event_contents) + while switch_flows: offset = settings.BATCH_SIZE or None switches = list(switch_flows.keys()) @@ -888,11 +900,6 @@ def handle_link_down(self, event): # pylint: disable=too-many-branches evcs_to_update = [] for evc in evcs_with_failover: evcs_to_update.append(evc.as_dict()) - emit_event( - self.controller, - "redeployed_link_down", - content=map_evc_event_content(evc) - ) log.info( f"{evc} redeployed with failover due to link down {link.id}" ) @@ -949,11 +956,21 @@ def on_cleanup_evcs_old_path(self, event): def handle_cleanup_evcs_old_path(self, event): """Handle cleanup evcs old path.""" evcs = event.content.get("evcs", []) + event_contents: dict[str, list[dict]] = {} for evc in evcs: if not evc.old_path: continue - evc.remove_path_flows(evc.old_path) + removed_flows = evc.remove_path_flows(evc.old_path) + content = map_evc_event_content( + evc, + removed_flows=removed_flows, + current_path=evc.current_path.as_dict(), + ) + event_contents[evc.id] = content evc.old_path = Path([]) + if event_contents: + emit_event(self.controller, "failover_old_path", + content=event_contents) @listen_to("kytos/topology.topology_loaded") def on_topology_loaded(self, event): # pylint: disable=unused-argument diff --git a/models/evc.py b/models/evc.py index 6a6fdc5d..9a746d60 100644 --- a/models/evc.py +++ b/models/evc.py @@ -26,7 +26,8 @@ compare_endpoint_trace, compare_uni_out_trace, emit_event, make_uni_list, map_dl_vlan, - map_evc_event_content) + map_evc_event_content, + merge_flow_dicts) from .path import DynamicPathManager, Path @@ -739,12 +740,14 @@ def remove_current_flows(self, current_path=None, force=True, if sync: self.sync() - def remove_path_flows(self, path=None, force=True): - """Remove all flows from path.""" - if not path: - return + def remove_path_flows( + self, path=None, force=True + ) -> dict[str, list[dict]]: + """Remove all flows from path, and return the removed flows.""" + dpid_flows_match: dict[str, list[dict]] = {} - dpid_flows_match = {} + if not path: + return dpid_flows_match try: nni_flows = self._prepare_nni_flows(path) @@ -793,6 +796,8 @@ def remove_path_flows(self, path=None, force=True): except KytosTagError as err: log.error(f"Error when removing path flows: {err}") + return dpid_flows_match + @staticmethod def links_zipped(path=None): """Return an iterator which yields pairs of links in order.""" @@ -902,9 +907,11 @@ def setup_failover_path(self): if not self.is_eligible_for_failover_path(): return False + out_new_flows: dict[str, list[dict]] = {} reason = "" - self.remove_path_flows(self.failover_path) + out_removed_flows = self.remove_path_flows(self.failover_path) self.failover_path = Path([]) + for use_path in self.get_failover_path_candidates(): if not use_path: continue @@ -919,19 +926,34 @@ def setup_failover_path(self): try: if use_path: - self._install_nni_flows(use_path) - self._install_uni_flows(use_path, skip_in=True) + _nni_flows = self._install_nni_flows(use_path) + out_new_flows = merge_flow_dicts(out_new_flows, _nni_flows) + _uni_flows = self._install_uni_flows(use_path, skip_in=True) + out_new_flows = merge_flow_dicts(out_new_flows, _uni_flows) except FlowModException as err: reason = "Error deploying failover path" log.error( f"{reason} for {self}. FlowManager error: {err}" ) - self.remove_path_flows(use_path) + _rmed_flows = self.remove_path_flows(use_path) + out_new_flows = merge_flow_dicts(out_new_flows, _rmed_flows) use_path = Path([]) self.failover_path = use_path self.sync() + if out_new_flows or out_removed_flows: + content = { + self.id: map_evc_event_content( + self, + flows=out_new_flows, + removed_flows=out_removed_flows, + error_reason=reason, + current_path=self.current_path.as_dict(), + ) + } + emit_event(self._controller, "failover_deployed", content=content) + if not use_path: log.warning( f"Failover path for {self} was not deployed: {reason}" @@ -1064,10 +1086,14 @@ def _prepare_nni_flows(self, path=None): nni_flows[in_endpoint.switch.id] = flows return nni_flows - def _install_nni_flows(self, path=None): + def _install_nni_flows( + self, path=None + ) -> dict[str, list[dict]]: """Install NNI flows.""" - for dpid, flows in self._prepare_nni_flows(path).items(): + nni_flows = self._prepare_nni_flows(path) + for dpid, flows in nni_flows.items(): self._send_flow_mods(dpid, flows) + return nni_flows @staticmethod def _get_value_from_uni_tag(uni: UNI): @@ -1184,13 +1210,17 @@ def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False): return uni_flows - def _install_uni_flows(self, path=None, skip_in=False, skip_out=False): + def _install_uni_flows( + self, path=None, skip_in=False, skip_out=False + ) -> dict[str, list[dict]]: """Install UNI flows.""" uni_flows = self._prepare_uni_flows(path, skip_in, skip_out) for (dpid, flows) in uni_flows.items(): self._send_flow_mods(dpid, flows) + return uni_flows + @staticmethod def _send_flow_mods(dpid, flow_mods, command='flows', force=False): """Send a flow_mod list to a specific switch. diff --git a/tests/unit/models/test_evc_deploy.py b/tests/unit/models/test_evc_deploy.py index 5fc90ba4..ad7db650 100644 --- a/tests/unit/models/test_evc_deploy.py +++ b/tests/unit/models/test_evc_deploy.py @@ -330,7 +330,10 @@ def test_install_uni_flows(send_flow_mods_mock): send_flow_mods_mock.assert_not_called() # pylint: disable=protected-access - evc._install_uni_flows(evc.primary_links) + uni_flows = evc._install_uni_flows(evc.primary_links) + assert uni_flows + assert list(uni_flows.keys()) == [evc.uni_a.interface.switch.id, + evc.uni_z.interface.switch.id] expected_flow_mod_a = [ { @@ -496,7 +499,10 @@ def test_install_nni_flows(send_flow_mods_mock): evc = TestEVC.create_evc_inter_switch() # pylint: disable=protected-access - evc._install_nni_flows(evc.primary_links) + nni_flows = evc._install_nni_flows(evc.primary_links) + assert nni_flows + dpid = evc.primary_links[0].endpoint_b.switch.id + assert list(nni_flows.keys()) == [dpid] in_vlan = evc.primary_links[0].get_metadata("s_vlan").value out_vlan = evc.primary_links[-1].get_metadata("s_vlan").value @@ -709,6 +715,7 @@ def test_deploy_error(self, *args): assert remove_current_flows.call_count == 2 assert deployed is False + @patch("napps.kytos.mef_eline.models.evc.emit_event") @patch("napps.kytos.mef_eline.models.evc.EVC.get_failover_path_candidates") @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows") @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows") @@ -722,6 +729,7 @@ def test_setup_failover_path(self, *args): install_uni_flows_mock, install_nni_flows_mock, get_failover_path_candidates_mock, + emit_event_mock, ) = args # case1: early return intra switch @@ -751,6 +759,8 @@ def test_setup_failover_path(self, *args): install_uni_flows_mock.assert_called_with(path_mock, skip_in=True) assert evc2.failover_path == path_mock assert sync_mock.call_count == 1 + assert emit_event_mock.call_count == 1 + assert emit_event_mock.call_args[0][1] == "failover_deployed" # case 4: failed to setup failover_path - No Tag available evc2.failover_path = [] @@ -1363,7 +1373,12 @@ def test_remove_path_flows(self, *args): }, ] - evc.remove_path_flows(evc.primary_links) + dpid_flows = evc.remove_path_flows(evc.primary_links) + assert dpid_flows + assert len(dpid_flows) == 3 + assert sum(len(flows) for flows in dpid_flows.values()) == len( + expected_flows_1 + ) + len(expected_flows_2) + len(expected_flows_3) send_flow_mods_mock.assert_has_calls([ call(1, expected_flows_1, 'delete', force=True), call(2, expected_flows_2, 'delete', force=True), diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index 3b7c501d..5481ae4e 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -2004,19 +2004,8 @@ def test_handle_link_down(self, emit_event_mock, settings_mock, _): self.napp.mongo_controller.update_evcs.assert_called_with( [{"id": "5"}, {"id": "4"}, {"id": "2"}] ) - event_name = "redeployed_link_down" - emit_event_mock.assert_has_calls([ - call(self.napp.controller, event_name, content={ - "evc_id": "4", - "id": "4", - "name": "name", - "metadata": "mock", - "active": "true", - "enabled": "true", - "uni_a": uni.as_dict(), - "uni_z": uni.as_dict(), - }), - ]) + event_name = "failover_link_down" + assert emit_event_mock.call_args_list[0][0][1] == event_name @patch("napps.kytos.mef_eline.main.emit_event") def test_handle_evc_affected_by_link_down(self, emit_event_mock): @@ -2080,17 +2069,33 @@ def test_handle_evc_affected_by_link_down(self, emit_event_mock): } ) - def test_cleanup_evcs_old_path(self): + def test_cleanup_evcs_old_path(self, monkeypatch): """Test handle_cleanup_evcs_old_path method.""" - evc1 = create_autospec(EVC, id="1", old_path=["1"]) - evc2 = create_autospec(EVC, id="2", old_path=["2"]) - evc3 = create_autospec(EVC, id="3", old_path=[]) + current_path, map_evc_content, emit_event = [ + MagicMock(), MagicMock(), MagicMock() + ] + monkeypatch.setattr( + "napps.kytos.mef_eline.main.map_evc_event_content", + map_evc_content + ) + monkeypatch.setattr( + "napps.kytos.mef_eline.main.emit_event", + emit_event + ) + evc1 = create_autospec(EVC, id="1", old_path=["1"], + current_path=current_path) + evc2 = create_autospec(EVC, id="2", old_path=["2"], + current_path=current_path) + evc3 = create_autospec(EVC, id="3", old_path=[], current_path=[]) event = KytosEvent(name="e1", content={"evcs": [evc1, evc2, evc3]}) self.napp.handle_cleanup_evcs_old_path(event) evc1.remove_path_flows.assert_called_with(["1"]) evc2.remove_path_flows.assert_called_with(["2"]) evc3.remove_path_flows.assert_not_called() + assert emit_event.call_count == 1 + assert emit_event.call_args[0][1] == "failover_old_path" + assert len(emit_event.call_args[1]["content"]) == 2 async def test_add_metadata(self): """Test method to add metadata""" diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 64267151..6568da4a 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -8,11 +8,12 @@ from napps.kytos.mef_eline.utils import (check_disabled_component, compare_endpoint_trace, compare_uni_out_trace, - get_vlan_tags_and_masks, map_dl_vlan) + get_vlan_tags_and_masks, map_dl_vlan, + merge_flow_dicts) # pylint: disable=too-many-public-methods, too-many-lines -class TestUtils(): +class TestUtils: """Test utility functions.""" @pytest.mark.parametrize( @@ -140,3 +141,25 @@ def test_check_disabled_component(self): # There is no disabled component uni_z.interface.status = EntityStatus.UP check_disabled_component(uni_a, uni_z) + + @pytest.mark.parametrize( + "src1,src2,src3,expected", + [ + ( + {"dpida": [10, 11, 12]}, + {"dpida": [11, 20, 21]}, + {"dpidb": [30, 31, 32]}, + {"dpida": [10, 11, 12, 11, 20, 21], "dpidb": [30, 31, 32]}, + ), + ( + {"dpida": [10, 11, 12]}, + {"dpida": [11, 20, 21], "dpidb": [40, 41, 42]}, + {"dpidb": [30, 31, 32]}, + {"dpida": [10, 11, 12, 11, 20, 21], + "dpidb": [40, 41, 42, 30, 31, 32]}, + ), + ] + ) + def test_merge_flow_dicts(self, src1, src2, src3, expected) -> None: + """test merge flow dicts.""" + assert merge_flow_dicts({}, src1, src2, src3) == expected diff --git a/utils.py b/utils.py index 46e87e9b..362fccf2 100644 --- a/utils.py +++ b/utils.py @@ -27,6 +27,19 @@ def emit_event(controller, name, context="kytos/mef_eline", content=None, controller.buffers.app.put(event, timeout=timeout) +def merge_flow_dicts( + dst: dict[str, list], *srcs: list[dict[str, list]] +) -> dict[str, list]: + """Merge srcs dict flows into dst.""" + for src in srcs: + for k, v in src.items(): + if k not in dst: + dst[k] = v + else: + dst[k].extend(v) + return dst + + async def aemit_event(controller, name, content): """Send an asynchronous event""" event = KytosEvent(name=name, content=content)