Skip to content

Commit

Permalink
Redid handling of link down
Browse files Browse the repository at this point in the history
  • Loading branch information
Ktmi committed Dec 4, 2024
1 parent a47e289 commit 8b24f76
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 66 deletions.
208 changes: 143 additions & 65 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
import traceback
from collections import defaultdict
from contextlib import ExitStack
from copy import deepcopy
from threading import Lock
from typing import Optional
Expand Down Expand Up @@ -76,7 +77,7 @@ def setup(self):
self.load_all_evcs()
self._topology_updated_at = None

def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
"""Get circuits sorted by desc service level and asc creation_time.
In the future, as more ops are offloaded it should be get from the DB.
Expand Down Expand Up @@ -829,79 +830,156 @@ def handle_link_down(self, event):
"""Change circuit when link is down or under_mantenance."""
link = event.content["link"]
log.info("Event handle_link_down %s", link)
switch_flows = {}
evcs_with_failover = []
evcs_normal = []
check_failover = []
failover_event_contents = {}

for evc in self.get_evcs_by_svc_level():
with evc.lock:
if evc.is_affected_by_link(link):
evc.affected_by_link_at = event.timestamp
# if there is no failover path, handle link down the
# traditional way
if (
with ExitStack() as exit_stack:
exit_stack.enter_context(self._lock)
swap_to_failover = list[EVC]()
redeploy = list[EVC]()
clear_failover = list[EVC]()
clear_old_path = list[EVC]()
evcs_to_update = list[EVC]()

for evc in self.get_evcs_by_svc_level():
with ExitStack() as sub_stack:
sub_stack.enter_context(evc.lock)
if all((
evc.is_affected_by_link(link),
evc.failover_path,
not evc.is_failover_path_affected_by_link(link)
)):
swap_to_failover.append(evc)
elif all((
evc.is_affected_by_link(link),
not evc.failover_path or
evc.is_failover_path_affected_by_link(link)
):
evcs_normal.append(evc)
)):
redeploy.append(evc)
elif all((
not evc.is_affected_by_link(link),
evc.failover_path,
evc.is_failover_path_affected_by_link(link)
)):
clear_failover.append(evc)
else:
continue
try:
dpid_flows = evc.get_failover_flows()
evc.old_path = evc.current_path
evc.current_path = evc.failover_path
evc.failover_path = Path([])
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(
"Ignore Failover path for "
f"{evc} due to error: {err}"

exit_stack.push(sub_stack.pop_all())

# Swap from current path to failover path

swap_to_failover_flows = {}
failover_swap_ready = list[EVC]()
failover_swap_event_contents = {}
for evc in swap_to_failover:
try:
new_flows = evc.get_failover_flows()
evc.old_path = evc.current_path
evc.current_path = evc.failover_path
evc.failover_path = Path([])
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(
"Ignore Failover path for "
f"{evc} due to error: {err}"
)
redeploy.append(evc)
continue
swap_to_failover_flows = merge_flow_dicts(
swap_to_failover_flows,
new_flows
)
failover_swap_ready.append(evc)
failover_swap_event_contents[evc.id] = map_evc_event_content(
evc,
flows=deepcopy(new_flows)
)
clear_old_path.append(evc)

if failover_swap_ready:
emit_event(
self.controller, "failover_link_down",
content=deepcopy(failover_swap_event_contents)
)
send_flow_mods_event(
self.controller,
swap_to_failover_flows,
'install'
)
# evcs_to_update.extend(failover_swap_ready)

# Clear out flows from failover path

for evc in clear_failover:
evc.old_path = evc.failover_path
evc.failover_path = Path([])
clear_old_path.append(evc)

# Clear out old flows

clear_old_path_flows = {}
clear_old_path_ready = list[EVC]()
clear_old_path_event_contents = {}

for evc in clear_old_path:
try:
del_flows = prepare_delete_flow(
merge_flow_dicts(
evc._prepare_uni_flows(evc.old_path, skip_in=True),
evc._prepare_nni_flows(evc.old_path)
)
evcs_normal.append(evc)
continue
for dpid, flows in dpid_flows.items():
switch_flows.setdefault(dpid, [])
switch_flows[dpid].extend(flows)
evcs_with_failover.append(evc)
failover_event_contents[evc.id] = map_evc_event_content(
evc,
flows={k: v.copy() for k, v in switch_flows.items()}
)
elif evc.is_failover_path_affected_by_link(link):
evc.old_path = evc.failover_path
evc.failover_path = Path([])
check_failover.append(evc)

if failover_event_contents:
emit_event(self.controller, "failover_link_down",
content=deepcopy(failover_event_contents))
send_flow_mods_event(self.controller, switch_flows, 'install')

for evc in evcs_normal:
emit_event(
self.controller,
"evc_affected_by_link_down",
content={"link": link} | map_evc_event_content(evc)
)
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(f"Fail to remove {evc} old_path: {err}")
continue
clear_old_path_flows = merge_flow_dicts(
clear_old_path_flows,
del_flows
)
clear_old_path_ready.append(evc)
clear_old_path_event_contents[evc.id] = map_evc_event_content(
evc,
removed_flows=deepcopy(del_flows)
)

evcs_to_update = []
for evc in evcs_with_failover:
evcs_to_update.append(evc.as_dict())
log.info(
f"{evc} redeployed with failover due to link down {link.id}"
)
for evc in check_failover:
evcs_to_update.append(evc.as_dict())
if clear_old_path_ready:
send_flow_mods_event(
self.controller,
clear_old_path_flows,
'delete'
)
emit_event(
self.controller,
"failover_old_path",
content=clear_old_path_event_contents
)
evcs_to_update.extend(clear_old_path_ready)

self.mongo_controller.update_evcs(evcs_to_update)

emit_event(
self.controller,
"cleanup_evcs_old_path",
content={"evcs": evcs_with_failover + check_failover}
)
# Handle redeploys
# NOTE: evc.handle_link_down() appears to rely on a separate syncing
# mechanism (to be confirmed), so redeployed EVCs are not pushed to the DB here.

for evc in redeploy:
result = evc.handle_link_down()
event_name = "error_redeploy_link_down"
if result:
log.info(f"{evc} redeployed due to link down {link.id}")
event_name = "redeployed_link_down"
emit_event(
self.controller,
event_name,
content=map_evc_event_content(evc)
)


# Push update to DB

if evcs_to_update:
self.mongo_controller.update_evcs(
[evc.as_dict() for evc in evcs_to_update]
)


@listen_to("kytos/mef_eline.evc_affected_by_link_down")
def on_evc_affected_by_link_down(self, event):
Expand Down
2 changes: 1 addition & 1 deletion utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def emit_event(controller, name, context="kytos/mef_eline", content=None,


def merge_flow_dicts(
dst: dict[str, list], *srcs: list[dict[str, list]]
dst: dict[str, list], *srcs: dict[str, list]
) -> dict[str, list]:
"""Merge srcs dict flows into dst."""
for src in srcs:
Expand Down

0 comments on commit 8b24f76

Please sign in to comment.