[Feature] Added archived flows #45

Closed
wants to merge 24 commits from feature/archived_flows

Commits (24)
62ab0ed
Bootstrapped storehouse box_flows_archive
viniarck Nov 9, 2021
2140af9
Merge branch 'fix/overlapping_flows' into feature/archived_flows
viniarck Nov 9, 2021
e843bc1
Merge branch 'fix/overlapping_flows' into feature/archived_flows
viniarck Nov 9, 2021
a2e4719
Merge branch 'fix/overlapping_flows' into feature/archived_flows
viniarck Nov 9, 2021
61ee966
Added archived_flows dict
viniarck Nov 9, 2021
3f49984
Bootstrapped _load_flow_boxes
viniarck Nov 9, 2021
73ad10a
Added save_archive_flow
viniarck Nov 9, 2021
d8b5be0
Fixed _load_flows
viniarck Nov 10, 2021
0331c58
Augmented _add_archive_flows_store
viniarck Nov 10, 2021
f92c42f
Added archived flows on settings
viniarck Nov 10, 2021
5f76a9e
Merge branch 'fix/overlapping_flows' into feature/archived_flows
viniarck Nov 10, 2021
e943ba6
Fixed imports
viniarck Nov 10, 2021
3ebc3b8
Fixed arg passed to get_data
viniarck Nov 10, 2021
d4f8a22
Fixed unit tests
viniarck Nov 10, 2021
21169a6
Augmented unit tests to cover archived_flows
viniarck Nov 10, 2021
8a11dff
Added test for archived flows rotation
viniarck Nov 10, 2021
62730ef
Refactored max_delete as offset_delete
viniarck Nov 10, 2021
f6ea55c
Linter fixes
viniarck Nov 10, 2021
9139c81
Augmented unit tests
viniarck Nov 10, 2021
2db1c97
Extracted function new_archive_flow
viniarck Nov 10, 2021
6051690
Updated flow installed argument
viniarck Nov 10, 2021
46ed324
Updated settings.py
viniarck Nov 10, 2021
6b984a7
Bumped version 5.2.0
viniarck Nov 10, 2021
d05354c
Updated order of operations
viniarck Nov 10, 2021
13 changes: 13 additions & 0 deletions CHANGELOG.rst
@@ -21,6 +21,19 @@ Removed
Security
========

[5.2.0] - 2021-11-10
********************

Added
=====
- Added support for archiving overlapped and deleted flows, issue #33.
- Updated settings to set the maximum number of archived flows per switch.

Changed
=======
- Updated the ``stored_flows`` box to stop using a randomly generated box id; it's now named ``flows``. This is a breaking change, but it makes the box easier to find going forward: the code used to assume this namespace held only one box, which isn't the case anymore now that a box for archived flows has been introduced.


[5.1.0] - 2021-11-08
********************

2 changes: 1 addition & 1 deletion kytos.json
@@ -3,7 +3,7 @@
"username": "kytos",
"name": "flow_manager",
"description": "Manage switches' flows through a REST API.",
"version": "5.1.0",
"version": "5.2.0",
"napp_dependencies": ["kytos/of_core", "kytos/storehouse"],
"license": "MIT",
"url": "https://github.com/kytos/flow_manager.git",
104 changes: 92 additions & 12 deletions main.py
@@ -4,7 +4,7 @@
import itertools
from collections import OrderedDict, defaultdict
from copy import deepcopy
from threading import Lock
from threading import Lock, Thread

from flask import jsonify, request
from napps.kytos.flow_manager.match import match_flow
@@ -21,6 +21,8 @@

from .exceptions import InvalidCommandError
from .settings import (
ARCHIVED_MAX_FLOWS_PER_SWITCH,
ARCHIVED_ROTATION_DELETED,
CONSISTENCY_COOKIE_IGNORED_RANGE,
CONSISTENCY_TABLE_ID_IGNORED_RANGE,
ENABLE_CONSISTENCY_CHECK,
@@ -80,6 +82,14 @@ def _valid_consistency_ignored(consistency_ignored_list)
return True


def new_archive_flow(dict_flow, reason):
"""Build an archive flow given an stored dict flow."""
archive_flow = dict(dict_flow)
archive_flow["deleted_at"] = now().strftime("%Y-%m-%dT%H:%M:%S")
archive_flow["reason"] = reason
return archive_flow
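
# A quick sketch of the record this helper produces (timestamp illustrative):
# >>> new_archive_flow({"flow": {"priority": 10, "cookie": 0}}, reason="delete")
# {'flow': {'priority': 10, 'cookie': 0},
#  'deleted_at': '2021-11-10T16:13:00', 'reason': 'delete'}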


class Main(KytosNApp):
"""Main class to be used by Kytos controller."""

@@ -108,6 +118,7 @@ def setup(self):
# {'flow_persistence': {'dpid_str': {cookie_val: [
# {'flow': {flow_dict}}]}}}
self.stored_flows = {}
self.archived_flows = {}
self.resent_flows = set()

def execute(self):
@@ -116,7 +127,20 @@ def execute(self):
The execute method is called by the run method of KytosNApp class.
Users shouldn't call this method directly.
"""
self._load_flows()
self._load_flow_boxes()

def _load_flow_boxes(self) -> None:
"""Load stored flow boxes."""
threads = []
for box_id, set_attr, data_key in (
("flows", "stored_flows", "flow_persistence"),
("flows_archived", "archived_flows", None),
):
thread = Thread(target=self._load_flows, args=(box_id, set_attr, data_key))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()

def shutdown(self):
"""Shutdown routine of the NApp."""
@@ -228,6 +252,7 @@ def check_storehouse_consistency(self, switch):
f"{self.stored_flows.get(dpid, {}).get(cookie, [])}"
)

archived_flows = []
for installed_flow in flows:
if self.is_ignored(installed_flow.table_id, self.tab_id_ignored_range):
continue
@@ -237,6 +262,13 @@
f"Consistency check: alien flow on switch {dpid}, dpid"
" not indexed"
)
archived_flows.append(
new_archive_flow(
{"flow": installed_flow.as_dict(include_id=False)},
reason="alien",
)
)

viniarck (Member, Author) commented: There's still a risk of losing this append if the _install_flows side effect below crashes; fixing #26, which is on our radar, would mitigate this risk.

flow = {"flows": [installed_flow.as_dict()]}
command = "delete_strict"
self._install_flows(command, flow, [switch], save=False)
@@ -247,25 +279,40 @@

if installed_flow not in stored_flows_list:
log.info(f"Consistency check: alien flow on switch {dpid}")
archived_flows.append(
new_archive_flow(
{"flow": installed_flow.as_dict(include_id=False)},
reason="alien",
)
)

flow = {"flows": [installed_flow.as_dict()]}
command = "delete_strict"
self._install_flows(command, flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be deleted. Flow: {flow}"
)

if archived_flows:
with self._storehouse_lock:
self._add_archived_flows_store(switch.id, archived_flows)

# pylint: disable=attribute-defined-outside-init
def _load_flows(self):
"""Load stored flows."""
def _load_flows(
self, box_id="flows", set_attr="stored_flows", data_key="flow_persistence"
):
"""Load stored flows of a box in an attribute."""
try:
data = self.storehouse.get_data()["flow_persistence"]
data = self.storehouse.get_data(attr=self.storehouse.box_id_to_attr[box_id])
if data_key:
data = data[data_key]
if "id" in data:
del data["id"]
self.stored_flows = data
except (KeyError, FileNotFoundError) as error:
log.debug(f"There are no flows to load: {error}")
setattr(self, set_attr, data)
except (KeyError, FileNotFoundError):
log.info(f"There are no flows to load from {box_id}.")
else:
log.info("Flows loaded.")
log.info(f"Flows loaded from {box_id}.")

def _del_matched_flows_store(self, flow_dict, switch):
"""Try to delete matching stored flows given a flow dict."""
@@ -280,7 +327,7 @@ def _del_matched_flows_store(self, flow_dict, switch):
else [int(flow_dict.get("cookie", 0))]
)

has_deleted_any_flow = False
archived_flows = []
for cookie in cookies:
stored_flows = stored_flows_box[switch.id].get(cookie, [])
if not stored_flows:
@@ -292,6 +339,9 @@
# No strict match
if match_flow(flow_dict, version, stored_flow["flow"]):
deleted_flows_idxs.add(i)
archived_flows.append(
new_archive_flow(stored_flow, reason="delete")
)

if not deleted_flows_idxs:
continue
@@ -301,18 +351,42 @@
for i, flow in enumerate(stored_flows)
if i not in deleted_flows_idxs
]
has_deleted_any_flow = True

if stored_flows:
stored_flows_box[switch.id][cookie] = stored_flows
else:
stored_flows_box[switch.id].pop(cookie, None)

if has_deleted_any_flow:
if archived_flows:
stored_flows_box["id"] = "flow_persistence"
self.storehouse.save_flow(stored_flows_box)
del stored_flows_box["id"]
self.stored_flows = deepcopy(stored_flows_box)
self._add_archived_flows_store(switch.id, archived_flows)

def _add_archived_flows_store(
self,
dpid,
archived_flows,
max_len=ARCHIVED_MAX_FLOWS_PER_SWITCH,
offset_delete=ARCHIVED_ROTATION_DELETED,
):
"""Store archived flows."""
if not archived_flows:
return

if (
dpid in self.archived_flows
and len(self.archived_flows[dpid]) + len(archived_flows) > max_len
):
self.archived_flows[dpid] = self.archived_flows[dpid][offset_delete:]

for archived_flow in archived_flows:
if dpid not in self.archived_flows:
self.archived_flows[dpid] = [archived_flow]
else:
self.archived_flows[dpid].append(archived_flow)
self.storehouse.save_archived_flow(self.archived_flows)
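
# Rotation sketch with small numbers: with max_len=5 and offset_delete=2,
# an existing archive ["f1", "f2", "f3", "f4"] plus an incoming batch of two
# exceeds the cap (4 + 2 > 5), so the two oldest entries are dropped first,
# leaving ["f3", "f4"], and the batch is then appended:
# ["f3", "f4", "f5", "f6"]. The trim happens once per call, before appending.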

def _add_flow_store(self, flow_dict, switch):
"""Try to add a flow dict in the store idempotently."""
@@ -325,6 +399,7 @@ def _add_flow_store(self, flow_dict, switch):
if switch.id not in stored_flows_box:
stored_flows_box[switch.id] = OrderedDict()

archived_flows = []
if not stored_flows_box[switch.id].get(cookie):
stored_flows_box[switch.id][cookie] = [installed_flow]
else:
@@ -338,6 +413,9 @@
match_flow(flow_dict, version, stored_flows[i]["flow"]),
)
):
archived_flows.append(
new_archive_flow(stored_flows[i], reason="overlap")
)
stored_flows_box[switch.id][cookie][i] = installed_flow
break
else:
@@ -348,6 +426,8 @@
del stored_flows_box["id"]
self.stored_flows = deepcopy(stored_flows_box)

self._add_archived_flows_store(switch.id, archived_flows)

def _store_changed_flows(self, command, flow_dict, switch):
"""Store changed flows.

5 changes: 5 additions & 0 deletions settings.py
@@ -11,3 +11,8 @@
# To filter by a cookie or `table_id` range [(value1, value2)]
CONSISTENCY_COOKIE_IGNORED_RANGE = []
CONSISTENCY_TABLE_ID_IGNORED_RANGE = []

# Maximum number of flows to archive per switch
ARCHIVED_MAX_FLOWS_PER_SWITCH = 5000

viniarck (Member, Author) commented on Nov 10, 2021:

Initially, I set a value higher than a couple of thousand, but we might need more info about the expected volume of archived flows we want to keep track of; let me know if we need to support a higher value here. cc'ing @italovalcy @jab1982 @ajoaoff @rmotitsuki.

viniarck (Member, Author) commented on Nov 10, 2021:

Side note: we need to go with a reasonable size for this list that's good enough for production and not allow it to grow too much, especially since we're still using the default FS backend with storehouse; large objects on top of a high number of lock ops are something to minimize to ensure smooth IO operations. In the future, once we can afford to move to a more powerful, production-grade DB, I think most of these concerns will be gone. With FS storage using pickle, I tested with 9000+ archived flows on OVS, and it behaved as expected:

❯ curl http://127.0.0.1:8181/api/kytos/storehouse/v1/kytos.flow.persistence/flows_archived | jq '.["00:00:00:00:00:00:00:01"]' | jq length
9020

The pickled binary didn't end up too large, but pickle with the default serialization isn't going to be as production grade as DB storages that have been optimized and designed for that:

❯ lar
total 704K
drwxr-xr-x 8 viniarck viniarck 4.0K Nov 10 15:31 ..
drwxr-xr-x 2 viniarck viniarck 4.0K Nov 10 15:59 .
-rw-r--r-- 1 viniarck viniarck 1.4K Nov 10 16:13 flows
-rw-r--r-- 1 viniarck viniarck 691K Nov 10 16:13 flows_archived
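
For anyone reproducing the size check, here's a minimal sketch; the record shape mirrors new_archive_flow's output and the values are made up:

```python
import pickle

# dpid -> list of archived flow records, as kept in the flows_archived box
archived = {
    "00:00:00:00:00:00:00:01": [
        {
            "flow": {"priority": 10, "cookie": 0},
            "deleted_at": "2021-11-10T16:13:00",
            "reason": "overlap",
        }
        for _ in range(9000)
    ]
}
print(f"pickled size: {len(pickle.dumps(archived)) / 1024:.1f} KiB")
```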

viniarck (Member, Author) commented:

@italovalcy @ajoaoff @jab1982 @rmotitsuki here's more data for us to make an informed decision. I've measured more execution times using the existing FS backend, both for how long it takes to load the flows and for writing:

  • In this first case, with a linear topology of 3 OVS switches and just a single archived flow, there's essentially no significant difference between the load and write times for flows and flows_archived:
| load_flows 'flows' (secs) | load_flows 'flows_archived' (secs) | save kytos.flow.persistence.flows (secs) | save kytos.flow.persistence.flows_archived (secs) | archived flows per switch | switches |
|---|---|---|---|---|---|
| 0.321988059 | 0.323313328 | 0.056381483 | 0.063623033 | 1 | 3 |
| 0.331015045 | 0.336439969 | 0.060050609 | 0.064152285 | 1 | 3 |
| 0.222171105 | 0.225473629 | 0.055098731 | 0.05798877 | 1 | 3 |
| 0.317800926 | 0.317802884 | 0.041970486 | 0.048492011 | 1 | 3 |
| 0.210971846 | 0.204924712 | 0.038846376 | 0.052048731 | 1 | 3 |
| 0.236901137 | 0.230581803 | 0.156860926 | 0.147859926 | 1 | 3 |
| 0.32118056 | 0.325453682 | 0.060899418 | 0.074994289 | 1 | 3 |
| average: 0.2802898111 | average: 0.280570001 | average: 0.06715828986 | average: 0.07273700643 | | |
| std dev: 0.05393865596 | std dev: 0.05716338808 | std dev: 0.04048419031 | std dev: 0.03425065777 | | |
| variance: 0.002909378607 | variance: 0.003267652936 | variance: 0.001638969665 | variance: 0.001173107557 | | |
  • But when we scale to a topology with 30 switches and 9100 archived flows per switch, notice that loading flows_archived takes about 4x as long as loading flows (1.242984775 / 0.3031526574), and its IO writes are about 22x slower on average (3.514650555 / 0.1539360896), taking multiple seconds. All of this happens asynchronously in storehouse, but it always saves the file in its entirety. Overall it's functional, but these relatively slow writes are a bit concerning, although the load times weren't too bad considering the existing ones:
| load_flows 'flows' (secs) | load_flows 'flows_archived' (secs) | save kytos.flow.persistence.flows (secs) | save kytos.flow.persistence.flows_archived (secs) | archived flows per switch | switches |
|---|---|---|---|---|---|
| 0.343262868 | 1.36081191 | 0.380696204 | 2.396087289 | 9100 | 30 |
| 0.309977511 | 1.226346058 | 0.043746247 | 2.461283397 | 9100 | 30 |
| 0.306658912 | 1.243730335 | 0.060340325 | 2.230030912 | 9100 | 30 |
| 0.336292488 | 1.229078531 | 0.311950604 | 4.024685604 | 9100 | 30 |
| 0.306667977 | 1.217683696 | 0.097841618 | 4.552577523 | 9100 | 30 |
| 0.208896111 | 1.205937712 | 0.098094499 | 5.464421286 | 9100 | 30 |
| 0.310312735 | 1.217305181 | 0.08488313 | 3.473467872 | 9100 | 30 |
| average: 0.3031526574 | average: 1.242984775 | average: 0.1539360896 | average: 3.514650555 | | |
| std dev: 0.04418497206 | std dev: 0.05326989636 | std dev: 0.1343651052 | std dev: 1.235026574 | | |
| variance: 0.001952311756 | variance: 0.002837681858 | variance: 0.0180539815 | variance: 1.525290639 | | |
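
For reference, a minimal sketch of how timings like these could be collected; the `timed` wrapper below is hypothetical, and only `get_data`/`box_id_to_attr` come from this PR:

```python
import time

def timed(label, func, *args, **kwargs):
    """Run func, print its wall-clock duration, and return its result."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    print(f"{label}: {time.perf_counter() - start:.9f} secs")
    return result

# e.g., with `storehouse` being this NApp's StoreHouse helper:
# timed("load_flows 'flows_archived'", storehouse.get_data,
#       attr=storehouse.box_id_to_attr["flows_archived"])
```

Since box updates are event-driven, the save timings would have to be measured up to the save callback rather than just around the call that emits the update event.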

viniarck (Member, Author) commented:

For completeness' sake, I've also run more stress tests targeting the flows box storage, since the previous run was only stressing flows_archived. As you can see, with 30+ switches and 9000+ flows per switch, we also have writes that can take 8+ seconds on average. This shows that the current insertion on the FS backend performs poorly; a DB with optimized insertion is expected to perform better, and depending on how we partition the collections (say, per dpid), we could also gain some perf:

| load_flows 'flows' (secs) | load_flows 'flows_archived' (secs) | save kytos.flow.persistence.flows (secs) | save kytos.flow.persistence.flows_archived (secs) | flows per switch | switches |
|---|---|---|---|---|---|
| 1.320028502 | 0.413320756 | 10.63344993 | 0.05605842 | 9100 | 30 |
| 1.420192211 | 0.318927119 | 9.613857726 | 0.04748472 | 9100 | 30 |
| 1.112718186 | 0.401718191 | 6.870411208 | 0.06791718 | 9100 | 30 |
| 1.335173917 | 0.412811008 | 7.292584671 | 0.07829201 | 9100 | 30 |
| 1.408119001 | 0.320302018 | 7.731722126 | 0.08179112 | 9100 | 30 |
| average: 1.319246363 | average: 0.3734158184 | average: 8.428405133 | average: 0.06630869 | | |
| std dev: 0.1234874453 | std dev: 0.04933432745 | std dev: 1.617893756 | std dev: 0.01453704508 | | |
| variance: 0.01524914916 | variance: 0.002433875865 | variance: 2.617580205 | variance: 0.0002113256797 | | |

I think this would be a great starting point to compare whether, without changing the data models much, a NoSQL store could handle this expected production worst-case scenario. It might also be worth extrapolating the number of flows a bit more, since we want to make sure whichever NoSQL storage we end up with has some room to keep performing well and meet our perf requirements; otherwise we'll need to reassess other options.

# Number of old flows to delete whenever 'ARCHIVED_MAX_FLOWS_PER_SWITCH' overflows
ARCHIVED_ROTATION_DELETED = 1000
2 changes: 1 addition & 1 deletion setup.py
@@ -24,7 +24,7 @@
BASE_ENV = Path(os.environ.get("VIRTUAL_ENV", "/"))

NAPP_NAME = "flow_manager"
NAPP_VERSION = "5.1.0"
NAPP_VERSION = "5.2.0"

# Kytos var folder
VAR_PATH = BASE_ENV / "var" / "lib" / "kytos"
47 changes: 33 additions & 14 deletions storehouse.py
@@ -33,39 +33,44 @@ def __init__(self, controller):

if "box" not in self.__dict__:
self.box = None
if "box_archived" not in self.__dict__:
self.box_archived = None
self.box_id_to_attr = {"flows": "box", "flows_archived": "box_archived"}

self.list_stored_boxes()

def get_data(self):
def get_data(self, attr="box"):
"""Return the persistence box data."""
# Wait for the box to be retrieved or created in storehouse
i = 0
while not self.box and i < BOX_RESTORE_ATTEMPTS:
while not getattr(self, attr) and i < BOX_RESTORE_ATTEMPTS:
time.sleep(self.box_restore_timer)
i += 1
if not self.box:
if not getattr(self, attr):
error = "Error retrieving persistence box from storehouse."
log.error(error)
raise FileNotFoundError(error)
return self.box.data
return getattr(getattr(self, attr), "data")
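
# Example usage (mirrors _load_flows in main.py): callers pass the attribute
# name mapped from a box id, e.g.
#   data = storehouse.get_data(attr=storehouse.box_id_to_attr["flows_archived"])
# and get a FileNotFoundError if the box couldn't be restored in time.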

def create_box(self):
def create_box(self, box_id=None):
"""Create a persistence box to store administrative changes."""
content = {
"namespace": self.namespace,
"box_id": box_id,
"callback": self._create_box_callback,
"data": {},
}
event = KytosEvent(name="kytos.storehouse.create", content=content)
self.controller.buffers.app.put(event)

def _create_box_callback(self, _event, data, error):
def _create_box_callback(self, event, data, error):
"""Execute the callback to handle create_box."""
if error:
log.error(
f"Can't create persistence box with namespace {self.namespace}"
)

self.box = data
setattr(self, self.box_id_to_attr[event.content["box_id"]], data)

def list_stored_boxes(self):
"""List all persistence box stored in storehouse."""
@@ -80,10 +85,11 @@ def list_stored_boxes(self):

def _get_or_create_a_box_from_list_of_boxes(self, _event, data, _error):
"""Create a persistence box or retrieve the stored box."""
if data:
self.get_stored_box(data[0])
else:
self.create_box()
for box_id in self.box_id_to_attr:
if box_id in data:
self.get_stored_box(box_id)
else:
self.create_box(box_id)

def get_stored_box(self, box_id):
"""Get persistence box from storehouse."""
@@ -97,12 +103,12 @@ def get_stored_box(self, box_id):
event = KytosEvent(name=name, content=content)
self.controller.buffers.app.put(event)

def _get_box_callback(self, _event, data, error):
def _get_box_callback(self, event, data, error):
"""Handle get_box method saving the box or logging with the error."""
if error:
log.error("Persistence box not found.")
log.error(f"Persistence box {event.content['box_id']} not found.")

self.box = data
setattr(self, self.box_id_to_attr[event.content["box_id"]], data)

def save_flow(self, flows):
"""Save flows in storehouse."""
@@ -123,3 +129,16 @@
log.error(f"Can't update persistence box {data.box_id}.")

log.info(f"Flow saved in {self.namespace}.{data.box_id}")

def save_archived_flow(self, stored_flows):
"""Save archived flows in storehouse."""
self.box_archived.data = stored_flows
content = {
"namespace": self.namespace,
"box_id": self.box_archived.box_id,
"data": self.box_archived.data,
"callback": self._save_flow_callback,
}

event = KytosEvent(name="kytos.storehouse.update", content=content)
self.controller.buffers.app.put(event)