Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] index stored_flows by cookie #41

Merged
merged 20 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ Removed
Security
========

[5.1.0] - 2021-11.05
********************

Changed
=======
- ``stored_flows`` are now indexed by cookie, issue 34
- Changed the ``flow_persistence`` data structured on storehouse
- Refactored the consistency checks methods accordingly to use cookie indexes


Deprecated
==========
- The prior ``flow_persistence`` data structure isn't supported anymore. It's required to delete the ``kytos.flow.persistence`` folder, upgrading won't be supported this time.


[4.1.2] - 2021-11.03
********************

Expand Down
2 changes: 1 addition & 1 deletion kytos.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"username": "kytos",
"name": "flow_manager",
"description": "Manage switches' flows through a REST API.",
"version": "4.1.2",
"version": "5.1.0",
viniarck marked this conversation as resolved.
Show resolved Hide resolved
"napp_dependencies": ["kytos/of_core", "kytos/storehouse"],
"license": "MIT",
"url": "https://github.com/kytos/flow_manager.git",
Expand Down
283 changes: 152 additions & 131 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""kytos/flow_manager NApp installs, lists and deletes switch flows."""

# pylint: disable=relative-beyond-top-level
from collections import OrderedDict
import itertools
from collections import OrderedDict, defaultdict
from copy import deepcopy
from threading import Lock

Expand Down Expand Up @@ -104,9 +105,8 @@ def setup(self):
self._storehouse_lock = Lock()

# Format of stored flow data:
# {'flow_persistence': {'dpid_str': {'flow_list': [
# {'command': '<add|delete>',
# 'flow': {flow_dict}}]}}}
# {'flow_persistence': {'dpid_str': {cookie_val: [
# {'flow': {flow_dict}}]}}}
self.stored_flows = {}
self.resent_flows = set()

Expand All @@ -122,6 +122,10 @@ def shutdown(self):
"""Shutdown routine of the NApp."""
log.debug("flow-manager stopping")

def stored_flows_list(self, dpid):
"""Ordered list of all stored flows given a dpid."""
return itertools.chain(*list(self.stored_flows[dpid].values()))

@listen_to("kytos/of_core.handshake.completed")
def resend_stored_flows(self, event):
"""Resend stored Flows."""
Expand All @@ -135,11 +139,9 @@ def resend_stored_flows(self, event):
log.debug(f"Flow already resent to the switch {dpid}")
return
if dpid in self.stored_flows:
flow_list = self.stored_flows[dpid]["flow_list"]
for flow in flow_list:
command = flow["command"]
for flow in self.stored_flows_list(dpid):
flows_dict = {"flows": [flow["flow"]]}
self._install_flows(command, flows_dict, [switch])
self._install_flows("add", flows_dict, [switch])
self.resent_flows.add(dpid)
log.info(f"Flows resent to Switch {dpid}")

Expand All @@ -160,94 +162,96 @@ def is_ignored(field, ignored_range):
return True
return False

def consistency_ignored_check(self, flow):
"""Check if the flow is in the list of flows ignored by consistency.

Check by `cookie` range and `table_id` range.
Return True if the flow is in the ignored range, otherwise return
False.
"""
# Check by cookie
if self.is_ignored(flow.cookie, self.cookie_ignored_range):
return True

# Check by `table_id`
if self.is_ignored(flow.table_id, self.tab_id_ignored_range):
return True
return False

@listen_to("kytos/of_core.flow_stats.received")
def on_flow_stats_check_consistency(self, event):
"""Check the consistency of a switch upon receiving flow stats."""
if not ENABLE_CONSISTENCY_CHECK:
self.check_consistency(event.content["switch"])

def check_consistency(self, switch):
"""Check consistency of stored and installed flows given a switch."""
if not ENABLE_CONSISTENCY_CHECK or not switch.is_enabled():
return
switch = event.content["switch"]
if switch.is_enabled():
self.check_storehouse_consistency(switch)
if switch.dpid in self.stored_flows:
self.check_switch_consistency(switch)
log.debug(f"check_consistency on switch {switch.id} has started")
self.check_storehouse_consistency(switch)
if switch.dpid in self.stored_flows:
self.check_switch_consistency(switch)
log.debug(f"check_consistency on switch {switch.id} is done")

@staticmethod
def switch_flows_by_cookie(switch):
"""Build switch.flows indexed by cookie."""
installed_flows = defaultdict(list)
for cookie, flows in itertools.groupby(switch.flows, lambda x: x.cookie):
for flow in flows:
installed_flows[cookie].append(flow)
return installed_flows

def check_switch_consistency(self, switch):
"""Check consistency of installed flows for a specific switch."""
"""Check consistency of stored flows for a specific switch."""
dpid = switch.dpid

# Flows stored in storehouse
stored_flows = self.stored_flows[dpid]["flow_list"]

serializer = FlowFactory.get_class(switch)
installed_flows = self.switch_flows_by_cookie(switch)

for stored_flow in stored_flows:
stored_time = get_time(stored_flow.get("created_at", "0001-01-01T00:00:00"))
if (now() - stored_time).seconds <= STATS_INTERVAL:
continue
command = stored_flow["command"]
stored_flow_obj = serializer.from_dict(stored_flow["flow"], switch)

flow = {"flows": [stored_flow["flow"]]}
for cookie, stored_flows in self.stored_flows[dpid].items():
for stored_flow in stored_flows:
stored_time = get_time(
stored_flow.get("created_at", "0001-01-01T00:00:00")
)
if (now() - stored_time).seconds <= STATS_INTERVAL:
continue
stored_flow_obj = serializer.from_dict(stored_flow["flow"], switch)
if stored_flow_obj in installed_flows[cookie]:
continue

if stored_flow_obj not in switch.flows:
if command == "add":
log.info("A consistency problem was detected in " f"switch {dpid}.")
viniarck marked this conversation as resolved.
Show resolved Hide resolved
self._install_flows(command, flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be "
f"installed. Flow: {flow}"
)
log.info(f"Consistency check: missing flow on switch {dpid}.")
flow = {"flows": [stored_flow["flow"]]}
self._install_flows("add", flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be installed. Flow: {flow}"
)

def check_storehouse_consistency(self, switch):
"""Check consistency of installed flows for a specific switch."""
dpid = switch.dpid

for installed_flow in switch.flows:

# Check if the flow is in the ignored flow list
if self.consistency_ignored_check(installed_flow):
for cookie, flows in self.switch_flows_by_cookie(switch).items():
if self.is_ignored(cookie, self.cookie_ignored_range):
continue

if dpid not in self.stored_flows:
log.info("A consistency problem was detected in " f"switch {dpid}.")
flow = {"flows": [installed_flow.as_dict()]}
command = "delete_strict"
self._install_flows(command, flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be deleted." f" Flow: {flow}"
)
else:
serializer = FlowFactory.get_class(switch)
stored_flows = self.stored_flows[dpid]["flow_list"]
stored_flows_list = [
serializer.from_dict(stored_flow["flow"], switch)
for stored_flow in stored_flows
]
serializer = FlowFactory.get_class(switch)
stored_flows_list = [
serializer.from_dict(stored_flow["flow"], switch)
for stored_flow in self.stored_flows.get(dpid, {}).get(cookie, [])
]
log.debug(
viniarck marked this conversation as resolved.
Show resolved Hide resolved
f"stored_flows_list on switch {switch.id} by cookie: {hex(cookie)}: "
f"{self.stored_flows.get(dpid, {}).get(cookie, [])}"
)

for installed_flow in flows:
if self.is_ignored(installed_flow.table_id, self.tab_id_ignored_range):
continue

if dpid not in self.stored_flows:
log.info(
f"Consistency check: alien flow on switch {dpid}, dpid"
" not indexed"
)
flow = {"flows": [installed_flow.as_dict()]}
command = "delete_strict"
self._install_flows(command, flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be deleted. Flow: {flow}"
)
continue

if installed_flow not in stored_flows_list:
log.info("A consistency problem was detected in " f"switch {dpid}.")
log.info(f"Consistency check: alien flow on switch {dpid}")
flow = {"flows": [installed_flow.as_dict()]}
command = "delete_strict"
self._install_flows(command, flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be deleted."
f" Flow: {flow}"
f"Flow forwarded to switch {dpid} to be deleted. Flow: {flow}"
)

# pylint: disable=attribute-defined-outside-init
Expand All @@ -263,77 +267,94 @@ def _load_flows(self):
else:
log.info("Flows loaded.")

def _store_changed_flows(self, command, flow, switch):
"""Store changed flows.

Args:
command: Flow command to be installed
flow: Flows to be stored
switch: Switch target
"""
def _del_matched_flows_store(self, flow_dict, switch):
"""Try to delete matching stored flows given a flow dict."""
stored_flows_box = deepcopy(self.stored_flows)
# if the flow has a destination dpid it can be stored.
if not switch:
log.info(
"The Flow cannot be stored, the destination switch "
f"have not been specified: {switch}"
)

if switch.id not in stored_flows_box:
return
installed_flow = {}
installed_flow["command"] = command
installed_flow["flow"] = flow
installed_flow["created_at"] = now().strftime("%Y-%m-%dT%H:%M:%S")
should_persist_flow = command == "add"
deleted_flows_idxs = set()

serializer = FlowFactory.get_class(switch)
installed_flow_obj = serializer.from_dict(flow, switch)
cookies = (
self.stored_flows[switch.id].keys()
if flow_dict.get("cookie") is None
else [int(flow_dict.get("cookie", 0))]
)

if switch.id not in stored_flows_box:
# Switch not stored, add to box.
if should_persist_flow:
stored_flows_box[switch.id] = {"flow_list": [installed_flow]}
else:
stored_flows = stored_flows_box[switch.id].get("flow_list", [])
# Check if flow already stored
for i, stored_flow in enumerate(stored_flows):
stored_flow_obj = serializer.from_dict(stored_flow["flow"], switch)
has_deleted_any_flow = False
for cookie in cookies:
stored_flows = stored_flows_box[switch.id].get(cookie, [])
if not stored_flows:
continue

deleted_flows_idxs = set()
for i, stored_flow in enumerate(stored_flows):
version = switch.connection.protocol.version

if installed_flow["command"] == "delete":
# No strict match
if match_flow(flow, version, stored_flow["flow"]):
deleted_flows_idxs.add(i)

elif installed_flow_obj == stored_flow_obj:
if stored_flow["command"] == installed_flow["command"]:
log.debug("Data already stored.")
return
# Flow with inconsistency in "command" fields : Remove the
# old instruction. This happens when there is a stored
# instruction to install the flow, but the new instruction
# is to remove it. In this case, the old instruction is
# removed and the new one is stored.
stored_flow["command"] = installed_flow.get("command")
# No strict match
if match_flow(flow_dict, version, stored_flow["flow"]):
deleted_flows_idxs.add(i)
break

if deleted_flows_idxs:
stored_flows = [
flow
for i, flow in enumerate(stored_flows)
if i not in deleted_flows_idxs
]
if should_persist_flow:
stored_flows.append(installed_flow)
stored_flows_box[switch.id]["flow_list"] = stored_flows

if not deleted_flows_idxs:
continue

stored_flows = [
flow
for i, flow in enumerate(stored_flows)
if i not in deleted_flows_idxs
]
has_deleted_any_flow = True

if stored_flows:
stored_flows_box[switch.id][cookie] = stored_flows
else:
stored_flows_box[switch.id].pop(cookie, None)

if has_deleted_any_flow:
stored_flows_box["id"] = "flow_persistence"
self.storehouse.save_flow(stored_flows_box)
del stored_flows_box["id"]
self.stored_flows = deepcopy(stored_flows_box)

# pylint: disable=fixme
def _add_flow_store(self, flow_dict, switch):
"""Try to add a flow dict in the store."""
installed_flow = {}
installed_flow["flow"] = flow_dict
installed_flow["created_at"] = now().strftime("%Y-%m-%dT%H:%M:%S")

stored_flows_box = deepcopy(self.stored_flows)
cookie = int(flow_dict.get("cookie", 0))
if switch.id not in stored_flows_box:
stored_flows_box[switch.id] = OrderedDict()

# TODO handle issue 23 (overlapping FlowMod add)
if not stored_flows_box[switch.id].get(cookie):
stored_flows_box[switch.id][cookie] = [installed_flow]
else:
stored_flows_box[switch.id][cookie].append(installed_flow)
italovalcy marked this conversation as resolved.
Show resolved Hide resolved

stored_flows_box["id"] = "flow_persistence"
self.storehouse.save_flow(stored_flows_box)
del stored_flows_box["id"]
self.stored_flows = deepcopy(stored_flows_box)

def _store_changed_flows(self, command, flow_dict, switch):
"""Store changed flows.

Args:
command: Flow command to be installed
flow: flow dict to be stored
switch: Switch target
"""
cmd_handlers = {
"add": self._add_flow_store,
"delete": self._del_matched_flows_store,
}
if command not in cmd_handlers:
raise ValueError(
f"Invalid command: {command}, supported: {list(cmd_handlers.keys())}"
)
return cmd_handlers[command](flow_dict, switch)

@rest("v2/flows")
@rest("v2/flows/<dpid>")
def list(self, dpid=None):
Expand Down
Loading