Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redid handling of link down to ensure tear down of old path #583

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Added
Fixed
=======
- UI: fixed issue where non-JSON data was being parsed as JSON data.
- Fixed inconsistencies with link down behaviour. Flows and vlan reservations should now be properly cleared on link down.

Changed
=======
Expand Down
336 changes: 271 additions & 65 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
import traceback
from collections import defaultdict
from contextlib import ExitStack
from copy import deepcopy
from threading import Lock
from typing import Optional
Expand Down Expand Up @@ -65,18 +66,19 @@ def setup(self):

# dictionary of EVCs created. It acts as a circuit buffer.
# Every create/update/delete must be synced to mongodb.
self.circuits = {}
self.circuits = dict[str, EVC]()

self._intf_events = defaultdict(dict)
self._lock_interfaces = defaultdict(Lock)
self.table_group = {"epl": 0, "evpl": 0}
self._lock = Lock()
self.multi_evc_lock = Lock()
self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

self.load_all_evcs()
self._topology_updated_at = None

def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
"""Get circuits sorted by desc service level and asc creation_time.

In the future, as more ops are offloaded it should be get from the DB.
Expand Down Expand Up @@ -823,85 +825,289 @@ def on_link_down(self, event):
"""Change circuit when link is down or under_mantenance."""
self.handle_link_down(event)

def prepare_swap_to_failover(self, evc: EVC):
    """Promote the failover path of ``evc`` to be its current path.

    The flows come from ``evc.get_failover_flows()``. Once they are
    available the current path is kept as ``old_path``, the failover
    path becomes the current path and ``failover_path`` is reset to an
    empty ``Path``. Any exception raised along the way is logged and
    swallowed.

    Returns whatever ``evc.get_failover_flows()`` produced, or an empty
    dict when that call raised.
    """
    failover_flows = {}
    try:
        failover_flows = evc.get_failover_flows()
        # Only rotate the paths after the flows were obtained.
        evc.old_path = evc.current_path
        evc.current_path = evc.failover_path
        evc.failover_path = Path([])
    # pylint: disable=broad-except
    except Exception:
        # Flatten the traceback so it is logged as a single line.
        trace = ", ".join(traceback.format_exc().split("\n"))
        log.error(f"Ignore Failover path for {evc} due to error: {trace}")
    return failover_flows

def execute_swap_to_failover(self, event_contents, install_flows):
    """Commit a swap to failover.

    Emits the ``failover_link_down`` event carrying a deep copy of
    ``event_contents`` and then sends ``install_flows`` as an
    ``install`` flow-mods event.
    """
    controller = self.controller
    # The event gets its own copy of the contents.
    contents_copy = deepcopy(event_contents)
    emit_event(controller, "failover_link_down", content=contents_copy)
    send_flow_mods_event(controller, install_flows, "install")

def prepare_clear_old_path(self, evc: EVC):
    """Build the flow deletions for the ``old_path`` of ``evc``.

    The UNI flows (prepared with ``skip_in=True``) and the NNI flows of
    ``evc.old_path`` are merged and converted with
    ``prepare_delete_flow``. Any exception is logged and swallowed.

    Returns the prepared deletions, or an empty dict when preparing
    them raised.
    """
    try:
        uni_flows = evc._prepare_uni_flows(evc.old_path, skip_in=True)
        nni_flows = evc._prepare_nni_flows(evc.old_path)
        return prepare_delete_flow(merge_flow_dicts(uni_flows, nni_flows))
    # pylint: disable=broad-except
    except Exception:
        # Flatten the traceback so it is logged as a single line.
        trace = ", ".join(traceback.format_exc().split("\n"))
        log.error(f"Fail to remove {evc} old_path: {trace}")
    return {}

def execute_clear_old_path(
    self,
    evcs: list[EVC],
    event_contents,
    delete_flows
):
    """Commit the removal of the old path of every EVC in ``evcs``.

    Sends ``delete_flows`` as a ``delete`` flow-mods event, emits the
    ``failover_old_path`` event with ``event_contents`` and finally,
    for each EVC, calls ``make_vlans_available`` on its ``old_path``
    before resetting that path to an empty ``Path``.
    """
    controller = self.controller
    send_flow_mods_event(controller, delete_flows, "delete")
    emit_event(controller, "failover_old_path", content=event_contents)
    for circuit in evcs:
        stale_path = circuit.old_path
        stale_path.make_vlans_available(controller)
        circuit.old_path = Path([])

def prepare_undeploy(self, evc: EVC):
    """Build the flow deletions needed to undeploy ``evc``.

    Merges the UNI flows of the current path (prepared with
    ``skip_in=True``) with the NNI flows of both the current and the
    failover path, then converts the result with
    ``prepare_delete_flow``. Any exception is logged and swallowed.

    Returns the prepared deletions, or an empty dict when preparing
    them raised.
    """
    try:
        uni_flows = evc._prepare_uni_flows(evc.current_path, skip_in=True)
        current_nni_flows = evc._prepare_nni_flows(evc.current_path)
        failover_nni_flows = evc._prepare_nni_flows(evc.failover_path)
        return prepare_delete_flow(
            merge_flow_dicts(uni_flows, current_nni_flows, failover_nni_flows)
        )
    # pylint: disable=broad-except
    except Exception:
        # Flatten the traceback so it is logged as a single line.
        trace = ", ".join(traceback.format_exc().split("\n"))
        log.error(f"Fail to undeploy {evc}: {trace}")
    return {}

def execute_undeploy(self, evcs: list[EVC], remove_flows):
    """Commit the undeploy of every EVC in ``evcs``.

    Sends ``remove_flows`` as a ``delete`` flow-mods event. Then, for
    each EVC: calls ``make_vlans_available`` on its current and
    failover paths, resets both paths to an empty ``Path``,
    deactivates the EVC and emits a ``need_redeploy`` event carrying
    its id.
    """
    controller = self.controller
    send_flow_mods_event(controller, remove_flows, "delete")

    for circuit in evcs:
        # Hand the VLANs back before the paths are dropped.
        circuit.current_path.make_vlans_available(controller)
        circuit.failover_path.make_vlans_available(controller)
        circuit.current_path = Path([])
        circuit.failover_path = Path([])
        circuit.deactivate()
        redeploy_request = {"evc_id": circuit.id}
        emit_event(controller, "need_redeploy", content=redeploy_request)
        log.info(f"{circuit} scheduled for redeploy")

# pylint: disable=too-many-branches
# pylint: disable=too-many-locals
def handle_link_down(self, event):
"""Change circuit when link is down or under_mantenance."""
link = event.content["link"]
log.info("Event handle_link_down %s", link)
switch_flows = {}
evcs_with_failover = []
evcs_normal = []
check_failover = []
failover_event_contents = {}

for evc in self.get_evcs_by_svc_level():
with evc.lock:
if evc.is_affected_by_link(link):
evc.affected_by_link_at = event.timestamp
# if there is no failover path, handles link down the
# tradditional way
if (
with ExitStack() as exit_stack:
exit_stack.enter_context(self.multi_evc_lock)
swap_to_failover = list[EVC]()
undeploy = list[EVC]()
clear_failover = list[EVC]()
clear_old_path = list[EVC]()
evc_dict = dict[str, EVC]()
evcs_to_update = list[EVC]()

flow_modifications = defaultdict[str, dict[str, dict[list]]](dict)
event_contents = defaultdict[str, dict[str, dict]](dict)

for evc in self.get_evcs_by_svc_level():
evc_dict[evc.id] = evc
with ExitStack() as sub_stack:
sub_stack.enter_context(evc.lock)
if all((
evc.is_affected_by_link(link),
evc.failover_path,
not evc.is_failover_path_affected_by_link(link)
)):
swap_to_failover.append(evc)
elif all((
evc.is_affected_by_link(link),
not evc.failover_path or
evc.is_failover_path_affected_by_link(link)
):
evcs_normal.append(evc)
continue
try:
dpid_flows = evc.get_failover_flows()
evc.old_path = evc.current_path
evc.current_path = evc.failover_path
evc.failover_path = Path([])
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(
"Ignore Failover path for "
f"{evc} due to error: {err}"
)
evcs_normal.append(evc)
)):
undeploy.append(evc)
elif all((
not evc.is_affected_by_link(link),
evc.failover_path,
evc.is_failover_path_affected_by_link(link)
)):
clear_failover.append(evc)
else:
continue
for dpid, flows in dpid_flows.items():
switch_flows.setdefault(dpid, [])
switch_flows[dpid].extend(flows)
evcs_with_failover.append(evc)
failover_event_contents[evc.id] = map_evc_event_content(
evc,
flows={k: v.copy() for k, v in switch_flows.items()}

exit_stack.push(sub_stack.pop_all())

# Swap from current path to failover path

for evc in swap_to_failover:
new_flows = self.prepare_swap_to_failover(evc)
if new_flows:
flow_modifications[evc.id]["swap_to_failover"] =\
new_flows
event_contents[evc.id]["swap_to_failover"] =\
map_evc_event_content(
evc,
flows=deepcopy(new_flows)
)
clear_old_path.append(evc)
elif evc.id in flow_modifications:
del flow_modifications[evc.id]
del event_contents[evc.id]
undeploy.append(evc)

for evc in clear_failover:
evc.old_path = evc.failover_path
evc.failover_path = Path([])
clear_old_path.append(evc)

# Clear out old flows

for evc in clear_old_path:
del_flows = self.prepare_clear_old_path(evc)
if del_flows:
flow_modifications[evc.id]["clear_old_path"] = del_flows
event_contents[evc.id]["clear_old_path"] =\
map_evc_event_content(
evc,
removed_flows=deepcopy(del_flows)
)
elif evc.is_failover_path_affected_by_link(link):
evc.old_path = evc.failover_path
evc.failover_path = Path([])
check_failover.append(evc)
elif evc.id in flow_modifications:
if "swap_to_failover" in flow_modifications[evc.id]:
evc.failover_path = evc.current_path
evc.current_path = evc.old_path
else:
evc.failover_path = evc.old_path
evc.old_path = Path([])
del flow_modifications[evc.id]
del event_contents[evc.id]
undeploy.append(evc)

if failover_event_contents:
emit_event(self.controller, "failover_link_down",
content=deepcopy(failover_event_contents))
send_flow_mods_event(self.controller, switch_flows, 'install')
swap_to_failover_flows = {}
swap_to_failover_event_contents = {}

for evc in evcs_normal:
emit_event(
self.controller,
"evc_affected_by_link_down",
content={"link": link} | map_evc_event_content(evc)
)
clear_old_path_flows = {}
clear_old_path_event_contents = {}

evcs_to_update = []
for evc in evcs_with_failover:
evcs_to_update.append(evc.as_dict())
log.info(
f"{evc} redeployed with failover due to link down {link.id}"
)
for evc in check_failover:
evcs_to_update.append(evc.as_dict())
clear_old_path_reservations = list[EVC]()

self.mongo_controller.update_evcs(evcs_to_update)
for modified_evc_id in flow_modifications:
evc = evc_dict[modified_evc_id]
if "swap_to_failover" in flow_modifications[modified_evc_id]:
swap_to_failover_flows = merge_flow_dicts(
swap_to_failover_flows,
flow_modifications[modified_evc_id]["swap_to_failover"]
)
swap_to_failover_event_contents[modified_evc_id] =\
event_contents[modified_evc_id]["swap_to_failover"]
if "clear_old_path" in flow_modifications[modified_evc_id]:
clear_old_path_flows = merge_flow_dicts(
clear_old_path_flows,
flow_modifications[modified_evc_id]["clear_old_path"]
)
clear_old_path_event_contents[modified_evc_id] =\
event_contents[modified_evc_id]["clear_old_path"]
clear_old_path_reservations.append(evc)
evcs_to_update.append(evc)

if swap_to_failover_flows:
self.execute_swap_to_failover(
swap_to_failover_event_contents,
swap_to_failover_flows
)

emit_event(
self.controller,
"cleanup_evcs_old_path",
content={"evcs": evcs_with_failover + check_failover}
)
if clear_old_path_flows:
self.execute_clear_old_path(
clear_old_path_reservations,
clear_old_path_event_contents,
clear_old_path_flows
)

undeploy_flows = {}
undeploy_ready = list[EVC]()

# Undeploy the evc, schedule a redeploy

for evc in undeploy:
del_flows = self.prepare_undeploy(evc)
if del_flows:
undeploy_flows = merge_flow_dicts(
undeploy_flows,
del_flows
)
undeploy_ready.append(evc)

if undeploy_ready:
self.execute_undeploy(undeploy_ready, undeploy_flows)
evcs_to_update.extend(undeploy_ready)

# Push update to DB

if evcs_to_update:
self.mongo_controller.update_evcs(
[evc.as_dict() for evc in evcs_to_update]
)

@listen_to("kytos/mef_eline.need_redeploy")
def on_evc_need_redeploy(self, event):
    """Listen for ``need_redeploy`` events and delegate their handling.

    The work itself is done by ``handle_evc_need_redeploy``.
    """
    self.handle_evc_need_redeploy(event)

def handle_evc_need_redeploy(self, event):
    """Redeploy the EVC named by ``event.content["evc_id"]``.

    Nothing happens when the id is not in ``self.circuits``, when the
    EVC is not enabled, or when it is already active. Otherwise the
    EVC is deployed with ``deploy_to_path()`` and either
    ``redeployed_link_down`` (truthy result) or
    ``error_redeploy_link_down`` is emitted with the EVC's event
    content.
    """
    evc = self.circuits.get(event.content["evc_id"])
    if evc is None:
        return
    with evc.lock:
        if not evc.is_enabled():
            return
        if evc.is_active():
            return
        deployed = evc.deploy_to_path()
    # NOTE(review): indentation was lost in the reviewed source; the
    # event emission below is assumed to run after the lock is
    # released -- confirm against the repository.
    # if deployed:
    #     evc.setup_failover_path()
    if deployed:
        log.info(f"{evc} redeployed")
        outcome_event = "redeployed_link_down"
    else:
        outcome_event = "error_redeploy_link_down"
    emit_event(
        self.controller,
        outcome_event,
        content=map_evc_event_content(evc)
    )

@listen_to("kytos/mef_eline.evc_affected_by_link_down")
def on_evc_affected_by_link_down(self, event):
Expand Down
Loading
Loading