From 62ab0ede08b33b1a63edb91d443ad43233e74fd6 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Tue, 9 Nov 2021 12:56:50 -0300 Subject: [PATCH 01/20] Bootstraped storehouse box_flows_archive Set box to have a deterministic id --- storehouse.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/storehouse.py b/storehouse.py index 6f86ebad..6bc6cacd 100644 --- a/storehouse.py +++ b/storehouse.py @@ -33,6 +33,10 @@ def __init__(self, controller): if "box" not in self.__dict__: self.box = None + if "box_flows_archived" not in self.__dict__: + self.box_flows_archived = None + self.box_id_to_attr = {"flows": "box", "flows_archived": "box_flows_archived"} + self.list_stored_boxes() def get_data(self): @@ -48,24 +52,25 @@ def get_data(self): raise FileNotFoundError(error) return self.box.data - def create_box(self): + def create_box(self, box_id=None): """Create a persistence box to store administrative changes.""" content = { "namespace": self.namespace, + "box_id": box_id, "callback": self._create_box_callback, "data": {}, } event = KytosEvent(name="kytos.storehouse.create", content=content) self.controller.buffers.app.put(event) - def _create_box_callback(self, _event, data, error): + def _create_box_callback(self, event, data, error): """Execute the callback to handle create_box.""" if error: log.error( f"Can't create persistence" f"box with namespace {self.namespace}" ) - self.box = data + setattr(self, self.box_id_to_attr[event.content["box_id"]], data) def list_stored_boxes(self): """List all persistence box stored in storehouse.""" @@ -80,10 +85,11 @@ def list_stored_boxes(self): def _get_or_create_a_box_from_list_of_boxes(self, _event, data, _error): """Create a persistence box or retrieve the stored box.""" - if data: - self.get_stored_box(data[0]) - else: - self.create_box() + for box_id in self.box_id_to_attr: + if box_id in data: + self.get_stored_box(box_id) + else: + self.create_box(box_id) def 
get_stored_box(self, box_id): """Get persistence box from storehouse.""" @@ -97,12 +103,12 @@ def get_stored_box(self, box_id): event = KytosEvent(name=name, content=content) self.controller.buffers.app.put(event) - def _get_box_callback(self, _event, data, error): + def _get_box_callback(self, event, data, error): """Handle get_box method saving the box or logging with the error.""" if error: - log.error("Persistence box not found.") + log.error(f"Persistence box {event.content['box_id']} not found.") - self.box = data + setattr(self, self.box_id_to_attr[event.content["box_id"]], data) def save_flow(self, flows): """Save flows in storehouse.""" From 61ee96639d3a1ba65d20bd2a964528a7ace46f5c Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Tue, 9 Nov 2021 16:06:02 -0300 Subject: [PATCH 02/20] Added archived_flows dict Updated _load_flows to load from any box of flows --- main.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/main.py b/main.py index 1a2e98f2..e9b68aa6 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ from collections import OrderedDict, defaultdict from copy import deepcopy from threading import Lock +from threading import Thread from flask import jsonify, request from napps.kytos.flow_manager.match import match_flow @@ -108,6 +109,7 @@ def setup(self): # {'flow_persistence': {'dpid_str': {cookie_val: [ # {'flow': {flow_dict}}]}}} self.stored_flows = {} + self.archived_flows = {} self.resent_flows = set() def execute(self): @@ -116,7 +118,16 @@ def execute(self): The execute method is called by the run method of KytosNApp class. Users shouldn't call this method directly. 
""" - self._load_flows() + threads = [] + for box_id, box_attr in ( + ("flows", "stored_flows"), + ("flows_archive", "archived_flows"), + ): + t = Thread(target=self._load_flows, args=(box_id, box_attr)) + threads.append(t) + t.start() + for t in threads: + t.join() def shutdown(self): """Shutdown routine of the NApp.""" @@ -255,17 +266,19 @@ def check_storehouse_consistency(self, switch): ) # pylint: disable=attribute-defined-outside-init - def _load_flows(self): - """Load stored flows.""" + def _load_flows( + self, box_id="flows", box_attr="stored_flows", data_key="flow_persistence" + ): + """Load stored flows of a box_id.""" try: - data = self.storehouse.get_data()["flow_persistence"] + data = self.storehouse.get_data()[data_key] if "id" in data: del data["id"] - self.stored_flows = data + setattr(self, box_attr, data) except (KeyError, FileNotFoundError) as error: - log.debug(f"There are no flows to load: {error}") + log.info(f"There are no flows to load from {error}.") else: - log.info("Flows loaded.") + log.info(f"Flows loaded from {box_attr}.") def _del_matched_flows_store(self, flow_dict, switch): """Try to delete matching stored flows given a flow dict.""" From 3f499843290f67d4ee50e59f8709265a0022c520 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Tue, 9 Nov 2021 18:47:10 -0300 Subject: [PATCH 03/20] Bootstraped _load_flow_boxes Added _add_archived_flows_store --- main.py | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/main.py b/main.py index e9b68aa6..9d01cd70 100644 --- a/main.py +++ b/main.py @@ -118,12 +118,16 @@ def execute(self): The execute method is called by the run method of KytosNApp class. Users shouldn't call this method directly. 
""" + self._load_flow_boxes() + + def _load_flow_boxes(self) -> None: + """Load stored flow boxes.""" threads = [] - for box_id, box_attr in ( - ("flows", "stored_flows"), - ("flows_archive", "archived_flows"), + for box_id, box_attr, data_key in ( + ("flows", "stored_flows", "flow_persistence"), + ("flows_archived", "archived_flows", None), ): - t = Thread(target=self._load_flows, args=(box_id, box_attr)) + t = Thread(target=self._load_flows, args=(box_id, box_attr, data_key)) threads.append(t) t.start() for t in threads: @@ -271,12 +275,14 @@ def _load_flows( ): """Load stored flows of a box_id.""" try: - data = self.storehouse.get_data()[data_key] + data = self.storehouse.get_data() + if data_key: + data = data[data_key] if "id" in data: del data["id"] setattr(self, box_attr, data) - except (KeyError, FileNotFoundError) as error: - log.info(f"There are no flows to load from {error}.") + except (KeyError, FileNotFoundError): + log.info(f"There are no flows to load from {box_id}.") else: log.info(f"Flows loaded from {box_attr}.") @@ -293,7 +299,7 @@ def _del_matched_flows_store(self, flow_dict, switch): else [int(flow_dict.get("cookie", 0))] ) - has_deleted_any_flow = False + deleted_flows = [] for cookie in cookies: stored_flows = stored_flows_box[switch.id].get(cookie, []) if not stored_flows: @@ -305,6 +311,7 @@ def _del_matched_flows_store(self, flow_dict, switch): # No strict match if match_flow(flow_dict, version, stored_flow["flow"]): deleted_flows_idxs.add(i) + deleted_flows.append(stored_flow["flow"]) if not deleted_flows_idxs: continue @@ -314,18 +321,31 @@ def _del_matched_flows_store(self, flow_dict, switch): for i, flow in enumerate(stored_flows) if i not in deleted_flows_idxs ] - has_deleted_any_flow = True if stored_flows: stored_flows_box[switch.id][cookie] = stored_flows else: stored_flows_box[switch.id].pop(cookie, None) - if has_deleted_any_flow: + if deleted_flows: stored_flows_box["id"] = "flow_persistence" 
self.storehouse.save_flow(stored_flows_box) del stored_flows_box["id"] self.stored_flows = deepcopy(stored_flows_box) + self._add_archived_flows_store(switch.id, deleted_flows) + + def _add_archived_flows_store(self, dpid, archived_flows) -> None: + """Store archived flows.""" + for archived_flow in archived_flows: + cookie = archived_flow.get("cookie", 0) + if dpid not in self.archived_flows: + self.archived_flows[dpid] = {cookie: [archived_flow]} + else: + if cookie not in self.archived_flows[dpid]: + self.archived_flows[dpid][cookie] = [archived_flow] + else: + self.archived_flows[dpid][cookie].append(archived_flow) + self.storehouse.save_archive_flow(self.archived_flows) def _add_flow_store(self, flow_dict, switch): """Try to add a flow dict in the store idempotently.""" From 73ad10ada71e4fc25a471212a0b6059491eafa12 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Tue, 9 Nov 2021 18:47:45 -0300 Subject: [PATCH 04/20] Added save_archive_flow Bootstraped box_archived on storehouse client --- storehouse.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/storehouse.py b/storehouse.py index 6bc6cacd..774d91b0 100644 --- a/storehouse.py +++ b/storehouse.py @@ -33,9 +33,9 @@ def __init__(self, controller): if "box" not in self.__dict__: self.box = None - if "box_flows_archived" not in self.__dict__: - self.box_flows_archived = None - self.box_id_to_attr = {"flows": "box", "flows_archived": "box_flows_archived"} + if "box_archived" not in self.__dict__: + self.box_archived = None + self.box_id_to_attr = {"flows": "box", "flows_archived": "box_archived"} self.list_stored_boxes() @@ -70,7 +70,7 @@ def _create_box_callback(self, event, data, error): f"Can't create persistence" f"box with namespace {self.namespace}" ) - setattr(self, self.box_id_to_attr[event.content["box_id"]], data) + setattr(self, self.box_id_to_attr[event.content["box_id"]], data) def list_stored_boxes(self): """List all persistence box stored in 
storehouse.""" @@ -108,7 +108,7 @@ def _get_box_callback(self, event, data, error): if error: log.error(f"Persistence box {event.content['box_id']} not found.") - setattr(self, self.box_id_to_attr[event.content["box_id"]], data) + setattr(self, self.box_id_to_attr[event.content["box_id"]], data) def save_flow(self, flows): """Save flows in storehouse.""" @@ -129,3 +129,16 @@ def _save_flow_callback(self, _event, data, error): log.error(f"Can't update persistence box {data.box_id}.") log.info(f"Flow saved in {self.namespace}.{data.box_id}") + + def save_archive_flow(self, stored_flows): + """Save archive flows in storehouse.""" + self.box_archived.data = stored_flows + content = { + "namespace": self.namespace, + "box_id": self.box_archived.box_id, + "data": self.box_archived.data, + "callback": self._save_flow_callback, + } + + event = KytosEvent(name="kytos.storehouse.update", content=content) + self.controller.buffers.app.put(event) From d8b5be0c0c02af273bd1f4b7496b8000e0d08d34 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 10:07:09 -0300 Subject: [PATCH 05/20] Fixed _load_flows Parametrized get_data with attr --- main.py | 16 ++++++++-------- storehouse.py | 12 ++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/main.py b/main.py index 9d01cd70..1d4d5c90 100644 --- a/main.py +++ b/main.py @@ -123,11 +123,11 @@ def execute(self): def _load_flow_boxes(self) -> None: """Load stored flow boxes.""" threads = [] - for box_id, box_attr, data_key in ( + for box_id, set_attr, data_key in ( ("flows", "stored_flows", "flow_persistence"), ("flows_archived", "archived_flows", None), ): - t = Thread(target=self._load_flows, args=(box_id, box_attr, data_key)) + t = Thread(target=self._load_flows, args=(box_id, set_attr, data_key)) threads.append(t) t.start() for t in threads: @@ -271,20 +271,20 @@ def check_storehouse_consistency(self, switch): # pylint: disable=attribute-defined-outside-init def _load_flows( - self, 
box_id="flows", box_attr="stored_flows", data_key="flow_persistence" + self, box_id="flows", set_attr="stored_flows", data_key="flow_persistence" ): - """Load stored flows of a box_id.""" + """Load stored flows of a box in an attribute.""" try: - data = self.storehouse.get_data() + data = self.storehouse.get_data(attr=set_attr) if data_key: data = data[data_key] if "id" in data: del data["id"] - setattr(self, box_attr, data) + setattr(self, set_attr, data) except (KeyError, FileNotFoundError): log.info(f"There are no flows to load from {box_id}.") else: - log.info(f"Flows loaded from {box_attr}.") + log.info(f"Flows loaded from {box_id}.") def _del_matched_flows_store(self, flow_dict, switch): """Try to delete matching stored flows given a flow dict.""" @@ -345,7 +345,7 @@ def _add_archived_flows_store(self, dpid, archived_flows) -> None: self.archived_flows[dpid][cookie] = [archived_flow] else: self.archived_flows[dpid][cookie].append(archived_flow) - self.storehouse.save_archive_flow(self.archived_flows) + self.storehouse.save_archived_flow(self.archived_flows) def _add_flow_store(self, flow_dict, switch): """Try to add a flow dict in the store idempotently.""" diff --git a/storehouse.py b/storehouse.py index 774d91b0..ff3c77fe 100644 --- a/storehouse.py +++ b/storehouse.py @@ -39,18 +39,18 @@ def __init__(self, controller): self.list_stored_boxes() - def get_data(self): + def get_data(self, attr="box"): """Return the persistence box data.""" # Wait retrieve or create box in storehouse i = 0 - while not self.box and i < BOX_RESTORE_ATTEMPTS: + while not getattr(self, attr) and i < BOX_RESTORE_ATTEMPTS: time.sleep(self.box_restore_timer) i += 1 - if not self.box: + if not getattr(self, attr): error = "Error retrieving persistence box from storehouse." 
log.error(error) raise FileNotFoundError(error) - return self.box.data + return getattr(getattr(self, attr), "data") def create_box(self, box_id=None): """Create a persistence box to store administrative changes.""" @@ -130,8 +130,8 @@ def _save_flow_callback(self, _event, data, error): log.info(f"Flow saved in {self.namespace}.{data.box_id}") - def save_archive_flow(self, stored_flows): - """Save archive flows in storehouse.""" + def save_archived_flow(self, stored_flows): + """Save archived flows in storehouse.""" self.box_archived.data = stored_flows content = { "namespace": self.namespace, From 0331c582ebfb83d4220e0e2c3102d55f3d0ac935 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 10:48:01 -0300 Subject: [PATCH 06/20] Augmented _add_archive_flows_store Updated deleted_flow --- main.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/main.py b/main.py index 1d4d5c90..27d9cc97 100644 --- a/main.py +++ b/main.py @@ -311,7 +311,11 @@ def _del_matched_flows_store(self, flow_dict, switch): # No strict match if match_flow(flow_dict, version, stored_flow["flow"]): deleted_flows_idxs.add(i) - deleted_flows.append(stored_flow["flow"]) + + deleted_flow = dict(stored_flow) + deleted_flow["deleted_at"] = now().strftime("%Y-%m-%dT%H:%M:%S") + deleted_flow["reason"] = "delete" + deleted_flows.append(deleted_flow) if not deleted_flows_idxs: continue @@ -337,14 +341,10 @@ def _del_matched_flows_store(self, flow_dict, switch): def _add_archived_flows_store(self, dpid, archived_flows) -> None: """Store archived flows.""" for archived_flow in archived_flows: - cookie = archived_flow.get("cookie", 0) if dpid not in self.archived_flows: - self.archived_flows[dpid] = {cookie: [archived_flow]} + self.archived_flows[dpid] = [archived_flow] else: - if cookie not in self.archived_flows[dpid]: - self.archived_flows[dpid][cookie] = [archived_flow] - else: - self.archived_flows[dpid][cookie].append(archived_flow) + 
self.archived_flows[dpid].append(archived_flow) self.storehouse.save_archived_flow(self.archived_flows) def _add_flow_store(self, flow_dict, switch): From f92c42f3b1c92b7b1fb4219f49b13ec171f2d62f Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 11:46:43 -0300 Subject: [PATCH 07/20] Added archived flows on settings Hooked overlapped flows to save on archive --- main.py | 28 ++++++++++++++++++++++++++-- settings.py | 5 +++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/main.py b/main.py index 27d9cc97..838a388d 100644 --- a/main.py +++ b/main.py @@ -11,7 +11,11 @@ from napps.kytos.flow_manager.match import match_flow from napps.kytos.flow_manager.storehouse import StoreHouse from napps.kytos.of_core.flow import FlowFactory -from napps.kytos.of_core.settings import STATS_INTERVAL +from napps.kytos.of_core.settings import ( + STATS_INTERVAL, + ARCHIVED_MAX_FLOWS_PER_SWITCH, + ARCHIVED_ROTATION_DELETED, +) from pyof.foundation.base import UBIntBase from pyof.v0x01.asynchronous.error_msg import BadActionCode from pyof.v0x01.common.phy_port import PortConfig @@ -338,8 +342,20 @@ def _del_matched_flows_store(self, flow_dict, switch): self.stored_flows = deepcopy(stored_flows_box) self._add_archived_flows_store(switch.id, deleted_flows) - def _add_archived_flows_store(self, dpid, archived_flows) -> None: + def _add_archived_flows_store( + self, + dpid, + archived_flows, + max_len=ARCHIVED_MAX_FLOWS_PER_SWITCH, + max_deleted=ARCHIVED_ROTATION_DELETED, + ) -> None: """Store archived flows.""" + if not archived_flows: + return + + if len(self.archived_flows[dpid]) + len(archived_flows) > max_len: + self.archived_flows[dpid] = self.archived_flows[max_deleted:] + for archived_flow in archived_flows: if dpid not in self.archived_flows: self.archived_flows[dpid] = [archived_flow] @@ -358,6 +374,7 @@ def _add_flow_store(self, flow_dict, switch): if switch.id not in stored_flows_box: stored_flows_box[switch.id] = OrderedDict() + 
overlapped_flows = [] if not stored_flows_box[switch.id].get(cookie): stored_flows_box[switch.id][cookie] = [installed_flow] else: @@ -371,6 +388,11 @@ def _add_flow_store(self, flow_dict, switch): match_flow(flow_dict, version, stored_flows[i]["flow"]), ) ): + overlapped_flow = dict(stored_flows[i]) + overlapped_flow["deleted_at"] = now().strftime("%Y-%m-%dT%H:%M:%S") + overlapped_flow["reason"] = "overlap" + overlapped_flows.append(overlapped_flow) + stored_flows_box[switch.id][cookie][i] = installed_flow break else: @@ -381,6 +403,8 @@ def _add_flow_store(self, flow_dict, switch): del stored_flows_box["id"] self.stored_flows = deepcopy(stored_flows_box) + self._add_archived_flows_store(switch.id, overlapped_flows) + def _store_changed_flows(self, command, flow_dict, switch): """Store changed flows. diff --git a/settings.py b/settings.py index 1a594282..58f1a0d6 100644 --- a/settings.py +++ b/settings.py @@ -11,3 +11,8 @@ # To filter by a cookie or `table_id` range [(value1, value2)] CONSISTENCY_COOKIE_IGNORED_RANGE = [] CONSISTENCY_TABLE_ID_IGNORED_RANGE = [] + +# Maximum number of flows to archive per switch +ARCHIVED_MAX_FLOWS_PER_SWITCH = 2000 +# Number of old flows to delete whenever 'ARCHIVED_MAX_FLOWS_PER_SWITCH' overflows +ARCHIVED_ROTATION_DELETED = 500 From e943ba68d74e1173b34b9decae87efa3ce6a102f Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 12:43:02 -0300 Subject: [PATCH 08/20] Fixed imports --- main.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/main.py b/main.py index 838a388d..2696a9c9 100644 --- a/main.py +++ b/main.py @@ -4,18 +4,13 @@ import itertools from collections import OrderedDict, defaultdict from copy import deepcopy -from threading import Lock -from threading import Thread +from threading import Lock, Thread from flask import jsonify, request from napps.kytos.flow_manager.match import match_flow from napps.kytos.flow_manager.storehouse import StoreHouse from 
napps.kytos.of_core.flow import FlowFactory -from napps.kytos.of_core.settings import ( - STATS_INTERVAL, - ARCHIVED_MAX_FLOWS_PER_SWITCH, - ARCHIVED_ROTATION_DELETED, -) +from napps.kytos.of_core.settings import STATS_INTERVAL from pyof.foundation.base import UBIntBase from pyof.v0x01.asynchronous.error_msg import BadActionCode from pyof.v0x01.common.phy_port import PortConfig @@ -26,6 +21,8 @@ from .exceptions import InvalidCommandError from .settings import ( + ARCHIVED_MAX_FLOWS_PER_SWITCH, + ARCHIVED_ROTATION_DELETED, CONSISTENCY_COOKIE_IGNORED_RANGE, CONSISTENCY_TABLE_ID_IGNORED_RANGE, ENABLE_CONSISTENCY_CHECK, From 3ebc3b89caf179112683b5bd1fdfabc796ee2cdd Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 12:56:58 -0300 Subject: [PATCH 09/20] Fixed arg passed to get_data Fixed archived_flows overflow check --- main.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/main.py b/main.py index 2696a9c9..6776c8a6 100644 --- a/main.py +++ b/main.py @@ -276,7 +276,7 @@ def _load_flows( ): """Load stored flows of a box in an attribute.""" try: - data = self.storehouse.get_data(attr=set_attr) + data = self.storehouse.get_data(attr=self.storehouse.box_id_to_attr[box_id]) if data_key: data = data[data_key] if "id" in data: @@ -350,8 +350,11 @@ def _add_archived_flows_store( if not archived_flows: return - if len(self.archived_flows[dpid]) + len(archived_flows) > max_len: - self.archived_flows[dpid] = self.archived_flows[max_deleted:] + if ( + dpid in self.archived_flows + and len(self.archived_flows[dpid]) + len(archived_flows) > max_len + ): + self.archived_flows[dpid] = self.archived_flows[dpid][max_deleted:] for archived_flow in archived_flows: if dpid not in self.archived_flows: From d4f8a224209f83eb2a90e5bff974cf7dc1da2983 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 14:03:57 -0300 Subject: [PATCH 10/20] Fixed unit tests --- tests/unit/test_main.py | 25 +++++++++++++++++++------ 
tests/unit/test_storehouse.py | 6 +++--- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index 9b2aaae4..350dcdc5 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -408,10 +408,11 @@ def test_check_switch_consistency_add(self, *args): self.napp.check_switch_consistency(switch) mock_install_flows.assert_called() + @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_flow") def test_add_overlapping_flow(self, *args): """Test add an overlapping flow.""" - (_,) = args + (mock_save_flow, mock_save_archived_flow) = args dpid = "00:00:00:00:00:00:00:01" switch = get_switch_mock(dpid, 0x04) switch.id = dpid @@ -449,6 +450,8 @@ def test_add_overlapping_flow(self, *args): self.napp._add_flow_store(flow_dict, switch) assert len(self.napp.stored_flows[dpid]) == 1 assert self.napp.stored_flows[dpid][0x20][0]["flow"]["actions"] == new_actions + mock_save_flow.assert_called() + mock_save_archived_flow.assert_called() @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_flow") def test_add_overlapping_flow_diff_priority(self, *args): @@ -682,13 +685,14 @@ def test_no_strict_delete_with_cookie(self, *args): @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") + @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow") def test_no_strict_delete(self, *args): """Test the non-strict matching method. Test non-strict matching to delete a Flow using a cookie. 
""" - (mock_save_flow, _, _) = args + (mock_save_flow, mock_save_archived_flow, _, _) = args dpid = "00:00:00:00:00:00:00:01" switch = get_switch_mock(dpid, 0x04) switch.id = dpid @@ -719,17 +723,19 @@ def test_no_strict_delete(self, *args): self.napp._store_changed_flows(command, flow_to_install, switch) mock_save_flow.assert_called() + mock_save_archived_flow.assert_called() self.assertEqual(len(self.napp.stored_flows), 1) @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") + @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow") def test_no_strict_delete_with_ipv4(self, *args): """Test the non-strict matching method. Test non-strict matching to delete a Flow using IPv4. """ - (mock_save_flow, _, _) = args + (mock_save_flow, mock_save_archived_flow, _, _) = args dpid = "00:00:00:00:00:00:00:01" switch = get_switch_mock(dpid, 0x04) switch.id = dpid @@ -763,6 +769,7 @@ def test_no_strict_delete_with_ipv4(self, *args): self.napp._store_changed_flows(command, flow_to_install, switch) mock_save_flow.assert_called() + mock_save_archived_flow.assert_called() expected_stored = { 4961162389751548787: [ { @@ -778,13 +785,14 @@ def test_no_strict_delete_with_ipv4(self, *args): @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") + @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow") def test_no_strict_delete_in_port(self, *args): """Test the non-strict matching method. Test non-strict matching to delete a Flow matching in_port. 
""" - (mock_save_flow, _, _) = args + (mock_save_flow, mock_save_archived_flow, _, _) = args dpid = "00:00:00:00:00:00:00:01" switch = get_switch_mock(dpid, 0x04) switch.id = dpid @@ -826,6 +834,7 @@ def test_no_strict_delete_in_port(self, *args): self.napp._store_changed_flows(command, flow_to_install, switch) mock_save_flow.assert_called() + mock_save_archived_flow.assert_called() expected_stored = { 0: [ @@ -838,13 +847,14 @@ def test_no_strict_delete_in_port(self, *args): @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") + @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow") def test_no_strict_delete_all_if_empty_match(self, *args): """Test the non-strict matching method. Test non-strict matching to delete all if empty match is given. """ - (mock_save_flow, _, _) = args + (mock_save_flow, mock_save_archived_flow, _, _) = args dpid = "00:00:00:00:00:00:00:01" switch = get_switch_mock(dpid, 0x04) switch.id = dpid @@ -878,6 +888,7 @@ def test_no_strict_delete_all_if_empty_match(self, *args): self.napp._store_changed_flows(command, flow_to_install, switch) mock_save_flow.assert_called() + mock_save_archived_flow.assert_called() expected_stored = {} self.assertDictEqual(self.napp.stored_flows[dpid], expected_stored) @@ -945,13 +956,14 @@ def test_no_strict_delete_with_ipv4_fail(self, *args): @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") + @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow") def test_no_strict_delete_of10(self, *args): """Test the non-strict matching method. Test non-strict matching to delete a Flow using OF10. 
""" - (mock_save_flow, _, _) = args + (mock_save_flow, mock_save_archived_flow, _, _) = args dpid = "00:00:00:00:00:00:00:01" switch = get_switch_mock(dpid, 0x01) switch.id = dpid @@ -1008,6 +1020,7 @@ def test_no_strict_delete_of10(self, *args): self.napp._store_changed_flows(command, flow_to_install, switch) mock_save_flow.assert_called() + mock_save_archived_flow.assert_called() self.assertEqual(len(self.napp.stored_flows[dpid]), 0) @patch("napps.kytos.flow_manager.main.Main._install_flows") diff --git a/tests/unit/test_storehouse.py b/tests/unit/test_storehouse.py index e42fc6cb..ba924c5c 100644 --- a/tests/unit/test_storehouse.py +++ b/tests/unit/test_storehouse.py @@ -55,13 +55,13 @@ def test_get_or_create_a_box_from_list_of_boxes(self, *args): """Test create_box.""" (mock_create_box, mock_get_stored_box) = args mock_event = MagicMock() - mock_data = MagicMock() + data = {"flows": "flows"} mock_error = MagicMock() self.napp._get_or_create_a_box_from_list_of_boxes( - mock_event, mock_data, mock_error + mock_event, data, mock_error ) mock_get_stored_box.assert_called() - self.napp._get_or_create_a_box_from_list_of_boxes(mock_event, None, mock_error) + self.napp._get_or_create_a_box_from_list_of_boxes(mock_event, {}, mock_error) mock_create_box.assert_called() @patch("napps.kytos.flow_manager.storehouse.KytosEvent") From 21169a6ad04b1773b849bdb6350c61e69e9a1cdf Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 14:18:00 -0300 Subject: [PATCH 11/20] Aumented unit tests to cover archived_flows --- tests/unit/test_main.py | 45 ++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index 350dcdc5..b9051482 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -418,23 +418,18 @@ def test_add_overlapping_flow(self, *args): switch.id = dpid cookie = 0x20 - self.napp.stored_flows = { - dpid: { - cookie: [ - { - "flow": { - "priority": 
10, - "cookie": 84114904, - "match": { - "ipv4_src": "192.168.1.1", - "ipv4_dst": "192.168.0.2", - }, - "actions": [{"action_type": "output", "port": 2}], - } - } - ] + stored_flow = { + "flow": { + "priority": 10, + "cookie": 84114904, + "match": { + "ipv4_src": "192.168.1.1", + "ipv4_dst": "192.168.0.2", + }, + "actions": [{"action_type": "output", "port": 2}], } } + self.napp.stored_flows = {dpid: {cookie: [stored_flow]}} new_actions = [{"action_type": "output", "port": 3}] flow_dict = { @@ -452,6 +447,10 @@ def test_add_overlapping_flow(self, *args): assert self.napp.stored_flows[dpid][0x20][0]["flow"]["actions"] == new_actions mock_save_flow.assert_called() mock_save_archived_flow.assert_called() + assert len(self.napp.archived_flows[dpid]) == 1 + self.assertDictEqual( + self.napp.archived_flows[dpid][0]["flow"], stored_flow["flow"] + ) @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_flow") def test_add_overlapping_flow_diff_priority(self, *args): @@ -724,7 +723,8 @@ def test_no_strict_delete(self, *args): self.napp._store_changed_flows(command, flow_to_install, switch) mock_save_flow.assert_called() mock_save_archived_flow.assert_called() - self.assertEqual(len(self.napp.stored_flows), 1) + self.assertEqual(len(self.napp.stored_flows[dpid]), 1) + self.assertEqual(len(self.napp.archived_flows[dpid]), 1) @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") @@ -782,6 +782,10 @@ def test_no_strict_delete_with_ipv4(self, *args): ] } self.assertDictEqual(self.napp.stored_flows[dpid], expected_stored) + self.assertDictEqual( + self.napp.archived_flows[dpid][0]["flow"], + stored_flows[84114904][0]["flow"], + ) @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") @@ -844,6 +848,14 @@ def test_no_strict_delete_in_port(self, *args): ] } self.assertDictEqual(self.napp.stored_flows[dpid], expected_stored) + 
self.assertEqual(len(self.napp.archived_flows[dpid]), 2) + + self.assertDictEqual( + self.napp.archived_flows[dpid][0]["flow"], stored_flow[0][0]["flow"] + ) + self.assertDictEqual( + self.napp.archived_flows[dpid][1]["flow"], stored_flow[0][2]["flow"] + ) @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") @@ -892,6 +904,7 @@ def test_no_strict_delete_all_if_empty_match(self, *args): expected_stored = {} self.assertDictEqual(self.napp.stored_flows[dpid], expected_stored) + self.assertEqual(len(self.napp.archived_flows[dpid]), 2) @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") From 8a11dff9053f1c49aaec692eaa7a0b14f9f6faed Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 14:30:24 -0300 Subject: [PATCH 12/20] Added test for archived flows rotation --- tests/unit/test_main.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index b9051482..d92d8736 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -1095,3 +1095,23 @@ def test_consistency_table_id_ignored_range(self, *args): self.napp.stored_flows = {dpid: {0: [flow]}} self.napp.check_storehouse_consistency(switch) self.assertEqual(mock_install_flows.call_count, called) + + @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") + def test_archived_flows_rotation(self, *args) -> None: + """Test archive flows rotation.""" + _ = args + + n_flows = 10 + max_len = 10 + extra_flows = 3 + offset_delete = 5 + dpid = "1" + + self.napp.archived_flows[dpid] = [{} for f in range(n_flows)] + deleted_flows = [{} for f in range(extra_flows)] + + self.napp._add_archived_flows_store(dpid, deleted_flows, max_len, offset_delete) + assert ( + len(self.napp.archived_flows[dpid]) + == (max_len - offset_delete) + extra_flows + ) From 
62730ef369c5b4eb462c935e72f5ffbf7b065f77 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 14:30:47 -0300 Subject: [PATCH 13/20] Refactored max_delete as offset_delete --- main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.py b/main.py index 6776c8a6..3346e4f6 100644 --- a/main.py +++ b/main.py @@ -344,7 +344,7 @@ def _add_archived_flows_store( dpid, archived_flows, max_len=ARCHIVED_MAX_FLOWS_PER_SWITCH, - max_deleted=ARCHIVED_ROTATION_DELETED, + offset_delete=ARCHIVED_ROTATION_DELETED, ) -> None: """Store archived flows.""" if not archived_flows: @@ -354,7 +354,7 @@ def _add_archived_flows_store( dpid in self.archived_flows and len(self.archived_flows[dpid]) + len(archived_flows) > max_len ): - self.archived_flows[dpid] = self.archived_flows[dpid][max_deleted:] + self.archived_flows[dpid] = self.archived_flows[dpid][offset_delete:] for archived_flow in archived_flows: if dpid not in self.archived_flows: From f6ea55c7a6fcfda9063410ecddf46433e2d199ec Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 14:32:47 -0300 Subject: [PATCH 14/20] Linter fixes --- main.py | 10 +++++----- tests/unit/test_storehouse.py | 4 +--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/main.py b/main.py index 3346e4f6..09f6c524 100644 --- a/main.py +++ b/main.py @@ -128,11 +128,11 @@ def _load_flow_boxes(self) -> None: ("flows", "stored_flows", "flow_persistence"), ("flows_archived", "archived_flows", None), ): - t = Thread(target=self._load_flows, args=(box_id, set_attr, data_key)) - threads.append(t) - t.start() - for t in threads: - t.join() + thread = Thread(target=self._load_flows, args=(box_id, set_attr, data_key)) + threads.append(thread) + thread.start() + for thread in threads: + thread.join() def shutdown(self): """Shutdown routine of the NApp.""" diff --git a/tests/unit/test_storehouse.py b/tests/unit/test_storehouse.py index ba924c5c..5c36cdf5 100644 --- a/tests/unit/test_storehouse.py +++ 
b/tests/unit/test_storehouse.py @@ -57,9 +57,7 @@ def test_get_or_create_a_box_from_list_of_boxes(self, *args): mock_event = MagicMock() data = {"flows": "flows"} mock_error = MagicMock() - self.napp._get_or_create_a_box_from_list_of_boxes( - mock_event, data, mock_error - ) + self.napp._get_or_create_a_box_from_list_of_boxes(mock_event, data, mock_error) mock_get_stored_box.assert_called() self.napp._get_or_create_a_box_from_list_of_boxes(mock_event, {}, mock_error) mock_create_box.assert_called() From 9139c81a3edb328331aabd7ec0a5cf3415bfbf62 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 15:18:22 -0300 Subject: [PATCH 15/20] Augmented unit tests --- tests/unit/test_main.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index d92d8736..a0acd9d5 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -451,6 +451,7 @@ def test_add_overlapping_flow(self, *args): self.assertDictEqual( self.napp.archived_flows[dpid][0]["flow"], stored_flow["flow"] ) + assert self.napp.archived_flows[dpid][0]["reason"] == "overlap" @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_flow") def test_add_overlapping_flow_diff_priority(self, *args): @@ -625,6 +626,7 @@ def test_check_switch_consistency_ignore(self, *args): self.napp.check_switch_consistency(switch) mock_install_flows.assert_not_called() + @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") def test_check_storehouse_consistency(self, *args): @@ -632,7 +634,7 @@ def test_check_storehouse_consistency(self, *args): This test checks the case when a flow is missing in storehouse. 
""" - (mock_flow_factory, mock_install_flows) = args + (mock_flow_factory, mock_install_flows, mock_save_archived_flow) = args cookie_exception_interval = [(0x2B00000000000011, 0x2B000000000000FF)] self.napp.cookie_exception_range = cookie_exception_interval dpid = "00:00:00:00:00:00:00:01" @@ -650,6 +652,7 @@ def test_check_storehouse_consistency(self, *args): self.napp.stored_flows = {dpid: {0: stored_flows}} self.napp.check_storehouse_consistency(switch) mock_install_flows.assert_called() + mock_save_archived_flow.assert_called() @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") @@ -725,6 +728,7 @@ def test_no_strict_delete(self, *args): mock_save_archived_flow.assert_called() self.assertEqual(len(self.napp.stored_flows[dpid]), 1) self.assertEqual(len(self.napp.archived_flows[dpid]), 1) + assert self.napp.archived_flows[dpid][0]["reason"] == "delete" @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") @@ -1036,11 +1040,12 @@ def test_no_strict_delete_of10(self, *args): mock_save_archived_flow.assert_called() self.assertEqual(len(self.napp.stored_flows[dpid]), 0) + @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") def test_consistency_cookie_ignored_range(self, *args): """Test the consistency `cookie` ignored range.""" - (mock_flow_factory, mock_install_flows) = args + (mock_flow_factory, mock_install_flows, _) = args dpid = "00:00:00:00:00:00:00:01" switch = get_switch_mock(dpid, 0x04) cookie_ignored_interval = [ @@ -1068,11 +1073,12 @@ def test_consistency_cookie_ignored_range(self, *args): self.napp.check_storehouse_consistency(switch) self.assertEqual(mock_install_flows.call_count, called) + @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") 
@patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") def test_consistency_table_id_ignored_range(self, *args): """Test the consistency `table_id` ignored range.""" - (mock_flow_factory, mock_install_flows) = args + (mock_flow_factory, mock_install_flows, _) = args dpid = "00:00:00:00:00:00:00:01" switch = get_switch_mock(dpid, 0x04) table_id_ignored_interval = [(1, 2), 3] @@ -1097,7 +1103,7 @@ def test_consistency_table_id_ignored_range(self, *args): self.assertEqual(mock_install_flows.call_count, called) @patch("napps.kytos.flow_manager.storehouse.StoreHouse.save_archived_flow") - def test_archived_flows_rotation(self, *args) -> None: + def test_archived_flows_rotation(self, *args): """Test archive flows rotation.""" _ = args From 2db1c976b68e566bfce05c9c39b63aedb4a8e96b Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 15:18:44 -0300 Subject: [PATCH 16/20] Extracted function new_archive_flow Refactored calling sites --- main.py | 47 +++++++++++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/main.py b/main.py index 09f6c524..aaacdb36 100644 --- a/main.py +++ b/main.py @@ -82,6 +82,14 @@ def _valid_consistency_ignored(consistency_ignored_list): return True +def new_archive_flow(dict_flow, reason): + """Build an archive flow given a stored dict flow.""" + archive_flow = dict(dict_flow) + archive_flow["deleted_at"] = now().strftime("%Y-%m-%dT%H:%M:%S") + archive_flow["reason"] = reason + return archive_flow + + class Main(KytosNApp): """Main class to be used by Kytos controller.""" @@ -244,6 +252,7 @@ def check_storehouse_consistency(self, switch): f"{self.stored_flows.get(dpid, {}).get(cookie, [])}" ) + archived_flows = [] for installed_flow in flows: if self.is_ignored(installed_flow.table_id, self.tab_id_ignored_range): continue @@ -259,6 +268,9 @@ def check_storehouse_consistency(self, switch): log.info( f"Flow forwarded 
to switch {dpid} to be deleted. Flow: {flow}" ) + archived_flows.append( + new_archive_flow(installed_flow.as_dict(), reason="alien") + ) continue if installed_flow not in stored_flows_list: @@ -269,6 +281,13 @@ def check_storehouse_consistency(self, switch): log.info( f"Flow forwarded to switch {dpid} to be deleted. Flow: {flow}" ) + archived_flows.append( + new_archive_flow(installed_flow.as_dict(), reason="alien") + ) + + if archived_flows: + with self._storehouse_lock: + self._add_archived_flows_store(switch.id, archived_flows) # pylint: disable=attribute-defined-outside-init def _load_flows( @@ -300,7 +319,7 @@ def _del_matched_flows_store(self, flow_dict, switch): else [int(flow_dict.get("cookie", 0))] ) - deleted_flows = [] + archived_flows = [] for cookie in cookies: stored_flows = stored_flows_box[switch.id].get(cookie, []) if not stored_flows: @@ -312,11 +331,9 @@ def _del_matched_flows_store(self, flow_dict, switch): # No strict match if match_flow(flow_dict, version, stored_flow["flow"]): deleted_flows_idxs.add(i) - - deleted_flow = dict(stored_flow) - deleted_flow["deleted_at"] = now().strftime("%Y-%m-%dT%H:%M:%S") - deleted_flow["reason"] = "delete" - deleted_flows.append(deleted_flow) + archived_flows.append( + new_archive_flow(stored_flow, reason="delete") + ) if not deleted_flows_idxs: continue @@ -332,12 +349,12 @@ def _del_matched_flows_store(self, flow_dict, switch): else: stored_flows_box[switch.id].pop(cookie, None) - if deleted_flows: + if archived_flows: stored_flows_box["id"] = "flow_persistence" self.storehouse.save_flow(stored_flows_box) del stored_flows_box["id"] self.stored_flows = deepcopy(stored_flows_box) - self._add_archived_flows_store(switch.id, deleted_flows) + self._add_archived_flows_store(switch.id, archived_flows) def _add_archived_flows_store( self, @@ -345,7 +362,7 @@ def _add_archived_flows_store( archived_flows, max_len=ARCHIVED_MAX_FLOWS_PER_SWITCH, offset_delete=ARCHIVED_ROTATION_DELETED, - ) -> None: + ): """Store 
archived flows.""" if not archived_flows: return @@ -374,7 +391,7 @@ def _add_flow_store(self, flow_dict, switch): if switch.id not in stored_flows_box: stored_flows_box[switch.id] = OrderedDict() - overlapped_flows = [] + archived_flows = [] if not stored_flows_box[switch.id].get(cookie): stored_flows_box[switch.id][cookie] = [installed_flow] else: @@ -388,11 +405,9 @@ def _add_flow_store(self, flow_dict, switch): match_flow(flow_dict, version, stored_flows[i]["flow"]), ) ): - overlapped_flow = dict(stored_flows[i]) - overlapped_flow["deleted_at"] = now().strftime("%Y-%m-%dT%H:%M:%S") - overlapped_flow["reason"] = "overlap" - overlapped_flows.append(overlapped_flow) - + archived_flows.append( + new_archive_flow(stored_flows[i], reason="overlap") + ) stored_flows_box[switch.id][cookie][i] = installed_flow break else: @@ -403,7 +418,7 @@ def _add_flow_store(self, flow_dict, switch): del stored_flows_box["id"] self.stored_flows = deepcopy(stored_flows_box) - self._add_archived_flows_store(switch.id, overlapped_flows) + self._add_archived_flows_store(switch.id, archived_flows) def _store_changed_flows(self, command, flow_dict, switch): """Store changed flows. From 605169018825d914e9dbc925330eeab163113eee Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 15:51:01 -0300 Subject: [PATCH 17/20] Updated flow installed argument --- main.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/main.py b/main.py index aaacdb36..bb8e0961 100644 --- a/main.py +++ b/main.py @@ -269,7 +269,10 @@ def check_storehouse_consistency(self, switch): f"Flow forwarded to switch {dpid} to be deleted. Flow: {flow}" ) archived_flows.append( - new_archive_flow(installed_flow.as_dict(), reason="alien") + new_archive_flow( + {"flow": installed_flow.as_dict(include_id=False)}, + reason="alien", + ) ) continue @@ -282,7 +285,10 @@ def check_storehouse_consistency(self, switch): f"Flow forwarded to switch {dpid} to be deleted. 
Flow: {flow}" ) archived_flows.append( - new_archive_flow(installed_flow.as_dict(), reason="alien") + new_archive_flow( + {"flow": installed_flow.as_dict(include_id=False)}, + reason="alien", + ) ) if archived_flows: From 46ed324def8d41f669045f2c62587ea6c4d5e39 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 16:25:56 -0300 Subject: [PATCH 18/20] Updated settings.py --- settings.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/settings.py b/settings.py index 58f1a0d6..5e8bea75 100644 --- a/settings.py +++ b/settings.py @@ -13,6 +13,6 @@ CONSISTENCY_TABLE_ID_IGNORED_RANGE = [] # Maximum number of flows to archive per switch -ARCHIVED_MAX_FLOWS_PER_SWITCH = 2000 +ARCHIVED_MAX_FLOWS_PER_SWITCH = 5000 # Number of old flows to delete whenever 'ARCHIVED_MAX_FLOWS_PER_SWITCH' overflows -ARCHIVED_ROTATION_DELETED = 500 +ARCHIVED_ROTATION_DELETED = 1000 From 6b984a72fa17aa9e2c1b1af72517b40ed97f2f20 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 16:36:18 -0300 Subject: [PATCH 19/20] Bumped version 5.2.0 --- CHANGELOG.rst | 13 +++++++++++++ kytos.json | 2 +- setup.py | 2 +- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1c52b44c..d36ae2a0 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -21,6 +21,19 @@ Removed Security ======== +[5.2.0] - 2021-11.10 +******************** + +Added +===== +- Added support for archiving overlapped and deleted flows, issue #33. +- Updated settings to set the maximum number of archived flows per switch. + +Changed +======= +- Updated ``stored_flows`` box to stop using a randomly generated box, now it's named ``flows``. This is a breaking change, but it makes the box easier to find going forward, since the code assumed there was only one box in this namespace, which isn't the case anymore since we have to introduce a box for archived flows.
+ + [5.1.0] - 2021-11.08 ******************** diff --git a/kytos.json b/kytos.json index ad0072ec..00846690 100644 --- a/kytos.json +++ b/kytos.json @@ -3,7 +3,7 @@ "username": "kytos", "name": "flow_manager", "description": "Manage switches' flows through a REST API.", - "version": "5.1.0", + "version": "5.2.0", "napp_dependencies": ["kytos/of_core", "kytos/storehouse"], "license": "MIT", "url": "https://github.com/kytos/flow_manager.git", diff --git a/setup.py b/setup.py index 2032e703..50879411 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ BASE_ENV = Path(os.environ.get("VIRTUAL_ENV", "/")) NAPP_NAME = "flow_manager" -NAPP_VERSION = "5.1.0" +NAPP_VERSION = "5.2.0" # Kytos var folder VAR_PATH = BASE_ENV / "var" / "lib" / "kytos" From d05354c1bd3baa30ce69a9208b688298773f81b9 Mon Sep 17 00:00:00 2001 From: Vinicius Arcanjo Date: Wed, 10 Nov 2021 17:06:15 -0300 Subject: [PATCH 20/20] Updated order of operations --- main.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/main.py b/main.py index bb8e0961..a36a1cd3 100644 --- a/main.py +++ b/main.py @@ -262,28 +262,23 @@ def check_storehouse_consistency(self, switch): f"Consistency check: alien flow on switch {dpid}, dpid" " not indexed" ) - flow = {"flows": [installed_flow.as_dict()]} - command = "delete_strict" - self._install_flows(command, flow, [switch], save=False) - log.info( - f"Flow forwarded to switch {dpid} to be deleted. Flow: {flow}" - ) archived_flows.append( new_archive_flow( {"flow": installed_flow.as_dict(include_id=False)}, reason="alien", ) ) - continue - if installed_flow not in stored_flows_list: - log.info(f"Consistency check: alien flow on switch {dpid}") flow = {"flows": [installed_flow.as_dict()]} command = "delete_strict" self._install_flows(command, flow, [switch], save=False) log.info( f"Flow forwarded to switch {dpid} to be deleted. 
Flow: {flow}" ) + continue + + if installed_flow not in stored_flows_list: + log.info(f"Consistency check: alien flow on switch {dpid}") archived_flows.append( new_archive_flow( {"flow": installed_flow.as_dict(include_id=False)}, @@ -291,6 +286,13 @@ def check_storehouse_consistency(self, switch): ) ) + flow = {"flows": [installed_flow.as_dict()]} + command = "delete_strict" + self._install_flows(command, flow, [switch], save=False) + log.info( + f"Flow forwarded to switch {dpid} to be deleted. Flow: {flow}" + ) + if archived_flows: with self._storehouse_lock: self._add_archived_flows_store(switch.id, archived_flows)