diff --git a/main.py b/main.py index 47dfbaeb..74494d5e 100755 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ import time import traceback from collections import defaultdict +from contextlib import ExitStack from copy import deepcopy from threading import Lock from typing import Optional @@ -76,7 +77,7 @@ def setup(self): self.load_all_evcs() self._topology_updated_at = None - def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list: + def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]: """Get circuits sorted by desc service level and asc creation_time. In the future, as more ops are offloaded it should be get from the DB. @@ -829,79 +830,156 @@ def handle_link_down(self, event): """Change circuit when link is down or under_mantenance.""" link = event.content["link"] log.info("Event handle_link_down %s", link) - switch_flows = {} - evcs_with_failover = [] - evcs_normal = [] - check_failover = [] - failover_event_contents = {} - for evc in self.get_evcs_by_svc_level(): - with evc.lock: - if evc.is_affected_by_link(link): - evc.affected_by_link_at = event.timestamp - # if there is no failover path, handles link down the - # tradditional way - if ( + with ExitStack() as exit_stack: + exit_stack.enter_context(self._lock) + swap_to_failover = list[EVC]() + redeploy = list[EVC]() + clear_failover = list[EVC]() + clear_old_path = list[EVC]() + evcs_to_update = list[EVC]() + + for evc in self.get_evcs_by_svc_level(): + with ExitStack() as sub_stack: + sub_stack.enter_context(evc.lock) + if all(( + evc.is_affected_by_link(link), + evc.failover_path, + not evc.is_failover_path_affected_by_link(link) + )): + swap_to_failover.append(evc) + elif all(( + evc.is_affected_by_link(link), not evc.failover_path or evc.is_failover_path_affected_by_link(link) - ): - evcs_normal.append(evc) + )): + redeploy.append(evc) + elif all(( + not evc.is_affected_by_link(link), + evc.failover_path, + evc.is_failover_path_affected_by_link(link) + )): + clear_failover.append(evc) + else: continue - try: - dpid_flows = evc.get_failover_flows() - evc.old_path = evc.current_path - evc.current_path = evc.failover_path - evc.failover_path = Path([]) - # pylint: disable=broad-except - except Exception: - err = traceback.format_exc().replace("\n", ", ") - log.error( - "Ignore Failover path for " - f"{evc} due to error: {err}" + + exit_stack.push(sub_stack.pop_all()) + + # Swap from current path to failover path + + swap_to_failover_flows = {} + failover_swap_ready = list[EVC]() + failover_swap_event_contents = {} + for evc in swap_to_failover: + try: + new_flows = evc.get_failover_flows() + evc.old_path = evc.current_path + evc.current_path = evc.failover_path + evc.failover_path = Path([]) + # pylint: disable=broad-except + except Exception: + err = traceback.format_exc().replace("\n", ", ") + log.error( + "Ignore Failover path for " + f"{evc} due to error: {err}" + ) + redeploy.append(evc) + continue + swap_to_failover_flows = merge_flow_dicts( + swap_to_failover_flows, + new_flows + ) + failover_swap_ready.append(evc) + failover_swap_event_contents[evc.id] = map_evc_event_content( + evc, + flows=deepcopy(new_flows) + ) + clear_old_path.append(evc) + + if failover_swap_ready: + emit_event( + self.controller, "failover_link_down", + content=deepcopy(failover_swap_event_contents) + ) + send_flow_mods_event( + self.controller, + swap_to_failover_flows, + 'install' + ) + # evcs_to_update.extend(failover_swap_ready) + + # Clear out flows from failover path + + for evc in clear_failover: + evc.old_path = evc.failover_path + evc.failover_path = Path([]) + clear_old_path.append(evc) + + # Clear out old flows + + clear_old_path_flows = {} + clear_old_path_ready = list[EVC]() + clear_old_path_event_contents = {} + + for evc in clear_old_path: + try: + del_flows = prepare_delete_flow( + merge_flow_dicts( + evc._prepare_uni_flows(evc.old_path, skip_in=True), + evc._prepare_nni_flows(evc.old_path) ) - evcs_normal.append(evc) - continue - for dpid, flows in dpid_flows.items(): - switch_flows.setdefault(dpid, []) - switch_flows[dpid].extend(flows) - evcs_with_failover.append(evc) - failover_event_contents[evc.id] = map_evc_event_content( - evc, - flows={k: v.copy() for k, v in switch_flows.items()} ) - elif evc.is_failover_path_affected_by_link(link): - evc.old_path = evc.failover_path - evc.failover_path = Path([]) - check_failover.append(evc) - - if failover_event_contents: - emit_event(self.controller, "failover_link_down", - content=deepcopy(failover_event_contents)) - send_flow_mods_event(self.controller, switch_flows, 'install') - - for evc in evcs_normal: - emit_event( - self.controller, - "evc_affected_by_link_down", - content={"link": link} | map_evc_event_content(evc) - ) + except Exception: + err = traceback.format_exc().replace("\n", ", ") + log.error(f"Fail to remove {evc} old_path: {err}") + continue + clear_old_path_flows = merge_flow_dicts( + clear_old_path_flows, + del_flows + ) + clear_old_path_ready.append(evc) + clear_old_path_event_contents[evc.id] = map_evc_event_content( + evc, + removed_flows=deepcopy(del_flows) + ) - evcs_to_update = [] - for evc in evcs_with_failover: - evcs_to_update.append(evc.as_dict()) - log.info( - f"{evc} redeployed with failover due to link down {link.id}" - ) - for evc in check_failover: - evcs_to_update.append(evc.as_dict()) + if clear_old_path_ready: + send_flow_mods_event( + self.controller, + clear_old_path_flows, + 'delete' + ) + emit_event( + self.controller, + "failover_old_path", + content=clear_old_path_event_contents + ) + evcs_to_update.extend(clear_old_path_ready) - self.mongo_controller.update_evcs(evcs_to_update) - emit_event( - self.controller, - "cleanup_evcs_old_path", - content={"evcs": evcs_with_failover + check_failover} - ) + # Handle redeploys + # I think this uses a separate syncing mechanism here, so don't push to DB. + + for evc in redeploy: + result = evc.handle_link_down() + event_name = "error_redeploy_link_down" + if result: + log.info(f"{evc} redeployed due to link down {link.id}") + event_name = "redeployed_link_down" + emit_event( + self.controller, + event_name, + content=map_evc_event_content(evc) + ) + + + # Push update to DB + + if evcs_to_update: + self.mongo_controller.update_evcs( + [evc.as_dict() for evc in evcs_to_update] + ) + @listen_to("kytos/mef_eline.evc_affected_by_link_down") def on_evc_affected_by_link_down(self, event): diff --git a/utils.py b/utils.py index 32db4cbc..8ae4e971 100644 --- a/utils.py +++ b/utils.py @@ -28,7 +28,7 @@ def emit_event(controller, name, context="kytos/mef_eline", content=None, def merge_flow_dicts( - dst: dict[str, list], *srcs: list[dict[str, list]] + dst: dict[str, list], *srcs: dict[str, list] ) -> dict[str, list]: """Merge srcs dict flows into dst.""" for src in srcs: