Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: subscribe and handle kytos/mef_eline.(failover_link_down|failover_old_path|failover_deployed) #104

Merged
merged 16 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Added
- Handled ``kytos/mef_eline.error_redeploy_link_down`` event.
- Handled ``kytos/mef_eline.uni_active_updated`` event.
- Handled ``kytos/mef_eline.deployed`` event.
- Handled ``kytos/mef_eline.(failover_link_down|failover_old_path|failover_deployed)`` events.

Changed
=======
Expand Down
23 changes: 22 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async def enable_telemetry(self, request: Request) -> JSONResponse:
for evc_id in evcs
]
)
await self.int_manager._remove_int_flows(stored_flows)
await self.int_manager._remove_int_flows_by_cookies(stored_flows)
await self.int_manager.enable_int(evcs, force)
except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
raise HTTPException(404, detail=str(exc))
Expand Down Expand Up @@ -429,6 +429,27 @@ async def on_evc_error_redeployed_link_down(self, event: KytosEvent) -> None:
)
await self.int_manager.remove_int_flows(evcs, metadata, force=True)

@alisten_to("kytos/mef_eline.failover_link_down")
async def on_failover_link_down(self, event: KytosEvent):
"""Handle kytos/mef_eline.failover_link_down."""
await self.int_manager.handle_failover_flows(
event.content, event_name="failover_link_down"
)

@alisten_to("kytos/mef_eline.failover_old_path")
async def on_failover_old_path(self, event: KytosEvent):
"""Handle kytos/mef_eline.failover_old_path."""
await self.int_manager.handle_failover_flows(
event.content, event_name="failover_old_path"
)

@alisten_to("kytos/mef_eline.failover_deployed")
async def on_failover_deployed(self, event: KytosEvent):
"""Handle kytos/mef_eline.failover_deployed."""
await self.int_manager.handle_failover_flows(
event.content, event_name="failover_deployed"
)

@alisten_to("kytos/topology.link_down")
async def on_link_down(self, event):
"""Handle topology.link_down."""
Expand Down
74 changes: 67 additions & 7 deletions managers/flow_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import Literal

from napps.kytos.telemetry_int import utils
from napps.kytos.telemetry_int.exceptions import FlowsNotFound
from napps.kytos.telemetry_int import settings


Expand Down Expand Up @@ -42,6 +41,71 @@ def build_int_flows(
flows_per_cookie[cookie].append(flow)
return flows_per_cookie

def build_failover_old_flows(
self, evcs: dict[str, dict], old_flows: dict[int, list[dict]]
) -> dict[int, list[dict]]:
"""Build (old path) failover related to remove flows.

If sink NNIs svlan are different, it'll regenerate the rest of sink loop flows.
Otherwise, it'll just remove the same received flows except with int cookie
value the deletion uses flow mod OFPFC_DELETE, so no need to include the
additional INT keys in the match like nw_proto for deletion.
"""

removed_flows = defaultdict(list)
for evc_id, evc in evcs.items():
dpid_a, dpid_z = evc["uni_a"]["switch"], evc["uni_z"]["switch"]

cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
int_cookie = settings.INT_COOKIE_PREFIX << 56 | (cookie & 0xFFFFFFFFFFFFFF)
cur_sink_a_svlan, cur_sink_z_svlan = None, None
sink_a_flows: list[dict] = []
sink_z_flows: list[dict] = []

for link in evc["current_path"]:
if cur_sink_a_svlan is None and (
svlan := utils.get_svlan_dpid_link(link, dpid_a)
):
cur_sink_a_svlan = svlan
if cur_sink_z_svlan is None and (
svlan := utils.get_svlan_dpid_link(link, dpid_z)
):
cur_sink_z_svlan = svlan
if cur_sink_a_svlan is not None and cur_sink_z_svlan is not None:
break

for flow in old_flows[cookie]:
if not sink_a_flows and flow["switch"] == dpid_a:
if (
flow["flow"]["match"]["dl_vlan"] != cur_sink_a_svlan
and cur_sink_a_svlan
):
sink_a_flows = self._build_int_sink_flows(
"uni_a", evc, old_flows
)
else:
flow["flow"]["cookie"] = int_cookie
sink_a_flows = [flow]
elif not sink_z_flows and flow["switch"] == dpid_z:
if (
flow["flow"]["match"]["dl_vlan"] != cur_sink_z_svlan
and cur_sink_z_svlan
):
sink_z_flows = self._build_int_sink_flows(
"uni_z", evc, old_flows
)
else:
flow["flow"]["cookie"] = int_cookie
sink_z_flows = [flow]
if sink_a_flows and sink_z_flows:
break

hop_flows = self._build_int_hop_flows(evc, old_flows)
removed_flows[cookie] = list(
itertools.chain(sink_a_flows, hop_flows, sink_z_flows)
)
return removed_flows

def _build_int_source_flows(
self,
uni_src_key: Literal["uni_a", "uni_z"],
Expand Down Expand Up @@ -73,9 +137,7 @@ def _build_int_source_flows(
break

if not new_int_flow_tbl_0_tcp:
if not evc["active"]:
return []
raise FlowsNotFound(evc["id"])
return []

utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
utils.set_new_cookie(new_int_flow_tbl_0_tcp)
Expand Down Expand Up @@ -224,9 +286,7 @@ def _build_int_sink_flows(
new_int_flow_tbl_0_tcp = copy.deepcopy(flow)

if not new_int_flow_tbl_0_tcp:
if not evc["active"]:
return []
raise FlowsNotFound(evc["id"])
continue

utils.set_new_cookie(new_int_flow_tbl_0_tcp)
utils.set_owner(new_int_flow_tbl_0_tcp)
Expand Down
Loading