Skip to content

Commit

Permalink
Redid handling of link down
Browse files Browse the repository at this point in the history
 - Ensure that entire operation is atomic, or as close to as reasonably possible
 - Ensure that the state of the EVC is properly progressed in link down scenarios
 - Ensure that vlans are cleared for removed paths.
  • Loading branch information
Ktmi committed Dec 19, 2024
1 parent a47e289 commit 1b60403
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 66 deletions.
336 changes: 271 additions & 65 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
import traceback
from collections import defaultdict
from contextlib import ExitStack
from copy import deepcopy
from threading import Lock
from typing import Optional
Expand Down Expand Up @@ -65,18 +66,19 @@ def setup(self):

# dictionary of EVCs created. It acts as a circuit buffer.
# Every create/update/delete must be synced to mongodb.
self.circuits = {}
self.circuits = dict[str, EVC]()

self._intf_events = defaultdict(dict)
self._lock_interfaces = defaultdict(Lock)
self.table_group = {"epl": 0, "evpl": 0}
self._lock = Lock()
self.multi_evc_lock = Lock()
self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

self.load_all_evcs()
self._topology_updated_at = None

def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
"""Get circuits sorted by desc service level and asc creation_time.
In the future, as more ops are offloaded it should be get from the DB.
Expand Down Expand Up @@ -823,85 +825,289 @@ def on_link_down(self, event):
"""Change circuit when link is down or under_mantenance."""
self.handle_link_down(event)

def prepare_swap_to_failover(self, evc: EVC):
"""Prepare an evc for switching to failover."""
install_flows = {}
try:
install_flows = evc.get_failover_flows()
evc.old_path = evc.current_path
evc.current_path = evc.failover_path
evc.failover_path = Path([])
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(
"Ignore Failover path for "
f"{evc} due to error: {err}"
)
return install_flows

def execute_swap_to_failover(self, event_contents, install_flows):
"""Process changes needed to commit a swap to failover."""
emit_event(
self.controller, "failover_link_down",
content=deepcopy(event_contents)
)
send_flow_mods_event(
self.controller,
install_flows,
"install"
)

def prepare_clear_old_path(self, evc: EVC):
"""Prepare an evc for clearing the old path."""
del_flows = {}
try:
del_flows = prepare_delete_flow(
merge_flow_dicts(
evc._prepare_uni_flows(evc.old_path, skip_in=True),
evc._prepare_nni_flows(evc.old_path)
)
)
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(f"Fail to remove {evc} old_path: {err}")
return del_flows

def execute_clear_old_path(
self,
evcs: list[EVC],
event_contents,
delete_flows
):
"""Process changes needed to commit clearing the old path"""
send_flow_mods_event(
self.controller,
delete_flows,
'delete'
)
emit_event(
self.controller,
"failover_old_path",
content=event_contents
)
for evc in evcs:
evc.old_path.make_vlans_available(self.controller)
evc.old_path = Path([])

def prepare_undeploy(self, evc: EVC):
"""Prepare an evc for undeploying."""
del_flows = {}
try:
del_flows = prepare_delete_flow(
merge_flow_dicts(
evc._prepare_uni_flows(evc.current_path, skip_in=True),
evc._prepare_nni_flows(evc.current_path),
evc._prepare_nni_flows(evc.failover_path)
)
)
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(f"Fail to undeploy {evc}: {err}")
pass
return del_flows

def execute_undeploy(self, evcs: list[EVC], remove_flows):
"""Process changes needed to commit an undeploy"""
send_flow_mods_event(
self.controller,
remove_flows,
'delete'
)

for evc in evcs:
evc.current_path.make_vlans_available(self.controller)
evc.failover_path.make_vlans_available(self.controller)
evc.current_path = Path([])
evc.failover_path = Path([])
evc.deactivate()
emit_event(
self.controller,
"need_redeploy",
content={"evc_id": evc.id}
)
log.info(f"{evc} scheduled for redeploy")

# pylint: disable=too-many-branches
# pylint: disable=too-many-locals
def handle_link_down(self, event):
"""Change circuit when link is down or under_mantenance."""
link = event.content["link"]
log.info("Event handle_link_down %s", link)
switch_flows = {}
evcs_with_failover = []
evcs_normal = []
check_failover = []
failover_event_contents = {}

for evc in self.get_evcs_by_svc_level():
with evc.lock:
if evc.is_affected_by_link(link):
evc.affected_by_link_at = event.timestamp
# if there is no failover path, handles link down the
# tradditional way
if (
with ExitStack() as exit_stack:
exit_stack.enter_context(self.multi_evc_lock)
swap_to_failover = list[EVC]()
undeploy = list[EVC]()
clear_failover = list[EVC]()
clear_old_path = list[EVC]()
evc_dict = dict[str, EVC]()
evcs_to_update = list[EVC]()

flow_modifications = defaultdict[str, dict[str, dict[list]]](dict)
event_contents = defaultdict[str, dict[str, dict]](dict)

for evc in self.get_evcs_by_svc_level():
evc_dict[evc.id] = evc
with ExitStack() as sub_stack:
sub_stack.enter_context(evc.lock)
if all((
evc.is_affected_by_link(link),
evc.failover_path,
not evc.is_failover_path_affected_by_link(link)
)):
swap_to_failover.append(evc)
elif all((
evc.is_affected_by_link(link),
not evc.failover_path or
evc.is_failover_path_affected_by_link(link)
):
evcs_normal.append(evc)
continue
try:
dpid_flows = evc.get_failover_flows()
evc.old_path = evc.current_path
evc.current_path = evc.failover_path
evc.failover_path = Path([])
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(
"Ignore Failover path for "
f"{evc} due to error: {err}"
)
evcs_normal.append(evc)
)):
undeploy.append(evc)
elif all((
not evc.is_affected_by_link(link),
evc.failover_path,
evc.is_failover_path_affected_by_link(link)
)):
clear_failover.append(evc)
else:
continue
for dpid, flows in dpid_flows.items():
switch_flows.setdefault(dpid, [])
switch_flows[dpid].extend(flows)
evcs_with_failover.append(evc)
failover_event_contents[evc.id] = map_evc_event_content(
evc,
flows={k: v.copy() for k, v in switch_flows.items()}

exit_stack.push(sub_stack.pop_all())

# Swap from current path to failover path

for evc in swap_to_failover:
new_flows = self.prepare_swap_to_failover(evc)
if new_flows:
flow_modifications[evc.id]["swap_to_failover"] =\
new_flows
event_contents[evc.id]["swap_to_failover"] =\
map_evc_event_content(
evc,
flows=deepcopy(new_flows)
)
clear_old_path.append(evc)
elif evc.id in flow_modifications:
del flow_modifications[evc.id]
del event_contents[evc.id]
undeploy.append(evc)

for evc in clear_failover:
evc.old_path = evc.failover_path
evc.failover_path = Path([])
clear_old_path.append(evc)

# Clear out old flows

for evc in clear_old_path:
del_flows = self.prepare_clear_old_path(evc)
if del_flows:
flow_modifications[evc.id]["clear_old_path"] = del_flows
event_contents[evc.id]["clear_old_path"] =\
map_evc_event_content(
evc,
removed_flows=deepcopy(del_flows)
)
elif evc.is_failover_path_affected_by_link(link):
evc.old_path = evc.failover_path
evc.failover_path = Path([])
check_failover.append(evc)
elif evc.id in flow_modifications:
if "swap_to_failover" in flow_modifications[evc.id]:
evc.failover_path = evc.current_path
evc.current_path = evc.old_path
else:
evc.failover_path = evc.old_path
evc.old_path = Path([])
del flow_modifications[evc.id]
del event_contents[evc.id]
undeploy.append(evc)

if failover_event_contents:
emit_event(self.controller, "failover_link_down",
content=deepcopy(failover_event_contents))
send_flow_mods_event(self.controller, switch_flows, 'install')
swap_to_failover_flows = {}
swap_to_failover_event_contents = {}

for evc in evcs_normal:
emit_event(
self.controller,
"evc_affected_by_link_down",
content={"link": link} | map_evc_event_content(evc)
)
clear_old_path_flows = {}
clear_old_path_event_contents = {}

evcs_to_update = []
for evc in evcs_with_failover:
evcs_to_update.append(evc.as_dict())
log.info(
f"{evc} redeployed with failover due to link down {link.id}"
)
for evc in check_failover:
evcs_to_update.append(evc.as_dict())
clear_old_path_reservations = list[EVC]()

self.mongo_controller.update_evcs(evcs_to_update)
for modified_evc_id in flow_modifications:
evc = evc_dict[modified_evc_id]
if "swap_to_failover" in flow_modifications[modified_evc_id]:
swap_to_failover_flows = merge_flow_dicts(
swap_to_failover_flows,
flow_modifications[modified_evc_id]["swap_to_failover"]
)
swap_to_failover_event_contents[modified_evc_id] =\
event_contents[modified_evc_id]["swap_to_failover"]
if "clear_old_path" in flow_modifications[modified_evc_id]:
clear_old_path_flows = merge_flow_dicts(
clear_old_path_flows,
flow_modifications[modified_evc_id]["clear_old_path"]
)
clear_old_path_event_contents[modified_evc_id] =\
event_contents[modified_evc_id]["clear_old_path"]
clear_old_path_reservations.append(evc)
evcs_to_update.append(evc)

if swap_to_failover_flows:
self.execute_swap_to_failover(
swap_to_failover_event_contents,
swap_to_failover_flows
)

emit_event(
self.controller,
"cleanup_evcs_old_path",
content={"evcs": evcs_with_failover + check_failover}
)
if clear_old_path_flows:
self.execute_clear_old_path(
clear_old_path_reservations,
clear_old_path_event_contents,
clear_old_path_flows
)

undeploy_flows = {}
undeploy_ready = list[EVC]()

# Undeploy the evc, schedule a redeploy

for evc in undeploy:
del_flows = self.prepare_undeploy(evc)
if del_flows:
undeploy_flows = merge_flow_dicts(
undeploy_flows,
del_flows
)
undeploy_ready.append(evc)

if undeploy_ready:
self.execute_undeploy(undeploy_ready, undeploy_flows)
evcs_to_update.extend(undeploy_ready)

# Push update to DB

if evcs_to_update:
self.mongo_controller.update_evcs(
[evc.as_dict() for evc in evcs_to_update]
)

@listen_to("kytos/mef_eline.need_redeploy")
def on_evc_need_redeploy(self, event):
"""Redeploy evcs that need to be redeployed."""
self.handle_evc_need_redeploy(event)

def handle_evc_need_redeploy(self, event):
"""Redeploy evcs that need to be redeployed."""
evc = self.circuits.get(event.content["evc_id"])
if evc is None:
return
with evc.lock:
if not evc.is_enabled() or evc.is_active():
return
result = evc.deploy_to_path()
# if result:
# evc.setup_failover_path()
event_name = "error_redeploy_link_down"
if result:
log.info(f"{evc} redeployed")
event_name = "redeployed_link_down"
emit_event(self.controller, event_name,
content=map_evc_event_content(evc))

@listen_to("kytos/mef_eline.evc_affected_by_link_down")
def on_evc_affected_by_link_down(self, event):
Expand Down
2 changes: 1 addition & 1 deletion utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def emit_event(controller, name, context="kytos/mef_eline", content=None,


def merge_flow_dicts(
dst: dict[str, list], *srcs: list[dict[str, list]]
dst: dict[str, list], *srcs: dict[str, list]
) -> dict[str, list]:
"""Merge srcs dict flows into dst."""
for src in srcs:
Expand Down

0 comments on commit 1b60403

Please sign in to comment.