diff --git a/main.py b/main.py index 47dfbaeb..0fa495c2 100755 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ import time import traceback from collections import defaultdict +from contextlib import ExitStack from copy import deepcopy from threading import Lock from typing import Optional @@ -65,18 +66,19 @@ def setup(self): # dictionary of EVCs created. It acts as a circuit buffer. # Every create/update/delete must be synced to mongodb. - self.circuits = {} + self.circuits = dict[str, EVC]() self._intf_events = defaultdict(dict) self._lock_interfaces = defaultdict(Lock) self.table_group = {"epl": 0, "evpl": 0} self._lock = Lock() + self.multi_evc_lock = Lock() self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL) self.load_all_evcs() self._topology_updated_at = None - def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list: + def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]: """Get circuits sorted by desc service level and asc creation_time. In the future, as more ops are offloaded it should be get from the DB. @@ -823,85 +825,289 @@ def on_link_down(self, event): """Change circuit when link is down or under_mantenance.""" self.handle_link_down(event) + def prepare_swap_to_failover(self, evc: EVC): + """Prepare an evc for switching to failover.""" + install_flows = {} + try: + install_flows = evc.get_failover_flows() + evc.old_path = evc.current_path + evc.current_path = evc.failover_path + evc.failover_path = Path([]) + # pylint: disable=broad-except + except Exception: + err = traceback.format_exc().replace("\n", ", ") + log.error( + "Ignore Failover path for " + f"{evc} due to error: {err}" + ) + return install_flows + + def execute_swap_to_failover(self, event_contents, install_flows): + """Process changes needed to commit a swap to failover.""" + emit_event( + self.controller, "failover_link_down", + content=deepcopy(event_contents) + ) + send_flow_mods_event( + self.controller, + install_flows, + "install" + ) + + def prepare_clear_old_path(self, evc: EVC): + """Prepare an evc for clearing the old path.""" + del_flows = {} + try: + del_flows = prepare_delete_flow( + merge_flow_dicts( + evc._prepare_uni_flows(evc.old_path, skip_in=True), + evc._prepare_nni_flows(evc.old_path) + ) + ) + # pylint: disable=broad-except + except Exception: + err = traceback.format_exc().replace("\n", ", ") + log.error(f"Fail to remove {evc} old_path: {err}") + return del_flows + + def execute_clear_old_path( + self, + evcs: list[EVC], + event_contents, + delete_flows + ): + """Process changes needed to commit clearing the old path""" + send_flow_mods_event( + self.controller, + delete_flows, + 'delete' + ) + emit_event( + self.controller, + "failover_old_path", + content=event_contents + ) + for evc in evcs: + evc.old_path.make_vlans_available(self.controller) + evc.old_path = Path([]) + + def prepare_undeploy(self, evc: EVC): + """Prepare an evc for undeploying.""" + del_flows = {} + try: + del_flows = prepare_delete_flow( + merge_flow_dicts( + evc._prepare_uni_flows(evc.current_path, skip_in=True), + evc._prepare_nni_flows(evc.current_path), + evc._prepare_nni_flows(evc.failover_path) + ) + ) + # pylint: disable=broad-except + except Exception: + err = traceback.format_exc().replace("\n", ", ") + log.error(f"Fail to undeploy {evc}: {err}") + pass + return del_flows + + def execute_undeploy(self, evcs: list[EVC], remove_flows): + """Process changes needed to commit an undeploy""" + send_flow_mods_event( + self.controller, + remove_flows, + 'delete' + ) + + for evc in evcs: + evc.current_path.make_vlans_available(self.controller) + evc.failover_path.make_vlans_available(self.controller) + evc.current_path = Path([]) + evc.failover_path = Path([]) + evc.deactivate() + emit_event( + self.controller, + "need_redeploy", + content={"evc_id": evc.id} + ) + log.info(f"{evc} scheduled for redeploy") + # pylint: disable=too-many-branches # pylint: disable=too-many-locals def handle_link_down(self, event): """Change circuit when link is down or under_mantenance.""" link = event.content["link"] log.info("Event handle_link_down %s", link) - switch_flows = {} - evcs_with_failover = [] - evcs_normal = [] - check_failover = [] - failover_event_contents = {} - for evc in self.get_evcs_by_svc_level(): - with evc.lock: - if evc.is_affected_by_link(link): - evc.affected_by_link_at = event.timestamp - # if there is no failover path, handles link down the - # tradditional way - if ( + with ExitStack() as exit_stack: + exit_stack.enter_context(self.multi_evc_lock) + swap_to_failover = list[EVC]() + undeploy = list[EVC]() + clear_failover = list[EVC]() + clear_old_path = list[EVC]() + evc_dict = dict[str, EVC]() + evcs_to_update = list[EVC]() + + flow_modifications = defaultdict[str, dict[str, dict[list]]](dict) + event_contents = defaultdict[str, dict[str, dict]](dict) + + for evc in self.get_evcs_by_svc_level(): + evc_dict[evc.id] = evc + with ExitStack() as sub_stack: + sub_stack.enter_context(evc.lock) + if all(( + evc.is_affected_by_link(link), + evc.failover_path, + not evc.is_failover_path_affected_by_link(link) + )): + swap_to_failover.append(evc) + elif all(( + evc.is_affected_by_link(link), not evc.failover_path or evc.is_failover_path_affected_by_link(link) - ): - evcs_normal.append(evc) - continue - try: - dpid_flows = evc.get_failover_flows() - evc.old_path = evc.current_path - evc.current_path = evc.failover_path - evc.failover_path = Path([]) - # pylint: disable=broad-except - except Exception: - err = traceback.format_exc().replace("\n", ", ") - log.error( - "Ignore Failover path for " - f"{evc} due to error: {err}" - ) - evcs_normal.append(evc) + )): + undeploy.append(evc) + elif all(( + not evc.is_affected_by_link(link), + evc.failover_path, + evc.is_failover_path_affected_by_link(link) + )): + clear_failover.append(evc) + else: continue - for dpid, flows in dpid_flows.items(): - switch_flows.setdefault(dpid, []) - switch_flows[dpid].extend(flows) - evcs_with_failover.append(evc) - failover_event_contents[evc.id] = map_evc_event_content( - evc, - flows={k: v.copy() for k, v in switch_flows.items()} + + exit_stack.push(sub_stack.pop_all()) + + # Swap from current path to failover path + + for evc in swap_to_failover: + new_flows = self.prepare_swap_to_failover(evc) + if new_flows: + flow_modifications[evc.id]["swap_to_failover"] =\ + new_flows + event_contents[evc.id]["swap_to_failover"] =\ + map_evc_event_content( + evc, + flows=deepcopy(new_flows) + ) + clear_old_path.append(evc) + elif evc.id in flow_modifications: + del flow_modifications[evc.id] + del event_contents[evc.id] + undeploy.append(evc) + + for evc in clear_failover: + evc.old_path = evc.failover_path + evc.failover_path = Path([]) + clear_old_path.append(evc) + + # Clear out old flows + + for evc in clear_old_path: + del_flows = self.prepare_clear_old_path(evc) + if del_flows: + flow_modifications[evc.id]["clear_old_path"] = del_flows + event_contents[evc.id]["clear_old_path"] =\ + map_evc_event_content( + evc, + removed_flows=deepcopy(del_flows) ) - elif evc.is_failover_path_affected_by_link(link): - evc.old_path = evc.failover_path - evc.failover_path = Path([]) - check_failover.append(evc) + elif evc.id in flow_modifications: + if "swap_to_failover" in flow_modifications[evc.id]: + evc.failover_path = evc.current_path + evc.current_path = evc.old_path + else: + evc.failover_path = evc.old_path + evc.old_path = Path([]) + del flow_modifications[evc.id] + del event_contents[evc.id] + undeploy.append(evc) - if failover_event_contents: - emit_event(self.controller, "failover_link_down", - content=deepcopy(failover_event_contents)) - send_flow_mods_event(self.controller, switch_flows, 'install') + swap_to_failover_flows = {} + swap_to_failover_event_contents = {} - for evc in evcs_normal: - emit_event( - self.controller, - "evc_affected_by_link_down", - content={"link": link} | map_evc_event_content(evc) - ) + clear_old_path_flows = {} + clear_old_path_event_contents = {} - evcs_to_update = [] - for evc in evcs_with_failover: - evcs_to_update.append(evc.as_dict()) - log.info( - f"{evc} redeployed with failover due to link down {link.id}" - ) - for evc in check_failover: - evcs_to_update.append(evc.as_dict()) + clear_old_path_reservations = list[EVC]() - self.mongo_controller.update_evcs(evcs_to_update) + for modified_evc_id in flow_modifications: + evc = evc_dict[modified_evc_id] + if "swap_to_failover" in flow_modifications[modified_evc_id]: + swap_to_failover_flows = merge_flow_dicts( + swap_to_failover_flows, + flow_modifications[modified_evc_id]["swap_to_failover"] + ) + swap_to_failover_event_contents[modified_evc_id] =\ + event_contents[modified_evc_id]["swap_to_failover"] + if "clear_old_path" in flow_modifications[modified_evc_id]: + clear_old_path_flows = merge_flow_dicts( + clear_old_path_flows, + flow_modifications[modified_evc_id]["clear_old_path"] + ) + clear_old_path_event_contents[modified_evc_id] =\ + event_contents[modified_evc_id]["clear_old_path"] + clear_old_path_reservations.append(evc) + evcs_to_update.append(evc) + + if swap_to_failover_flows: + self.execute_swap_to_failover( + swap_to_failover_event_contents, + swap_to_failover_flows + ) - emit_event( - self.controller, - "cleanup_evcs_old_path", - content={"evcs": evcs_with_failover + check_failover} - ) + if clear_old_path_flows: + self.execute_clear_old_path( + clear_old_path_reservations, + clear_old_path_event_contents, + clear_old_path_flows + ) + + undeploy_flows = {} + undeploy_ready = list[EVC]() + + # Undeploy the evc, schedule a redeploy + + for evc in undeploy: + del_flows = self.prepare_undeploy(evc) + if del_flows: + undeploy_flows = merge_flow_dicts( + undeploy_flows, + del_flows + ) + undeploy_ready.append(evc) + + if undeploy_ready: + self.execute_undeploy(undeploy_ready, undeploy_flows) + evcs_to_update.extend(undeploy_ready) + + # Push update to DB + + if evcs_to_update: + self.mongo_controller.update_evcs( + [evc.as_dict() for evc in evcs_to_update] + ) + + @listen_to("kytos/mef_eline.need_redeploy") + def on_evc_need_redeploy(self, event): + """Redeploy evcs that need to be redeployed.""" + self.handle_evc_need_redeploy(event) + + def handle_evc_need_redeploy(self, event): + """Redeploy evcs that need to be redeployed.""" + evc = self.circuits.get(event.content["evc_id"]) + if evc is None: + return + with evc.lock: + if not evc.is_enabled() or evc.is_active(): + return + result = evc.deploy_to_path() + # if result: + # evc.setup_failover_path() + event_name = "error_redeploy_link_down" + if result: + log.info(f"{evc} redeployed") + event_name = "redeployed_link_down" + emit_event(self.controller, event_name, + content=map_evc_event_content(evc)) @listen_to("kytos/mef_eline.evc_affected_by_link_down") def on_evc_affected_by_link_down(self, event): diff --git a/utils.py b/utils.py index 32db4cbc..8ae4e971 100644 --- a/utils.py +++ b/utils.py @@ -28,7 +28,7 @@ def emit_event(controller, name, context="kytos/mef_eline", content=None, def merge_flow_dicts( - dst: dict[str, list], *srcs: list[dict[str, list]] + dst: dict[str, list], *srcs: dict[str, list] ) -> dict[str, list]: """Merge srcs dict flows into dst.""" for src in srcs: