Skip to content

Commit

Permalink
Merge pull request #473 from kytos-ng/feat/failover_link_down
Browse files Browse the repository at this point in the history
feat: new failover_path publish events
  • Loading branch information
viniarck authored Jun 14, 2024
2 parents 7981cdb + 96c0ab4 commit 2e45177
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Added
- EVC list now utilizes ``localStorage`` to store ``search_cols`` and make them persistent throughout EVC list usage.
- Added ``kytos/mef_eline.uni_active_updated`` event
- Included "id" on EVC mapped content to normalize it with the other models
- Introduced ``failover_old_path``, ``failover_deployed``, and ``failover_link_down`` events, which will be primarily consumed by ``telemetry_int`` NApp

Changed
=======
Expand Down
67 changes: 67 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,73 @@ Event published when an EVC active state changes due to a UNI going up or down
"uni_z": evc.uni_z.as_dict()}
}
kytos/mef_eline.failover_deployed
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Event published when an EVC failover_path gets deployed. ``flows`` are the new deployed flows, and ``removed_flows`` are the removed ones.

.. code-block:: python3
{
evc.id: {
"id", evc.id,
"evc_id": evc.id,
"name": evc.name,
"metadata": evc.metadata,
"active": evc._active,
"enabled": evc._enabled,
"uni_a": evc.uni_a.as_dict(),
"uni_z": evc.uni_z.as_dict(),
"flows": [],
"removed_flows": [],
"error_reason": string,
"current_path": evc.current_path.as_dict(),
}
}
kytos/mef_eline.failover_link_down
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Event published when an EVC failover_path switches over. ``flows`` are the new deployed flows.

.. code-block:: python3
{
evc.id: {
"id", evc.id,
"evc_id": evc.id,
"name": evc.name,
"metadata": evc.metadata,
"active": evc._active,
"enabled": evc._enabled,
"uni_a": evc.uni_a.as_dict(),
"uni_z": evc.uni_z.as_dict(),
"flows": [],
}
}
kytos/mef_eline.failover_old_path
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Event published when an EVC failover related old path gets removed (cleaned up). ``removed_flows`` are the removed flows.

.. code-block:: python3
{
evc.id: {
"id", evc.id,
"evc_id": evc.id,
"name": evc.name,
"metadata": evc.metadata,
"active": evc._active,
"enabled": evc._enabled,
"uni_a": evc.uni_a.as_dict(),
"uni_z": evc.uni_z.as_dict(),
"removed_flows": [],
"current_path": evc.current_path.as_dict(),
}
}
.. TAGs
Expand Down
31 changes: 24 additions & 7 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,14 +816,18 @@ def on_link_down(self, event):
"""Change circuit when link is down or under_mantenance."""
self.handle_link_down(event)

def handle_link_down(self, event): # pylint: disable=too-many-branches
# pylint: disable=too-many-branches
# pylint: disable=too-many-locals
def handle_link_down(self, event):
"""Change circuit when link is down or under_mantenance."""
link = event.content["link"]
log.info("Event handle_link_down %s", link)
switch_flows = {}
evcs_with_failover = []
evcs_normal = []
check_failover = []
failover_event_contents = {}

for evc in self.get_evcs_by_svc_level():
with evc.lock:
if evc.is_affected_by_link(link):
Expand Down Expand Up @@ -854,11 +858,19 @@ def handle_link_down(self, event): # pylint: disable=too-many-branches
switch_flows.setdefault(dpid, [])
switch_flows[dpid].extend(flows)
evcs_with_failover.append(evc)
failover_event_contents[evc.id] = map_evc_event_content(
evc,
flows={k: v.copy() for k, v in switch_flows.items()}
)
elif evc.is_failover_path_affected_by_link(link):
evc.old_path = evc.failover_path
evc.failover_path = Path([])
check_failover.append(evc)

if failover_event_contents:
emit_event(self.controller, "failover_link_down",
content=failover_event_contents)

while switch_flows:
offset = settings.BATCH_SIZE or None
switches = list(switch_flows.keys())
Expand Down Expand Up @@ -888,11 +900,6 @@ def handle_link_down(self, event): # pylint: disable=too-many-branches
evcs_to_update = []
for evc in evcs_with_failover:
evcs_to_update.append(evc.as_dict())
emit_event(
self.controller,
"redeployed_link_down",
content=map_evc_event_content(evc)
)
log.info(
f"{evc} redeployed with failover due to link down {link.id}"
)
Expand Down Expand Up @@ -949,11 +956,21 @@ def on_cleanup_evcs_old_path(self, event):
def handle_cleanup_evcs_old_path(self, event):
"""Handle cleanup evcs old path."""
evcs = event.content.get("evcs", [])
event_contents: dict[str, list[dict]] = {}
for evc in evcs:
if not evc.old_path:
continue
evc.remove_path_flows(evc.old_path)
removed_flows = evc.remove_path_flows(evc.old_path)
content = map_evc_event_content(
evc,
removed_flows=removed_flows,
current_path=evc.current_path.as_dict(),
)
event_contents[evc.id] = content
evc.old_path = Path([])
if event_contents:
emit_event(self.controller, "failover_old_path",
content=event_contents)

@listen_to("kytos/topology.topology_loaded")
def on_topology_loaded(self, event): # pylint: disable=unused-argument
Expand Down
56 changes: 43 additions & 13 deletions models/evc.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
compare_endpoint_trace,
compare_uni_out_trace, emit_event,
make_uni_list, map_dl_vlan,
map_evc_event_content)
map_evc_event_content,
merge_flow_dicts)

from .path import DynamicPathManager, Path

Expand Down Expand Up @@ -739,12 +740,14 @@ def remove_current_flows(self, current_path=None, force=True,
if sync:
self.sync()

def remove_path_flows(self, path=None, force=True):
"""Remove all flows from path."""
if not path:
return
def remove_path_flows(
self, path=None, force=True
) -> dict[str, list[dict]]:
"""Remove all flows from path, and return the removed flows."""
dpid_flows_match: dict[str, list[dict]] = {}

dpid_flows_match = {}
if not path:
return dpid_flows_match

try:
nni_flows = self._prepare_nni_flows(path)
Expand Down Expand Up @@ -793,6 +796,8 @@ def remove_path_flows(self, path=None, force=True):
except KytosTagError as err:
log.error(f"Error when removing path flows: {err}")

return dpid_flows_match

@staticmethod
def links_zipped(path=None):
"""Return an iterator which yields pairs of links in order."""
Expand Down Expand Up @@ -902,9 +907,11 @@ def setup_failover_path(self):
if not self.is_eligible_for_failover_path():
return False

out_new_flows: dict[str, list[dict]] = {}
reason = ""
self.remove_path_flows(self.failover_path)
out_removed_flows = self.remove_path_flows(self.failover_path)
self.failover_path = Path([])

for use_path in self.get_failover_path_candidates():
if not use_path:
continue
Expand All @@ -919,19 +926,34 @@ def setup_failover_path(self):

try:
if use_path:
self._install_nni_flows(use_path)
self._install_uni_flows(use_path, skip_in=True)
_nni_flows = self._install_nni_flows(use_path)
out_new_flows = merge_flow_dicts(out_new_flows, _nni_flows)
_uni_flows = self._install_uni_flows(use_path, skip_in=True)
out_new_flows = merge_flow_dicts(out_new_flows, _uni_flows)
except FlowModException as err:
reason = "Error deploying failover path"
log.error(
f"{reason} for {self}. FlowManager error: {err}"
)
self.remove_path_flows(use_path)
_rmed_flows = self.remove_path_flows(use_path)
out_new_flows = merge_flow_dicts(out_new_flows, _rmed_flows)
use_path = Path([])

self.failover_path = use_path
self.sync()

if out_new_flows or out_removed_flows:
content = {
self.id: map_evc_event_content(
self,
flows=out_new_flows,
removed_flows=out_removed_flows,
error_reason=reason,
current_path=self.current_path.as_dict(),
)
}
emit_event(self._controller, "failover_deployed", content=content)

if not use_path:
log.warning(
f"Failover path for {self} was not deployed: {reason}"
Expand Down Expand Up @@ -1064,10 +1086,14 @@ def _prepare_nni_flows(self, path=None):
nni_flows[in_endpoint.switch.id] = flows
return nni_flows

def _install_nni_flows(self, path=None):
def _install_nni_flows(
self, path=None
) -> dict[str, list[dict]]:
"""Install NNI flows."""
for dpid, flows in self._prepare_nni_flows(path).items():
nni_flows = self._prepare_nni_flows(path)
for dpid, flows in nni_flows.items():
self._send_flow_mods(dpid, flows)
return nni_flows

@staticmethod
def _get_value_from_uni_tag(uni: UNI):
Expand Down Expand Up @@ -1184,13 +1210,17 @@ def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):

return uni_flows

def _install_uni_flows(self, path=None, skip_in=False, skip_out=False):
def _install_uni_flows(
self, path=None, skip_in=False, skip_out=False
) -> dict[str, list[dict]]:
"""Install UNI flows."""
uni_flows = self._prepare_uni_flows(path, skip_in, skip_out)

for (dpid, flows) in uni_flows.items():
self._send_flow_mods(dpid, flows)

return uni_flows

@staticmethod
def _send_flow_mods(dpid, flow_mods, command='flows', force=False):
"""Send a flow_mod list to a specific switch.
Expand Down
21 changes: 18 additions & 3 deletions tests/unit/models/test_evc_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,10 @@ def test_install_uni_flows(send_flow_mods_mock):
send_flow_mods_mock.assert_not_called()

# pylint: disable=protected-access
evc._install_uni_flows(evc.primary_links)
uni_flows = evc._install_uni_flows(evc.primary_links)
assert uni_flows
assert list(uni_flows.keys()) == [evc.uni_a.interface.switch.id,
evc.uni_z.interface.switch.id]

expected_flow_mod_a = [
{
Expand Down Expand Up @@ -496,7 +499,10 @@ def test_install_nni_flows(send_flow_mods_mock):
evc = TestEVC.create_evc_inter_switch()

# pylint: disable=protected-access
evc._install_nni_flows(evc.primary_links)
nni_flows = evc._install_nni_flows(evc.primary_links)
assert nni_flows
dpid = evc.primary_links[0].endpoint_b.switch.id
assert list(nni_flows.keys()) == [dpid]

in_vlan = evc.primary_links[0].get_metadata("s_vlan").value
out_vlan = evc.primary_links[-1].get_metadata("s_vlan").value
Expand Down Expand Up @@ -709,6 +715,7 @@ def test_deploy_error(self, *args):
assert remove_current_flows.call_count == 2
assert deployed is False

@patch("napps.kytos.mef_eline.models.evc.emit_event")
@patch("napps.kytos.mef_eline.models.evc.EVC.get_failover_path_candidates")
@patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
@patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
Expand All @@ -722,6 +729,7 @@ def test_setup_failover_path(self, *args):
install_uni_flows_mock,
install_nni_flows_mock,
get_failover_path_candidates_mock,
emit_event_mock,
) = args

# case1: early return intra switch
Expand Down Expand Up @@ -751,6 +759,8 @@ def test_setup_failover_path(self, *args):
install_uni_flows_mock.assert_called_with(path_mock, skip_in=True)
assert evc2.failover_path == path_mock
assert sync_mock.call_count == 1
assert emit_event_mock.call_count == 1
assert emit_event_mock.call_args[0][1] == "failover_deployed"

# case 4: failed to setup failover_path - No Tag available
evc2.failover_path = []
Expand Down Expand Up @@ -1363,7 +1373,12 @@ def test_remove_path_flows(self, *args):
},
]

evc.remove_path_flows(evc.primary_links)
dpid_flows = evc.remove_path_flows(evc.primary_links)
assert dpid_flows
assert len(dpid_flows) == 3
assert sum(len(flows) for flows in dpid_flows.values()) == len(
expected_flows_1
) + len(expected_flows_2) + len(expected_flows_3)
send_flow_mods_mock.assert_has_calls([
call(1, expected_flows_1, 'delete', force=True),
call(2, expected_flows_2, 'delete', force=True),
Expand Down
Loading

0 comments on commit 2e45177

Please sign in to comment.