[Fix] Optimization in the consistency check to skip recently deleted flows #70

Merged 7 commits on Feb 14, 2022
17 changes: 17 additions & 0 deletions CHANGELOG.rst
@@ -21,6 +21,23 @@ Removed
Security
========

[2022.1.2] - 2022-02-07
***********************

Changed
=======
- Adapted consistency check to skip recently deleted flows
- Extracted ``is_recent_flow`` static method
- Changed ``_del_matched_flows_store`` to also archive flows
- Changed consistency check to also archive alien flows

Added
=====

- Added ``archived_flows`` and its lock for in-memory storage
- Added ``_add_archived_flows`` method


[2022.1.1] - 2022-02-04
***********************

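The heart of this change is the new ``is_recent_flow`` helper in main.py below: a flow whose timestamp falls within the stats interval is treated as recent and skipped by the consistency check. A minimal standalone sketch of that check, assuming a 30-second interval and plain ``datetime`` calls (the NApp itself uses its ``now()``/``get_time`` helpers and the ``STATS_INTERVAL`` setting):

from datetime import datetime

STATS_INTERVAL = 30  # assumed for illustration; the real value comes from settings.py

def is_recent_flow(stored_flow, created_attr="created_at", interval=STATS_INTERVAL):
    """Return True if the flow's timestamp is within `interval` seconds of now."""
    stored_time = datetime.strptime(
        stored_flow.get(created_attr, "0001-01-01T00:00:00"), "%Y-%m-%dT%H:%M:%S"
    )
    return (datetime.utcnow() - stored_time).total_seconds() <= interval

# A flow created moments ago is recent; an ancient (or missing) timestamp is not.
assert is_recent_flow({"created_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")})
assert not is_recent_flow({})

The same helper is reused with ``deleted_at`` to decide whether an archived flow was deleted recently.
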
4 changes: 2 additions & 2 deletions kytos.json
@@ -3,9 +3,9 @@
"username": "kytos",
"name": "flow_manager",
"description": "Manage switches' flows through a REST API.",
"version": "2022.1.1",
"version": "2022.1.2",
"napp_dependencies": ["kytos/of_core", "kytos/storehouse"],
"license": "MIT",
"url": "https://github.com/kytos/flow_manager.git",
"tags": ["openflow", "manager", "flow", "rest", "work-in-progress"]
"tags": ["openflow", "manager", "flow", "rest"]
}
79 changes: 70 additions & 9 deletions main.py
@@ -28,6 +28,7 @@
from .barrier_request import new_barrier_request
from .exceptions import InvalidCommandError, SwitchNotConnectedError
from .settings import (
ARCHIVED_FLOWS_MAX_SIZE,
CONN_ERR_MAX_RETRIES,
CONN_ERR_MIN_WAIT,
CONN_ERR_MULTIPLIER,
@@ -41,6 +42,7 @@
_valid_consistency_ignored,
cast_fields,
get_min_wait_diff,
new_archive_flow_dict,
new_flow_dict,
)

@@ -87,11 +89,13 @@ def setup(self):
self._flow_mods_sent_error_lock = Lock()
self._flow_mods_retry_count = {}
self._flow_mods_retry_count_lock = Lock()
self._archived_flows_lock = Lock()

# Format of stored flow data:
# {'flow_persistence': {'dpid_str': {cookie_val: [
# {'flow': {flow_dict}}]}}}
self.stored_flows = {}
self.archived_flows = {}
self.resent_flows = set()

def execute(self):
@@ -381,6 +385,14 @@ def switch_flows_by_id(switch):
"""Build switch.flows indexed by id."""
return {flow.id: flow for flow in switch.flows}

@staticmethod
def is_recent_flow(stored_flow, created_attr="created_at", interval=STATS_INTERVAL):
"""Check if it's a recent flow based on its created attribute."""
stored_time = get_time(stored_flow.get(created_attr, "0001-01-01T00:00:00"))
if (now() - stored_time).seconds <= interval:
return True
return False

def check_switch_consistency(self, switch):
"""Check consistency of stored flows for a specific switch."""
dpid = switch.dpid
@@ -389,10 +401,7 @@ def check_switch_consistency(self, switch):

for cookie, stored_flows in self.stored_flows[dpid].items():
for stored_flow in stored_flows:
-stored_time = get_time(
-    stored_flow.get("created_at", "0001-01-01T00:00:00")
-)
-if (now() - stored_time).seconds <= STATS_INTERVAL:
+if self.is_recent_flow(stored_flow, "created_at"):
continue
stored_flow_obj = serializer.from_dict(stored_flow["flow"], switch)
if stored_flow_obj in installed_flows[cookie]:
@@ -432,6 +441,15 @@ def check_storehouse_consistency(self, switch):
for installed_flow in flows:
if self.is_ignored(installed_flow.table_id, self.tab_id_ignored_range):
continue
if (
switch.id in self.archived_flows
and installed_flow.id in self.archived_flows[switch.id]
and self.is_recent_flow(
self.archived_flows[switch.id][installed_flow.id],
"deleted_at",
)
):
continue

if dpid not in self.stored_flows:
log.info(
@@ -446,6 +464,16 @@
f"Flow forwarded to switch {dpid} to be deleted. "
f"Flow: {flow}"
)
self._add_archived_flows(
switch.id,
[
new_archive_flow_dict(
installed_flow.as_dict(include_id=False),
"alien",
installed_flow.id,
)
],
)
continue
except SwitchNotConnectedError:
log.error(
@@ -463,7 +491,16 @@
f"Flow forwarded to switch {dpid} to be deleted. "
f"Flow: {flow}"
)
-continue
self._add_archived_flows(
switch.id,
[
new_archive_flow_dict(
installed_flow.as_dict(include_id=False),
"alien",
installed_flow.id,
)
],
)
except SwitchNotConnectedError:
log.error(
f"Failed to forward flow to switch {dpid} to be deleted. "
@@ -496,7 +533,7 @@ def _del_matched_flows_store(self, flow_dict, _flow_id, switch):
else [int(flow_dict.get("cookie", 0))]
)

-has_deleted_any_flow = False
+archived_flows = []
for cookie in cookies:
stored_flows = stored_flows_box[switch.id].get(cookie, [])
if not stored_flows:
@@ -508,6 +545,11 @@ def _del_matched_flows_store(self, flow_dict, _flow_id, switch):
# No strict match
if match_flow(flow_dict, version, stored_flow["flow"]):
deleted_flows_idxs.add(i)
archived_flows.append(
new_archive_flow_dict(
stored_flow["flow"], "delete", stored_flow.get("_id")
)
)

if not deleted_flows_idxs:
continue
@@ -517,14 +559,14 @@ def _del_matched_flows_store(self, flow_dict, _flow_id, switch):
for i, flow in enumerate(stored_flows)
if i not in deleted_flows_idxs
]
-has_deleted_any_flow = True

if stored_flows:
stored_flows_box[switch.id][cookie] = stored_flows
else:
stored_flows_box[switch.id].pop(cookie, None)

-if has_deleted_any_flow:
+if archived_flows:
+    self._add_archived_flows(switch.id, archived_flows)
stored_flows_box["id"] = "flow_persistence"
self.storehouse.save_flow(stored_flows_box)
del stored_flows_box["id"]
@@ -564,6 +606,24 @@ def _add_flow_store(self, flow_dict, flow_id, switch):
del stored_flows_box["id"]
self.stored_flows = deepcopy(stored_flows_box)

def _add_archived_flows(
self,
dpid,
archived_flows,
max_len=ARCHIVED_FLOWS_MAX_SIZE,
):
"""Add archived flows in memory (will be stored in the DB in the future)."""
if not archived_flows:
return
with self._archived_flows_lock:
if dpid not in self.archived_flows:
self.archived_flows[dpid] = OrderedDict()
for archived_flow in archived_flows:
self.archived_flows[dpid][archived_flow["_id"]] = archived_flow

while len(self.archived_flows[dpid]) > max_len:
self.archived_flows[dpid].popitem(last=False)

def _update_flow_state_store(self, dpid, flow_ids, state):
"""Try to bulk update the state of some flow ids given a dpid."""
if not flow_ids:
@@ -959,5 +1019,6 @@ def handle_errors(self, event):
f"Deleting flow: {flow.as_dict()}, xid: {xid}, cookie: {flow.cookie}, "
f"error: {error_kwargs}"
)
-self._del_stored_flow_by_id(switch.id, flow.cookie, flow.id)
+with self._storehouse_lock:
+    self._del_stored_flow_by_id(switch.id, flow.cookie, flow.id)
self._send_napp_event(flow.switch, flow, "error", **error_kwargs)
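
Two pieces work together in main.py: the ``new_archive_flow_dict`` helper (imported alongside ``new_flow_dict``) builds an archive entry whenever a flow is deleted or flagged as alien, and ``check_storehouse_consistency`` then skips any installed flow whose id maps to an archived entry with a recent ``deleted_at``, so a just-deleted flow isn't re-deleted within the stats interval. A sketch of the entry shape, inferred from the fields the consistency check and the tests read (``_id``, ``flow``, ``reason``, ``deleted_at``); the real helper may set more:

from datetime import datetime

def new_archive_flow_dict(flow_dict, reason, flow_id):
    """Build an archive entry for a deleted or alien flow (illustrative sketch)."""
    return {
        "_id": flow_id,
        "flow": flow_dict,
        "reason": reason,  # "delete" for matched deletions, "alien" for unknown flows
        "deleted_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
    }
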
3 changes: 3 additions & 0 deletions settings.py
@@ -13,6 +13,9 @@
CONSISTENCY_COOKIE_IGNORED_RANGE = []
CONSISTENCY_TABLE_ID_IGNORED_RANGE = []

# Maximum number of archived flows per switch
ARCHIVED_FLOWS_MAX_SIZE = 5000

# Retries options for `kytos/core.openflow.connection.error`
CONN_ERR_MAX_RETRIES = 3
CONN_ERR_MIN_WAIT = 1 # minimum wait between iterations in seconds
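``ARCHIVED_FLOWS_MAX_SIZE`` bounds memory usage per switch: ``_add_archived_flows`` stores entries in an ``OrderedDict`` and evicts the oldest once the cap is exceeded. A minimal sketch of that bounded, FIFO-evicting archive (the standalone function and names are illustrative; the NApp also takes ``self._archived_flows_lock`` around this block):

from collections import OrderedDict

def add_archived_flows(archive, dpid, entries, max_len=5000):
    """Keep at most max_len archived flows per switch, evicting the oldest first."""
    per_switch = archive.setdefault(dpid, OrderedDict())
    for entry in entries:
        per_switch[entry["_id"]] = entry
    while len(per_switch) > max_len:
        per_switch.popitem(last=False)  # pop the oldest (first-inserted) entry

archive = {}
add_archived_flows(archive, "00:00:00:00:00:00:00:01", [{"_id": i} for i in range(20)], max_len=15)
assert len(archive["00:00:00:00:00:00:00:01"]) == 15  # the five oldest were evicted
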
2 changes: 1 addition & 1 deletion setup.cfg
@@ -6,7 +6,7 @@ add-ignore = D105

[yala]
radon mi args = --min C
-pylint args = --disable==too-many-arguments,too-many-locals,too-few-public-methods,too-many-instance-attributes,no-else-return,dangerous-default-value,duplicate-code,raise-missing-from,too-many-arguments,too-many-public-methods --ignored-modules=napps.kytos.topology
+pylint args = --disable==too-many-arguments,too-many-locals,too-few-public-methods,too-many-instance-attributes,no-else-return,dangerous-default-value,duplicate-code,raise-missing-from,too-many-arguments,too-many-public-methods,too-many-lines --ignored-modules=napps.kytos.topology
linters=pylint,pycodestyle,isort,black

[flake8]
2 changes: 1 addition & 1 deletion setup.py
@@ -24,7 +24,7 @@
BASE_ENV = Path(os.environ.get("VIRTUAL_ENV", "/"))

NAPP_NAME = "flow_manager"
NAPP_VERSION = "2022.1.1"
NAPP_VERSION = "2022.1.2"

# Kytos var folder
VAR_PATH = BASE_ENV / "var" / "lib" / "kytos"
89 changes: 86 additions & 3 deletions tests/unit/test_main.py
@@ -1,4 +1,5 @@
"""Test Main methods."""
from datetime import timedelta
import threading
from unittest import TestCase
from unittest.mock import MagicMock, patch
@@ -794,14 +795,15 @@ def test_check_switch_consistency_ignore(self, *args):
self.napp.check_switch_consistency(switch)
mock_install_flows.assert_not_called()

@patch("napps.kytos.flow_manager.main.Main._add_archived_flows")
@patch("napps.kytos.flow_manager.main.Main._install_flows")
@patch("napps.kytos.flow_manager.main.FlowFactory.get_class")
def test_check_storehouse_consistency(self, *args):
"""Test check_storehouse_consistency method.

This test checks the case when a flow is missing in storehouse.
"""
-(mock_flow_factory, mock_install_flows) = args
+(mock_flow_factory, mock_install_flows, mock_add_archived) = args
cookie_exception_interval = [(0x2B00000000000011, 0x2B000000000000FF)]
self.napp.cookie_exception_range = cookie_exception_interval
dpid = "00:00:00:00:00:00:00:01"
@@ -819,6 +821,7 @@ def test_check_storehouse_consistency(self, *args):
self.napp.stored_flows = {dpid: {0: stored_flows}}
self.napp.check_storehouse_consistency(switch)
mock_install_flows.assert_called()
mock_add_archived.assert_called()

@patch("napps.kytos.flow_manager.main.Main._install_flows")
@patch("napps.kytos.flow_manager.main.FlowFactory.get_class")
@@ -1181,11 +1184,12 @@ def test_no_strict_delete_of10(self, *args):
mock_save_flow.assert_called()
self.assertEqual(len(self.napp.stored_flows[dpid]), 0)

@patch("napps.kytos.flow_manager.main.Main._add_archived_flows")
@patch("napps.kytos.flow_manager.main.Main._install_flows")
@patch("napps.kytos.flow_manager.main.FlowFactory.get_class")
def test_consistency_cookie_ignored_range(self, *args):
"""Test the consistency `cookie` ignored range."""
-(_, mock_install_flows) = args
+(_, mock_install_flows, _) = args
dpid = "00:00:00:00:00:00:00:01"
switch = get_switch_mock(dpid, 0x04)
cookie_ignored_interval = [
@@ -1210,11 +1214,12 @@ def test_consistency_cookie_ignored_range(self, *args):
self.napp.check_storehouse_consistency(switch)
self.assertEqual(mock_install_flows.call_count, called)

@patch("napps.kytos.flow_manager.main.Main._add_archived_flows")
@patch("napps.kytos.flow_manager.main.Main._install_flows")
@patch("napps.kytos.flow_manager.main.FlowFactory.get_class")
def test_consistency_table_id_ignored_range(self, *args):
"""Test the consistency `table_id` ignored range."""
-(_, mock_install_flows) = args
+(_, mock_install_flows, _) = args
dpid = "00:00:00:00:00:00:00:01"
switch = get_switch_mock(dpid, 0x04)
table_id_ignored_interval = [(1, 2), 3]
@@ -1611,3 +1616,81 @@ def test_build_flow_mod_from_command(self):

with self.assertRaises(InvalidCommandError):
self.napp.build_flow_mod_from_command(mock, "invalid_command")

def test_is_recent_flow(self):
"""Test is_recent."""
stored_flow = {
"created_at": now().strftime("%Y-%m-%dT%H:%M:%S"),
"flow": {"flow_1": "data"},
}
assert self.napp.is_recent_flow(stored_flow)

past_dt = now() - timedelta(minutes=2)
stored_flow["created_at"] = past_dt.strftime("%Y-%m-%dT%H:%M:%S")
assert not self.napp.is_recent_flow(stored_flow)

stored_flow = {
"deleted_at": now().strftime("%Y-%m-%dT%H:%M:%S"),
"flow": {"flow_1": "data"},
}
assert self.napp.is_recent_flow(stored_flow, "deleted_at")

def test_del_matched_flows_store(self):
"""Test is_recent."""
dpid = "00:00:00:00:00:00:00:01"
switch = get_switch_mock(dpid, 0x04)
switch.id = dpid
flow_id = "1"
stored_flow = {"flow": {"flow_1": "data"}, "_id": flow_id}
self.napp.storehouse = MagicMock()
self.napp.stored_flows = {dpid: {0: [stored_flow]}}
assert not self.napp.archived_flows

self.napp._del_matched_flows_store(stored_flow, None, switch)

assert self.napp.archived_flows[dpid][flow_id]["reason"] == "delete"
self.assertDictEqual(
self.napp.archived_flows[dpid][flow_id]["flow"], stored_flow["flow"]
)

@patch("napps.kytos.flow_manager.main.Main._install_flows")
@patch("napps.kytos.flow_manager.main.FlowFactory.get_class")
def test_consistency_recent_deleted_flow(self, _, mock_install):
"""Test consistency check recent delete flow."""
flow = MagicMock()
flow.id = "1"
flow.cookie = 10
flow2 = MagicMock()
flow2.id = "2"
flow2.cookie = 20

dpid = "00:00:00:00:00:00:00:01"
switch = get_switch_mock(dpid, 0x04)
switch.id = dpid
switch.flows = [flow]
self.napp.stored_flows = {dpid: {flow2.cookie: [flow2]}}
assert not self.napp.archived_flows
self.napp.check_storehouse_consistency(switch)
assert len(self.napp.archived_flows[dpid]) == 1
assert self.napp.archived_flows[dpid][flow.id]["_id"] == flow.id
assert self.napp.archived_flows[dpid][flow.id]["reason"] == "alien"
assert mock_install.call_count == 1

# a second consistency run shouldn't delete again
self.napp.check_storehouse_consistency(switch)
assert mock_install.call_count == 1
assert len(self.napp.archived_flows[dpid]) == 1

def test_max_archived_flows(self):
"""Test max archived flows."""
max_len = 15
n_archived = 20
archived_flows = [{"_id": i} for i in range(n_archived)]

dpid = "00:00:00:00:00:00:00:01"
assert not self.napp.archived_flows
self.napp._add_archived_flows(dpid, archived_flows, max_len)
assert len(self.napp.archived_flows[dpid]) == max_len
keys = list(self.napp.archived_flows[dpid].keys())
assert self.napp.archived_flows[dpid][keys[-1]]["_id"] == n_archived - 1
assert self.napp.archived_flows[dpid][keys[0]]["_id"] == n_archived - max_len