Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Add consistency check to remove alien flows #172

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
from threading import Lock

import requests
from flask import jsonify, request
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
MethodNotAllowed, NotFound,
Expand Down Expand Up @@ -101,6 +102,56 @@ def execute_consistency(self):
log.info(f"EVC found in mongodb but unloaded {circuit_id}")
self._load_evc(stored_circuits[circuit_id])

if self.execution_rounds > settings.WAIT_FOR_OLD_PATH:
self.check_consistency_alien_flows()

def check_consistency_alien_flows(self):
"""Check flows consistency based on mef_eline cookie prefix and remove
alien flows."""
expected_flows = {}
for evc in self.circuits.values():
if not evc.is_enabled():
continue
expected_flows[evc.get_cookie()] = evc.get_flows_match()

response = requests.get(settings.MANAGER_URL + "/flows")
if response.status_code != 200:
return

flows = response.json()
for dpid in flows.keys():
remove_flows = []
for flow in flows[dpid]['flows']:
if flow.get('cookie', 0) & 0xff00000000000000 != 0xaa << 56:
continue
current_flow = {
"match": flow["match"],
"cookie": flow["cookie"],
}
if (
flow['cookie'] not in expected_flows or
dpid not in expected_flows[flow['cookie']] or
current_flow not in expected_flows[flow['cookie']][dpid]
):
remove_flows.append({
"cookie": flow['cookie'],
"cookie_mask": 18446744073709551615,
"match": flow['match'],
})
if remove_flows:
log.info(
f"Inconsistency found. Remove flows dpid={dpid} "
f"flows={remove_flows}"
)
emit_event(
self.controller,
context="kytos.flow_manager",
name="flows.delete",
dpid=dpid,
flow_dict={"flows": remove_flows},
force=True,
)

def shutdown(self):
"""Execute when your napp is unloaded.

Expand Down Expand Up @@ -427,7 +478,6 @@ def list_schedules(self):
"""
log.debug("list_schedules /v2/evc/schedule")
circuits = self.mongo_controller.get_circuits()['circuits'].values()
print(circuits)
if not circuits:
result = {}
status = 200
Expand Down
157 changes: 105 additions & 52 deletions models/evc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Classes used in the main application.""" # pylint: disable=too-many-lines
from collections import OrderedDict
from datetime import datetime
from threading import Lock
from uuid import uuid4
Expand Down Expand Up @@ -451,7 +452,7 @@ def remove_current_flows(self, current_path=None, force=True):

for switch in switches:
try:
self._send_flow_mods(switch, [match], 'delete', force=force)
self._send_flow_mods(switch.id, [match], 'delete', force=force)
except FlowModException:
log.error(
f"Error removing flows from switch {switch.id} for"
Expand Down Expand Up @@ -548,12 +549,8 @@ def deploy_to_path(self, path=None): # pylint: disable=too-many-branches
log.info(f"{self} was deployed.")
return True

def _install_direct_uni_flows(self):
"""Install flows connecting two UNIs.

This case happens when the circuit is between UNIs in the
same switch.
"""
def _prepare_direct_uni_flows(self):
"""Prepare flows connecting two UNIs for intra-switch EVC."""
vlan_a = self.uni_a.user_tag.value if self.uni_a.user_tag else None
vlan_z = self.uni_z.user_tag.value if self.uni_z.user_tag else None

Expand Down Expand Up @@ -585,12 +582,22 @@ def _install_direct_uni_flows(self):
flow_mod_az["actions"].insert(
0, {"action_type": "set_vlan", "vlan_id": vlan_z}
)
self._send_flow_mods(
self.uni_a.interface.switch, [flow_mod_az, flow_mod_za]
return (
self.uni_a.interface.switch.id, [flow_mod_az, flow_mod_za]
)

def _install_nni_flows(self, path=None):
"""Install NNI flows."""
def _install_direct_uni_flows(self):
"""Install flows connecting two UNIs.

This case happens when the circuit is between UNIs in the
same switch.
"""
(dpid, flows) = self._prepare_direct_uni_flows()
self._send_flow_mods(dpid, flows)

def _prepare_nni_flows(self, path=None):
"""Prepare NNI flows."""
nni_flows = OrderedDict()
for incoming, outcoming in self.links_zipped(path):
in_vlan = incoming.get_metadata("s_vlan").value
out_vlan = outcoming.get_metadata("s_vlan").value
Expand All @@ -617,13 +624,20 @@ def _install_nni_flows(self, path=None):
queue_id=self.queue_id,
)
)
self._send_flow_mods(incoming.endpoint_b.switch, flows)
nni_flows[incoming.endpoint_b.switch.id] = flows
return nni_flows

def _install_uni_flows(self, path=None):
"""Install UNI flows."""
def _install_nni_flows(self, path=None):
"""Install NNI flows."""
for dpid, flows in self._prepare_nni_flows(path).items():
self._send_flow_mods(dpid, flows)

def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
"""Prepare flows to install UNIs."""
uni_flows = {}
if not path:
log.info("install uni flows without path.")
return
return uni_flows

# Determine VLANs
in_vlan_a = self.uni_a.user_tag.value if self.uni_a.user_tag else None
Expand All @@ -636,64 +650,103 @@ def _install_uni_flows(self, path=None):
flows_a = []

# Flow for one direction, pushing the service tag
push_flow = self._prepare_push_flow(
self.uni_a.interface,
path[0].endpoint_a,
in_vlan_a,
out_vlan_a,
in_vlan_z,
queue_id=self.queue_id,
)
flows_a.append(push_flow)
if not skip_in:
push_flow = self._prepare_push_flow(
self.uni_a.interface,
path[0].endpoint_a,
in_vlan_a,
out_vlan_a,
in_vlan_z,
queue_id=self.queue_id,
)
flows_a.append(push_flow)

# Flow for the other direction, popping the service tag
pop_flow = self._prepare_pop_flow(
path[0].endpoint_a,
self.uni_a.interface,
out_vlan_a,
queue_id=self.queue_id,
)
flows_a.append(pop_flow)
if not skip_out:
pop_flow = self._prepare_pop_flow(
path[0].endpoint_a,
self.uni_a.interface,
out_vlan_a,
queue_id=self.queue_id,
)
flows_a.append(pop_flow)

self._send_flow_mods(self.uni_a.interface.switch, flows_a)
uni_flows[self.uni_a.interface.switch.id] = flows_a

# Flows for the second UNI
flows_z = []

# Flow for one direction, pushing the service tag
push_flow = self._prepare_push_flow(
self.uni_z.interface,
path[-1].endpoint_b,
in_vlan_z,
out_vlan_z,
in_vlan_a,
queue_id=self.queue_id,
)
flows_z.append(push_flow)
if not skip_in:
push_flow = self._prepare_push_flow(
self.uni_z.interface,
path[-1].endpoint_b,
in_vlan_z,
out_vlan_z,
in_vlan_a,
queue_id=self.queue_id,
)
flows_z.append(push_flow)

# Flow for the other direction, popping the service tag
pop_flow = self._prepare_pop_flow(
path[-1].endpoint_b,
self.uni_z.interface,
out_vlan_z,
queue_id=self.queue_id,
)
flows_z.append(pop_flow)
if not skip_out:
pop_flow = self._prepare_pop_flow(
path[-1].endpoint_b,
self.uni_z.interface,
out_vlan_z,
queue_id=self.queue_id,
)
flows_z.append(pop_flow)

self._send_flow_mods(self.uni_z.interface.switch, flows_z)
uni_flows[self.uni_z.interface.switch.id] = flows_z

return uni_flows

def _install_uni_flows(self, path=None, skip_in=False, skip_out=False):
"""Install UNI flows."""
uni_flows = self._prepare_uni_flows(path, skip_in, skip_out)

for (dpid, flows) in uni_flows.items():
self._send_flow_mods(dpid, flows)

def get_flows_match(self):
"""Get the list of flows for the EVC (only match fields)."""
evc_flows = OrderedDict()
if self.uni_a.interface.switch == self.uni_z.interface.switch:
(dpid, flows) = self._prepare_direct_uni_flows()
for flow in flows:
del flow["actions"]
evc_flows[dpid] = flows
return evc_flows

nni_flows = self._prepare_nni_flows(self.current_path)
for dpid, flows in nni_flows.items():
for flow in flows:
del flow["actions"]
evc_flows.setdefault(dpid, [])
evc_flows[dpid].extend(flows)

uni_flows = self._prepare_uni_flows(self.current_path)
for (dpid, flows) in uni_flows.items():
for flow in flows:
del flow["actions"]
evc_flows.setdefault(dpid, [])
evc_flows[dpid].extend(flows)

return evc_flows

@staticmethod
def _send_flow_mods(switch, flow_mods, command='flows', force=False):
def _send_flow_mods(dpid, flow_mods, command='flows', force=False):
"""Send a flow_mod list to a specific switch.

Args:
switch(Switch): The target of flows.
dpid(str): The target of flows (i.e. Switch.id).
flow_mods(dict): Python dictionary with flow_mods.
command(str): By default is 'flows'. To remove a flow is 'remove'.
force(bool): True to send via consistency check in case of errors

"""
endpoint = f"{settings.MANAGER_URL}/{command}/{switch.id}"
endpoint = f"{settings.MANAGER_URL}/{command}/{dpid}"

data = {"flows": flow_mods, "force": force}
response = requests.post(endpoint, json=data)
Expand Down
Loading