Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Added support for force option when sending flow mods #50

Merged
merged 20 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ Removed
Security
========

[5.2.0] - 2021-11.17
********************

Added
=====
- Added support for ``force`` option when sending flow mods


[5.1.0] - 2021-11.08
********************

Expand Down
2 changes: 1 addition & 1 deletion kytos.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"username": "kytos",
"name": "flow_manager",
"description": "Manage switches' flows through a REST API.",
"version": "5.1.0",
"version": "5.2.0",
"napp_dependencies": ["kytos/of_core", "kytos/storehouse"],
"license": "MIT",
"url": "https://github.com/kytos/flow_manager.git",
Expand Down
161 changes: 81 additions & 80 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import itertools
from collections import OrderedDict, defaultdict
from copy import deepcopy
from enum import Enum
from threading import Lock

from flask import jsonify, request
from napps.kytos.flow_manager.match import match_flow, match_strict_flow
from napps.kytos.flow_manager.storehouse import StoreHouse
from napps.kytos.of_core.flow import FlowFactory
from napps.kytos.of_core.settings import STATS_INTERVAL
from pyof.foundation.base import UBIntBase
from pyof.v0x01.asynchronous.error_msg import BadActionCode
from pyof.v0x01.common.phy_port import PortConfig
from werkzeug.exceptions import (
Expand All @@ -31,58 +31,14 @@
ENABLE_CONSISTENCY_CHECK,
FLOWS_DICT_MAX_SIZE,
)
from .utils import _valid_consistency_ignored, cast_fields, new_flow_dict


def cast_fields(flow_dict):
"""Make casting the match fields from UBInt() to native int ."""
match = flow_dict["match"]
for field, value in match.items():
if isinstance(value, UBIntBase):
match[field] = int(value)
flow_dict["match"] = match
return flow_dict


def _validate_range(values):
"""Check that the range of flows ignored by the consistency is valid."""
if len(values) != 2:
msg = f"The tuple must have 2 items, not {len(values)}"
raise ValueError(msg)
first, second = values
if second < first:
msg = f"The first value is bigger than the second: {values}"
raise ValueError(msg)
if not isinstance(first, int) or not isinstance(second, int):
msg = f"Expected a tuple of integers, received {values}"
raise TypeError(msg)


def _valid_consistency_ignored(consistency_ignored_list):
"""Check the format of the list of ignored consistency flows.

Check that the list of ignored flows in the consistency check
is well formatted. Returns True, if the list is well
formatted, otherwise return False.
"""
msg = (
"The list of ignored flows in the consistency check"
"is not well formatted, it will be ignored: %s"
)
for consistency_ignored in consistency_ignored_list:
if isinstance(consistency_ignored, tuple):
try:
_validate_range(consistency_ignored)
except (TypeError, ValueError) as error:
log.warn(msg, error)
return False
elif not isinstance(consistency_ignored, (int, tuple)):
error_msg = (
"The elements must be of class int or tuple"
f" but they are: {type(consistency_ignored)}"
)
log.warn(msg, error_msg)
return False
return True
class FlowEntryState(Enum):
"""Enum for stored Flow Entry states."""

PENDING = "pending" # initial state, it has been stored, but not confirmed yet
INSTALLED = "installed" # final state, when the installtion has been confirmed


class Main(KytosNApp):
Expand Down Expand Up @@ -144,11 +100,16 @@ def resend_stored_flows(self, event):
log.debug(f"Flow already resent to the switch {dpid}")
return
if dpid in self.stored_flows:
has_connection_failed = False
for flow in self.stored_flows_list(dpid):
flows_dict = {"flows": [flow["flow"]]}
self._install_flows("add", flows_dict, [switch])
self.resent_flows.add(dpid)
log.info(f"Flows resent to Switch {dpid}")
try:
self._install_flows("add", flows_dict, [switch])
except SwitchNotConnectedError:
has_connection_failed = True
if not has_connection_failed:
self.resent_flows.add(dpid)
log.info(f"Flows resent to Switch {dpid}")

@staticmethod
def is_ignored(field, ignored_range):
Expand Down Expand Up @@ -210,10 +171,16 @@ def check_switch_consistency(self, switch):

log.info(f"Consistency check: missing flow on switch {dpid}.")
flow = {"flows": [stored_flow["flow"]]}
self._install_flows("add", flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be installed. Flow: {flow}"
)
try:
self._install_flows("add", flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be installed. Flow: {flow}"
)
except SwitchNotConnectedError:
log.error(
f"Failed to forward flow to switch {dpid} to be installed. "
f"Flow: {flow}"
)

def check_storehouse_consistency(self, switch):
"""Check consistency of installed flows for a specific switch."""
Expand Down Expand Up @@ -244,20 +211,35 @@ def check_storehouse_consistency(self, switch):
)
flow = {"flows": [installed_flow.as_dict()]}
command = "delete_strict"
self._install_flows(command, flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be deleted. Flow: {flow}"
)
continue
try:
self._install_flows(command, flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be deleted. "
f"Flow: {flow}"
)
continue
except SwitchNotConnectedError:
log.error(
f"Failed to forward flow to switch {dpid} to be deleted. "
f"Flow: {flow}"
)

if installed_flow not in stored_flows_list:
log.info(f"Consistency check: alien flow on switch {dpid}")
flow = {"flows": [installed_flow.as_dict()]}
command = "delete_strict"
self._install_flows(command, flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be deleted. Flow: {flow}"
)
try:
self._install_flows(command, flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be deleted. "
f"Flow: {flow}"
)
continue
except SwitchNotConnectedError:
log.error(
f"Failed to forward flow to switch {dpid} to be deleted. "
f"Flow: {flow}"
)

# pylint: disable=attribute-defined-outside-init
def _load_flows(self):
Expand Down Expand Up @@ -321,9 +303,7 @@ def _del_matched_flows_store(self, flow_dict, switch):

def _add_flow_store(self, flow_dict, switch):
"""Try to add a flow dict in the store idempotently."""
installed_flow = {}
installed_flow["flow"] = flow_dict
installed_flow["created_at"] = now().strftime("%Y-%m-%dT%H:%M:%S")
installed_flow = new_flow_dict(flow_dict, state=FlowEntryState.PENDING.value)

stored_flows_box = deepcopy(self.stored_flows)
cookie = int(flow_dict.get("cookie", 0))
Expand Down Expand Up @@ -394,6 +374,7 @@ def list(self, dpid=None):

return jsonify(switch_flows)

# pylint: disable=fixme
@listen_to("kytos.flow_manager.flows.(install|delete)")
def event_flows_install_delete(self, event):
"""Install or delete flows in the switches through events.
Expand All @@ -415,13 +396,17 @@ def event_flows_install_delete(self, event):
msg = f'Invalid event "{event.name}", should be install|delete'
raise ValueError(msg)

force = bool(event.content.get("force", False))
switch = self.controller.get_switch_by_dpid(dpid)
try:
self._install_flows(command, flow_dict, [switch])
self._install_flows(command, flow_dict, [switch], reraise_conn=not force)
except InvalidCommandError as error:
log.error(
"Error installing or deleting Flow through" f" Kytos Event: {error}"
)
except SwitchNotConnectedError:
# TODO handle event error, issue 2
pass

@rest("v2/flows", methods=["POST"])
@rest("v2/flows/<dpid>", methods=["POST"])
Expand Down Expand Up @@ -471,14 +456,18 @@ def _send_flow_mods_from_request(self, dpid, command, flows_dict=None):
result = "The request body is not well-formed."
raise BadRequest(result)

force = bool(flows_dict.get("force", False))
log.info(
f"Send FlowMod from request dpid: {dpid} command: {command}"
f" flows_dict: {flows_dict}"
f"Send FlowMod from request dpid: {dpid}, command: {command}, "
f"force: {force}, flows_dict: {flows_dict}"
)
try:
if not dpid:
self._install_flows(
command, flows_dict, self._get_all_switches_enabled()
command,
flows_dict,
self._get_all_switches_enabled(),
reraise_conn=not force,
)
return jsonify({"response": "FlowMod Messages Sent"}), 202

Expand All @@ -489,20 +478,24 @@ def _send_flow_mods_from_request(self, dpid, command, flows_dict=None):
if not switch.is_enabled() and command == "add":
raise NotFound("switch is disabled.")

self._install_flows(command, flows_dict, [switch])
self._install_flows(command, flows_dict, [switch], reraise_conn=not force)
return jsonify({"response": "FlowMod Messages Sent"}), 202

except SwitchNotConnectedError as error:
raise FailedDependency(str(error))

def _install_flows(self, command, flows_dict, switches=[], save=True):
# pylint: disable=fixme
def _install_flows(
self, command, flows_dict, switches=[], save=True, reraise_conn=True
):
"""Execute all procedures to install flows in the switches.

Args:
command: Flow command to be installed
flows_dict: Dictionary with flows to be installed in the switches.
switches: A list of switches
save: A boolean to save flows in the storehouse (True) or not
reraise_conn: True to reraise switch connection errors
"""
for switch in switches:
serializer = FlowFactory.get_class(switch)
Expand All @@ -517,13 +510,21 @@ def _install_flows(self, command, flows_dict, switches=[], save=True):
flow_mod = flow.as_of_add_flow_mod()
else:
raise InvalidCommandError
self._send_flow_mod(flow.switch, flow_mod)

try:
self._send_flow_mod(flow.switch, flow_mod)
except SwitchNotConnectedError:
if reraise_conn:
raise
self._add_flow_mod_sent(flow_mod.header.xid, flow, command)

# TODO issue 2, only call this when the reply is confirmed
self._send_napp_event(switch, flow, command)
if save:
with self._storehouse_lock:
self._store_changed_flows(command, flow_dict, switch)

if not save:
continue
with self._storehouse_lock:
self._store_changed_flows(command, flow_dict, switch)

def _add_flow_mod_sent(self, xid, flow, command):
"""Add the flow mod to the list of flow mods sent."""
Expand Down
39 changes: 15 additions & 24 deletions openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ paths:
content:
application/json:
schema:
type: object
properties:
flows:
type: array
items:
$ref: '#/components/schemas/Flow'
$ref: '#/components/schemas/FlowsBody'
responses:
'202':
description: FlowMod messages sent.
Expand All @@ -61,12 +56,7 @@ paths:
content:
application/json:
schema:
type: object
properties:
flows:
type: array
items:
$ref: '#/components/schemas/Flow'
$ref: '#/components/schemas/FlowsBody'
responses:
'202':
description: FlowMod messages sent.
Expand Down Expand Up @@ -103,12 +93,7 @@ paths:
content:
application/json:
schema:
type: object
properties:
flows:
type: array
items:
$ref: '#/components/schemas/Flow'
$ref: '#/components/schemas/FlowsBody'
parameters:
- name: dpid
in: path
Expand All @@ -133,12 +118,7 @@ paths:
content:
application/json:
schema:
type: object
properties:
flows:
type: array
items:
$ref: '#/components/schemas/Flow'
$ref: '#/components/schemas/FlowsBody'
parameters:
- name: dpid
in: path
Expand Down Expand Up @@ -301,3 +281,14 @@ components:
type: array
items:
$ref: '#/components/schemas/Flow'
FlowsBody:
type: object
properties:
flows:
type: array
items:
$ref: '#/components/schemas/Flow'
force:
type: boolean
description: The force option is for ignoring switch connection errors, and delegating the flows to be automatically sent later on via consistency check.
default: false
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ add-ignore = D105

[yala]
radon mi args = --min C
pylint args = --disable==too-many-arguments,too-many-locals,too-few-public-methods,too-many-instance-attributes,no-else-return,dangerous-default-value,duplicate-code,raise-missing-from --ignored-modules=napps.kytos.topology
pylint args = --disable==too-many-arguments,too-many-locals,too-few-public-methods,too-many-instance-attributes,no-else-return,dangerous-default-value,duplicate-code,raise-missing-from,too-many-arguments --ignored-modules=napps.kytos.topology

[flake8]
max-line-length = 88
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
BASE_ENV = Path(os.environ.get("VIRTUAL_ENV", "/"))

NAPP_NAME = "flow_manager"
NAPP_VERSION = "5.1.0"
NAPP_VERSION = "5.2.0"

# Kytos var folder
VAR_PATH = BASE_ENV / "var" / "lib" / "kytos"
Expand Down
Loading