diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 7cc4268b..821acf04 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -21,6 +21,23 @@ Removed
 Security
 ========
 
+[2022.1.2] - 2022-02-07
+***********************
+
+Changed
+=======
+- Adapted the consistency check to skip recently deleted flows
+- Extracted ``is_recent_flow`` static method
+- Changed ``_del_matched_flows_store`` to also archive flows
+- Changed the consistency check to also archive alien flows
+
+Added
+=====
+
+- Added the in-memory ``archived_flows`` store and its lock
+- Added ``_add_archived_flows`` method
+
+
 [2022.1.1] - 2022-02-04
 ***********************
 
diff --git a/kytos.json b/kytos.json
index e9c28aab..c80e4ef5 100644
--- a/kytos.json
+++ b/kytos.json
@@ -3,9 +3,9 @@
   "username": "kytos",
   "name": "flow_manager",
   "description": "Manage switches' flows through a REST API.",
-  "version": "2022.1.1",
+  "version": "2022.1.2",
   "napp_dependencies": ["kytos/of_core", "kytos/storehouse"],
   "license": "MIT",
   "url": "https://github.com/kytos/flow_manager.git",
-  "tags": ["openflow", "manager", "flow", "rest", "work-in-progress"]
+  "tags": ["openflow", "manager", "flow", "rest"]
 }
diff --git a/main.py b/main.py
index e0c46008..911fbd12 100644
--- a/main.py
+++ b/main.py
@@ -28,6 +28,7 @@
 from .barrier_request import new_barrier_request
 from .exceptions import InvalidCommandError, SwitchNotConnectedError
 from .settings import (
+    ARCHIVED_FLOWS_MAX_SIZE,
     CONN_ERR_MAX_RETRIES,
     CONN_ERR_MIN_WAIT,
     CONN_ERR_MULTIPLIER,
@@ -41,6 +42,7 @@
     _valid_consistency_ignored,
     cast_fields,
     get_min_wait_diff,
+    new_archive_flow_dict,
     new_flow_dict,
 )
 
@@ -87,11 +89,13 @@ def setup(self):
         self._flow_mods_sent_error_lock = Lock()
         self._flow_mods_retry_count = {}
         self._flow_mods_retry_count_lock = Lock()
+        self._archived_flows_lock = Lock()
 
         # Format of stored flow data:
         # {'flow_persistence': {'dpid_str': {cookie_val: [
         # {'flow': {flow_dict}}]}}}
         self.stored_flows = {}
+        self.archived_flows = {}
         self.resent_flows = set()
 
     def execute(self):
@@ -381,6 +385,14 @@ def switch_flows_by_id(switch):
         """Build switch.flows indexed by id."""
         return {flow.id: flow for flow in switch.flows}
 
+    @staticmethod
+    def is_recent_flow(stored_flow, created_attr="created_at", interval=STATS_INTERVAL):
+        """Check whether a flow is recent, based on the given timestamp attribute."""
+        stored_time = get_time(stored_flow.get(created_attr, "0001-01-01T00:00:00"))
+        if (now() - stored_time).total_seconds() <= interval:
+            return True
+        return False
+
     def check_switch_consistency(self, switch):
         """Check consistency of stored flows for a specific switch."""
         dpid = switch.dpid
@@ -389,10 +401,7 @@ def check_switch_consistency(self, switch):
 
         for cookie, stored_flows in self.stored_flows[dpid].items():
             for stored_flow in stored_flows:
-                stored_time = get_time(
-                    stored_flow.get("created_at", "0001-01-01T00:00:00")
-                )
-                if (now() - stored_time).seconds <= STATS_INTERVAL:
+                if self.is_recent_flow(stored_flow, "created_at"):
                     continue
                 stored_flow_obj = serializer.from_dict(stored_flow["flow"], switch)
                 if stored_flow_obj in installed_flows[cookie]:
@@ -432,6 +441,15 @@ def check_storehouse_consistency(self, switch):
         for installed_flow in flows:
             if self.is_ignored(installed_flow.table_id, self.tab_id_ignored_range):
                 continue
+            if (
+                switch.id in self.archived_flows
+                and installed_flow.id in self.archived_flows[switch.id]
+                and self.is_recent_flow(
+                    self.archived_flows[switch.id][installed_flow.id],
+                    "deleted_at",
+                )
+            ):
+                continue
 
             if dpid not in self.stored_flows:
                 log.info(
@@ -446,6 +464,16 @@ def check_storehouse_consistency(self, switch):
                        f"Flow forwarded to switch {dpid} to be deleted. "
                        f"Flow: {flow}"
                    )
+                    self._add_archived_flows(
+                        switch.id,
+                        [
+                            new_archive_flow_dict(
+                                installed_flow.as_dict(include_id=False),
+                                "alien",
+                                installed_flow.id,
+                            )
+                        ],
+                    )
                     continue
                 except SwitchNotConnectedError:
                     log.error(
                         f"Failed to forward flow to switch {dpid} to be deleted. "
@@ -463,7 +491,16 @@ def check_storehouse_consistency(self, switch):
                        f"Flow forwarded to switch {dpid} to be deleted. "
                        f"Flow: {flow}"
                    )
-                    continue
+                    self._add_archived_flows(
+                        switch.id,
+                        [
+                            new_archive_flow_dict(
+                                installed_flow.as_dict(include_id=False),
+                                "alien",
+                                installed_flow.id,
+                            )
+                        ],
+                    )
                 except SwitchNotConnectedError:
                     log.error(
                         f"Failed to forward flow to switch {dpid} to be deleted. "
@@ -496,7 +533,7 @@ def _del_matched_flows_store(self, flow_dict, _flow_id, switch):
             else [int(flow_dict.get("cookie", 0))]
         )
 
-        has_deleted_any_flow = False
+        archived_flows = []
         for cookie in cookies:
             stored_flows = stored_flows_box[switch.id].get(cookie, [])
             if not stored_flows:
                 continue
@@ -508,6 +545,11 @@
                 # No strict match
                 if match_flow(flow_dict, version, stored_flow["flow"]):
                     deleted_flows_idxs.add(i)
+                    archived_flows.append(
+                        new_archive_flow_dict(
+                            stored_flow["flow"], "delete", stored_flow.get("_id")
+                        )
+                    )
 
             if not deleted_flows_idxs:
                 continue
@@ -517,14 +559,14 @@
                 for i, flow in enumerate(stored_flows)
                 if i not in deleted_flows_idxs
             ]
-            has_deleted_any_flow = True
 
             if stored_flows:
                 stored_flows_box[switch.id][cookie] = stored_flows
             else:
                 stored_flows_box[switch.id].pop(cookie, None)
 
-        if has_deleted_any_flow:
+        if archived_flows:
+            self._add_archived_flows(switch.id, archived_flows)
             stored_flows_box["id"] = "flow_persistence"
             self.storehouse.save_flow(stored_flows_box)
             del stored_flows_box["id"]
@@ -564,6 +606,24 @@ def _add_flow_store(self, flow_dict, flow_id, switch):
         del stored_flows_box["id"]
         self.stored_flows = deepcopy(stored_flows_box)
 
+    def _add_archived_flows(
+        self,
+        dpid,
+        archived_flows,
+        max_len=ARCHIVED_FLOWS_MAX_SIZE,
+    ):
+        """Add archived flows in memory (will be stored in the DB in the future)."""
+        if not archived_flows:
+            return
+        with self._archived_flows_lock:
+            if dpid not in self.archived_flows:
+                self.archived_flows[dpid] = OrderedDict()
+            for archived_flow in archived_flows:
+                self.archived_flows[dpid][archived_flow["_id"]] = archived_flow
+
+            while len(self.archived_flows[dpid]) > max_len:
+                self.archived_flows[dpid].popitem(last=False)
+
     def _update_flow_state_store(self, dpid, flow_ids, state):
         """Try to bulk update the state of some flow ids given a dpid."""
         if not flow_ids:
@@ -959,5 +1019,6 @@ def handle_errors(self, event):
             f"Deleting flow: {flow.as_dict()}, xid: {xid}, cookie: {flow.cookie}, "
             f"error: {error_kwargs}"
         )
-        self._del_stored_flow_by_id(switch.id, flow.cookie, flow.id)
+        with self._storehouse_lock:
+            self._del_stored_flow_by_id(switch.id, flow.cookie, flow.id)
         self._send_napp_event(flow.switch, flow, "error", **error_kwargs)
diff --git a/settings.py b/settings.py
index a1c02141..6b904c39 100644
--- a/settings.py
+++ b/settings.py
@@ -13,6 +13,9 @@
 CONSISTENCY_COOKIE_IGNORED_RANGE = []
 CONSISTENCY_TABLE_ID_IGNORED_RANGE = []
 
+# Maximum number of archived flows per switch
+ARCHIVED_FLOWS_MAX_SIZE = 5000
+
 # Retries options for `kytos/core.openflow.connection.error`
 CONN_ERR_MAX_RETRIES = 3
 CONN_ERR_MIN_WAIT = 1  # minimum wait between iterations in seconds
diff --git a/setup.cfg b/setup.cfg
index 187dccfc..c6f7a847 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -6,7 +6,7 @@ add-ignore = D105
 
 [yala]
 radon mi args = --min C
-pylint args = --disable==too-many-arguments,too-many-locals,too-few-public-methods,too-many-instance-attributes,no-else-return,dangerous-default-value,duplicate-code,raise-missing-from,too-many-arguments,too-many-public-methods --ignored-modules=napps.kytos.topology
+pylint args = --disable==too-many-arguments,too-many-locals,too-few-public-methods,too-many-instance-attributes,no-else-return,dangerous-default-value,duplicate-code,raise-missing-from,too-many-arguments,too-many-public-methods,too-many-lines --ignored-modules=napps.kytos.topology
 linters=pylint,pycodestyle,isort,black
 
 [flake8]
diff --git a/setup.py b/setup.py
index 0476d2f7..ac952472 100644
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,7 @@
 BASE_ENV = Path(os.environ.get("VIRTUAL_ENV", "/"))
 
 NAPP_NAME = "flow_manager"
-NAPP_VERSION = "2022.1.1"
+NAPP_VERSION = "2022.1.2"
 
 # Kytos var folder
 VAR_PATH = BASE_ENV / "var" / "lib" / "kytos"
diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py
index f05f4636..c1fad3f8 100644
--- a/tests/unit/test_main.py
+++ b/tests/unit/test_main.py
@@ -1,4 +1,5 @@
 """Test Main methods."""
+from datetime import timedelta
 import threading
 from unittest import TestCase
 from unittest.mock import MagicMock, patch
@@ -794,14 +795,15 @@
         self.napp.check_switch_consistency(switch)
         mock_install_flows.assert_not_called()
 
+    @patch("napps.kytos.flow_manager.main.Main._add_archived_flows")
     @patch("napps.kytos.flow_manager.main.Main._install_flows")
     @patch("napps.kytos.flow_manager.main.FlowFactory.get_class")
     def test_check_storehouse_consistency(self, *args):
         """Test check_storehouse_consistency.
 
         This test checks the case when a flow is missing in storehouse.
""" - (mock_flow_factory, mock_install_flows) = args + (mock_flow_factory, mock_install_flows, mock_add_archived) = args cookie_exception_interval = [(0x2B00000000000011, 0x2B000000000000FF)] self.napp.cookie_exception_range = cookie_exception_interval dpid = "00:00:00:00:00:00:00:01" @@ -819,6 +821,7 @@ def test_check_storehouse_consistency(self, *args): self.napp.stored_flows = {dpid: {0: stored_flows}} self.napp.check_storehouse_consistency(switch) mock_install_flows.assert_called() + mock_add_archived.assert_called() @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") @@ -1181,11 +1184,12 @@ def test_no_strict_delete_of10(self, *args): mock_save_flow.assert_called() self.assertEqual(len(self.napp.stored_flows[dpid]), 0) + @patch("napps.kytos.flow_manager.main.Main._add_archived_flows") @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") def test_consistency_cookie_ignored_range(self, *args): """Test the consistency `cookie` ignored range.""" - (_, mock_install_flows) = args + (_, mock_install_flows, _) = args dpid = "00:00:00:00:00:00:00:01" switch = get_switch_mock(dpid, 0x04) cookie_ignored_interval = [ @@ -1210,11 +1214,12 @@ def test_consistency_cookie_ignored_range(self, *args): self.napp.check_storehouse_consistency(switch) self.assertEqual(mock_install_flows.call_count, called) + @patch("napps.kytos.flow_manager.main.Main._add_archived_flows") @patch("napps.kytos.flow_manager.main.Main._install_flows") @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") def test_consistency_table_id_ignored_range(self, *args): """Test the consistency `table_id` ignored range.""" - (_, mock_install_flows) = args + (_, mock_install_flows, _) = args dpid = "00:00:00:00:00:00:00:01" switch = get_switch_mock(dpid, 0x04) table_id_ignored_interval = [(1, 2), 3] @@ -1611,3 +1616,81 @@ def test_build_flow_mod_from_command(self): with self.assertRaises(InvalidCommandError): self.napp.build_flow_mod_from_command(mock, "invalid_command") + + def test_is_recent_flow(self): + """Test is_recent.""" + stored_flow = { + "created_at": now().strftime("%Y-%m-%dT%H:%M:%S"), + "flow": {"flow_1": "data"}, + } + assert self.napp.is_recent_flow(stored_flow) + + past_dt = now() - timedelta(minutes=2) + stored_flow["created_at"] = past_dt.strftime("%Y-%m-%dT%H:%M:%S") + assert not self.napp.is_recent_flow(stored_flow) + + stored_flow = { + "deleted_at": now().strftime("%Y-%m-%dT%H:%M:%S"), + "flow": {"flow_1": "data"}, + } + assert self.napp.is_recent_flow(stored_flow, "deleted_at") + + def test_del_matched_flows_store(self): + """Test is_recent.""" + dpid = "00:00:00:00:00:00:00:01" + switch = get_switch_mock(dpid, 0x04) + switch.id = dpid + flow_id = "1" + stored_flow = {"flow": {"flow_1": "data"}, "_id": flow_id} + self.napp.storehouse = MagicMock() + self.napp.stored_flows = {dpid: {0: [stored_flow]}} + assert not self.napp.archived_flows + + self.napp._del_matched_flows_store(stored_flow, None, switch) + + assert self.napp.archived_flows[dpid][flow_id]["reason"] == "delete" + self.assertDictEqual( + self.napp.archived_flows[dpid][flow_id]["flow"], stored_flow["flow"] + ) + + @patch("napps.kytos.flow_manager.main.Main._install_flows") + @patch("napps.kytos.flow_manager.main.FlowFactory.get_class") + def test_consistency_recent_deleted_flow(self, _, mock_install): + """Test consistency check recent delete flow.""" + flow = MagicMock() + flow.id = "1" + flow.cookie = 10 + 
+        flow2 = MagicMock()
+        flow2.id = "2"
+        flow2.cookie = 20
+
+        dpid = "00:00:00:00:00:00:00:01"
+        switch = get_switch_mock(dpid, 0x04)
+        switch.id = dpid
+        switch.flows = [flow]
+        self.napp.stored_flows = {dpid: {flow2.cookie: [flow2]}}
+        assert not self.napp.archived_flows
+        self.napp.check_storehouse_consistency(switch)
+        assert len(self.napp.archived_flows[dpid]) == 1
+        assert self.napp.archived_flows[dpid][flow.id]["_id"] == flow.id
+        assert self.napp.archived_flows[dpid][flow.id]["reason"] == "alien"
+        assert mock_install.call_count == 1
+
+        # a second consistency run shouldn't delete the flow again
+        self.napp.check_storehouse_consistency(switch)
+        assert mock_install.call_count == 1
+        assert len(self.napp.archived_flows[dpid]) == 1
+
+    def test_max_archived_flows(self):
+        """Test max archived flows."""
+        max_len = 15
+        n_archived = 20
+        archived_flows = [{"_id": i} for i in range(n_archived)]
+
+        dpid = "00:00:00:00:00:00:00:01"
+        assert not self.napp.archived_flows
+        self.napp._add_archived_flows(dpid, archived_flows, max_len)
+        assert len(self.napp.archived_flows[dpid]) == max_len
+        keys = list(self.napp.archived_flows[dpid].keys())
+        assert self.napp.archived_flows[dpid][keys[-1]]["_id"] == n_archived - 1
+        assert self.napp.archived_flows[dpid][keys[0]]["_id"] == n_archived - max_len
diff --git a/utils.py b/utils.py
index 7cf0d221..28d7a91b 100644
--- a/utils.py
+++ b/utils.py
@@ -6,6 +6,16 @@ from kytos.core.helpers import now
 
 
+def new_archive_flow_dict(flow_dict, reason, _id=None):
+    """Build an archive flow dict given a stored flow dict."""
+    archive_flow = {}
+    archive_flow["_id"] = _id
+    archive_flow["flow"] = flow_dict
+    archive_flow["deleted_at"] = now().strftime("%Y-%m-%dT%H:%M:%S")
+    archive_flow["reason"] = reason
+    return archive_flow
+
+
 def new_flow_dict(flow_dict, _id=None, state="pending"):
     """Create a new flow dict to be stored."""
     flow = {}
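
For review, a minimal standalone sketch of the recency check this diff introduces: a flow whose timestamp attribute ("created_at", or "deleted_at" for archived entries) is within `interval` seconds of now is treated as recent and skipped by the consistency check. The snippet swaps the NApp's `kytos.core.helpers.now()`/`get_time()` helpers for plain `datetime` calls so it runs on its own, and the `STATS_INTERVAL` value here is an assumption, not the NApp's actual setting.

```python
# Standalone sketch of the is_recent_flow predicate (assumed helpers/values).
from datetime import datetime

STATS_INTERVAL = 30  # assumption: the real value comes from the NApp's settings


def is_recent_flow(stored_flow, created_attr="created_at", interval=STATS_INTERVAL):
    """Return True if the flow's timestamp attribute is within `interval` seconds."""
    stored_time = datetime.strptime(
        stored_flow.get(created_attr, "0001-01-01T00:00:00"), "%Y-%m-%dT%H:%M:%S"
    )
    # total_seconds() avoids timedelta.seconds wrapping at day boundaries
    return (datetime.utcnow() - stored_time).total_seconds() <= interval


fresh = {"created_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")}
print(is_recent_flow(fresh))  # True
print(is_recent_flow({}, "deleted_at"))  # False: falls back to year 1
```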
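In the same spirit, a sketch of the per-switch archive itself: entries are built by `new_archive_flow_dict` with a `reason` of "delete" (matched deletion) or "alien" (consistency check), keyed by flow id in an `OrderedDict`, and FIFO-evicted once the `ARCHIVED_FLOWS_MAX_SIZE` cap is exceeded. The module-level dict and function here only mirror the NApp's methods; the lock is omitted for brevity.

```python
# Standalone sketch of the bounded per-switch archive (mirrors the diff's logic).
from collections import OrderedDict
from datetime import datetime

ARCHIVED_FLOWS_MAX_SIZE = 5000  # same default the diff adds to settings.py

archived_flows = {}  # dpid -> OrderedDict(flow_id -> archived flow dict)


def new_archive_flow_dict(flow_dict, reason, _id=None):
    """Build an archive flow dict given a stored flow dict."""
    return {
        "_id": _id,
        "flow": flow_dict,
        "deleted_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
        "reason": reason,  # "delete" or "alien"
    }


def add_archived_flows(dpid, flows, max_len=ARCHIVED_FLOWS_MAX_SIZE):
    """Insert archived flows for a switch, evicting the oldest beyond max_len."""
    archive = archived_flows.setdefault(dpid, OrderedDict())
    for archived in flows:
        archive[archived["_id"]] = archived
    while len(archive) > max_len:
        archive.popitem(last=False)  # FIFO eviction of the oldest entry


add_archived_flows(
    "00:00:00:00:00:00:00:01",
    [new_archive_flow_dict({"match": {"in_port": 1}}, "alien", "0xabc")],
)
print(list(archived_flows["00:00:00:00:00:00:00:01"]))  # ['0xabc']
```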