Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added support for batching FlowMods #172

Closed
wants to merge 10 commits into from
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ file.
[UNRELEASED] - Under development
********************************

Added
=====
- Augmented ``GET v2/stored_flows`` ``cookie_range`` to accept an even list of ranges
- Augmented ``POST v2/flows`` to support batching flows, ``batch_size`` and ``batch_interval`` were introduced. Check out the OpenAPI documentation.
- Augmented ``kytos.flow_manager.flows.(install|delete)`` event to accept ``batch_size`` and ``batch_interval`` in the ``flow_dict`` payload to support batching.


[2023.1.0] - 2023-06-05
***********************
Expand Down
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Subscribed
- ``kytos/of_core.v0x04.messages.in.ofpt_barrier_reply``
- ``kytos/core.openflow.connection.error``
- ``.*.of_core.*.ofpt_error``
- ``kytos.flow_manager.send_flows``

Generated
---------
Expand Down
158 changes: 135 additions & 23 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,15 +564,35 @@ def handle_flows_install_delete(self, event):
log.error(f"{str(exc)} for flow_dict {flow_dict} ")
return

force = bool(event.content.get("force", False))
try:
force = bool(event.content.get("force", False))
batch_size = int(event.content.get("batch_size", 0))
batch_interval = int(event.content.get("batch_interval", 0))
except (ValueError, TypeError):
log.error(
"Invalid 'force', 'batch_size' or 'batch_interval' value/type. "
f"force: {event.content.get('force', False)}, "
f"batch_size: {event.content.get('batch_size', 0)}, "
f"batch_interval: {event.content.get('batch_interval', 0)}, "
)

switch = self.controller.get_switch_by_dpid(dpid)

flows_to_log_info(
f"Send FlowMod from KytosEvent dpid: {dpid}, command: {command}, "
f"force: {force}, ",
flow_dict,
f"force: {force}, batch_size: {batch_size}, "
f"batch_interval: {batch_interval},",
flow_dict.get("flows", []),
)
try:
self._install_flows(command, flow_dict, [switch], reraise_conn=not force)
self._install_flows(
command,
flow_dict,
[switch],
reraise_conn=not force,
batch_size=batch_size,
batch_interval=batch_interval,
)
except InvalidCommandError as error:
log.error(
"Error installing or deleting Flow through" f" Kytos Event: {error}"
Expand Down Expand Up @@ -631,11 +651,24 @@ def _send_flow_mods_from_request(
except ValueError as exc:
raise HTTPException(400, detail=str(exc))

force = bool(flows_dict.get("force", False))
try:
force = bool(flows_dict.get("force", False))
batch_size = int(flows_dict.get("batch_size", 0))
batch_interval = int(flows_dict.get("batch_interval", 0))
except (ValueError, TypeError):
raise HTTPException(
400,
detail="Invalid 'force', 'batch_size' or 'batch_interval' value/type. "
f"force: {flows_dict.get('force', False)}, "
f"batch_size: {flows_dict.get('batch_size', 0)}, "
f"batch_interval: {flows_dict.get('batch_interval', 0)}, ",
)

flows_to_log_info(
f"Send FlowMod from request dpid: {dpid}, command: {command}, "
f"force: {force}, ",
flows_dict,
f"force: {force}, batch_size: {batch_size}, "
f"batch_interval: {batch_interval},",
flows_dict.get("flows", []),
)
try:
if not dpid:
Expand All @@ -644,6 +677,8 @@ def _send_flow_mods_from_request(
flows_dict,
self._get_all_switches_enabled(),
reraise_conn=not force,
batch_size=batch_size,
batch_interval=batch_interval,
)
return JSONResponse(
{"response": "FlowMod Messages Sent"}, status_code=202
Expand All @@ -656,7 +691,14 @@ def _send_flow_mods_from_request(
if not switch.is_enabled() and command == "add":
raise HTTPException(404, detail=f"Switch {dpid} is disabled.")

self._install_flows(command, flows_dict, [switch], reraise_conn=not force)
self._install_flows(
command,
flows_dict,
[switch],
reraise_conn=not force,
batch_size=batch_size,
batch_interval=batch_interval,
)
return JSONResponse({"response": "FlowMod Messages Sent"}, status_code=202)

except SwitchNotConnectedError as error:
Expand All @@ -677,6 +719,8 @@ def _install_flows(
save=True,
reraise_conn=True,
send_barrier=ENABLE_BARRIER_REQUEST,
batch_size=0,
batch_interval=0,
):
"""Execute all procedures to bulk install flows in the switches.

Expand All @@ -687,6 +731,15 @@ def _install_flows(
save: A boolean to save flows in the database
reraise_conn: True to reraise switch connection errors
send_barrier: True to send barrier_request
batch_size: size of the batch of FlowMods, 0 to send all at once
batch_interval: interval to sleep in seconds per batch_size

batch_size and batch_interval parameters are for batching FlowMods, which
you'll want to set if you're sending a large number of flows. Just so
you can avoid overwhelming a switch with too many FlowMods at once.
If batch_size and batch_interval are positive, at most batch_size
FlowMods will be sent per batch_interval seconds.

"""
flow_mods, flows, flow_dicts = [], [], []
for switch in switches:
Expand Down Expand Up @@ -716,7 +769,31 @@ def _install_flows(
self.delete_matched_flows(
flow_dicts, {switch.id: switch for switch in switches}
)
self._send_flow_mods(switches, flow_mods, flows, reraise_conn, send_barrier)

# If there's no need to reraise conn errors, then exec in background
if not reraise_conn:
event = KytosEvent(
"kytos/flow_manager.send_flows",
content={
"switches": switches,
"flow_mods": flow_mods,
"flows": flows,
"send_barrier": send_barrier,
"batch_size": batch_size,
"batch_interval": batch_interval,
},
)
self.controller.buffers.app.put(event)
else:
self._send_flow_mods(
switches,
flow_mods,
flows,
reraise_conn=reraise_conn,
send_barrier=send_barrier,
batch_size=batch_size,
batch_interval=batch_interval,
)

def _send_flow_mods(
self,
Expand All @@ -725,24 +802,46 @@ def _send_flow_mods(
flows,
reraise_conn=True,
send_barrier=ENABLE_BARRIER_REQUEST,
batch_size=0,
batch_interval=0,
):
"""Send FlowMod (and BarrierRequest) given a list of flow_dicts to switches."""
flows_zipped = list((fmod, flow) for fmod, flow in zip(flow_mods, flows))
batch_size = (
min(batch_size, len(flows_zipped))
if batch_size > 0
else max(1, len(flows_zipped))
)
batch_interval = max(0, batch_interval)
for switch in switches:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing is stopping multiple requests from sending in flow_mods to the same switch, which could potentially overload the switch. Perhaps instead of performing batching here, we could use a per switch queue for the flow_mods received to rate limit.
Additionally, batch size doesn't make much sense as a per request parameter, but makes sense as a per switch parameter.

Copy link
Member Author

@viniarck viniarck Sep 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, @Ktmi that's a good idea. We might as well go ahead and start queueing per switch on flow_manager. Initially this was just adding the same client per request behavior.

It's worth also noting that we also have in our backlog to have per message type to support rate limiting on core queue but hasn't been prioritized kytos-ng/kytos#245

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ktmi check out this thread kytos-ng/kytos#245 (comment), I'll need your help with this one if we'll go for the ideal solution, appreciated your review, help, and suggestions. In the meantime, I'll keep making progress on telemetry_int. Otherwise, I can still handle it in the future. If you confirm that you can help out with that one on version 2023.2, of course following the current planned priorities, we can close this PR here. Let me know.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing this PR in favor of kytos-ng/kytos#412. David is helping out with it. Thanks, David.

for i, (flow_mod, flow) in enumerate(zip(flow_mods, flows)):
try:
self._send_flow_mod(switch, flow_mod)
if send_barrier and i == len(flow_mods) - 1:
self._send_barrier_request(switch, flow_mods)
except SwitchNotConnectedError:
if reraise_conn:
raise
with self._flow_mods_sent_lock:
self._add_flow_mod_sent(
flow_mod.header.xid,
flow,
build_command_from_flow_mod(flow_mod),
try:
k = 0
for i in range(0, len(flows_zipped), batch_size):
Ktmi marked this conversation as resolved.
Show resolved Hide resolved
if i > 0 and batch_interval > 0:
log.info(
f"Send FlowMods batching will sleep for {batch_interval} "
f"seconds before sending slice[{i}:{batch_size}]"
)
time.sleep(batch_interval)
flows_to_log_info(
f"Sending FlowMods slice[{i}: {i+batch_size}], iteration: {k},",
[flow.as_dict() for flow in flows[i : i + batch_size]],
)
self._send_napp_event(switch, flow, "pending")
for flow_mod, flow in flows_zipped[i : i + batch_size]:
self._send_flow_mod(switch, flow_mod)
with self._flow_mods_sent_lock:
self._add_flow_mod_sent(
flow_mod.header.xid,
flow,
build_command_from_flow_mod(flow_mod),
)
self._send_napp_event(switch, flow, "pending")
k += 1
if flow_mods and send_barrier:
self._send_barrier_request(switch, flow_mods)
except SwitchNotConnectedError:
if reraise_conn:
raise
return flow_mods

def _add_flow_mod_sent(self, xid, flow, command):
Expand Down Expand Up @@ -813,6 +912,19 @@ def _send_napp_event(self, switch, flow, command, **kwargs):
event_app = KytosEvent(name, content)
self.controller.buffers.app.put(event_app)

@listen_to("kytos.flow_manager.send_flows")
def on_send_flows(self, event: KytosEvent) -> None:
    """Handle the internal event that sends batched FlowMods in the background.

    Published by ``_install_flows`` when connection errors don't need to be
    reraised, so any ``batch_interval`` sleeps happen off the caller's path.
    """
    content = event.content
    self._send_flow_mods(
        content["switches"],
        content["flow_mods"],
        content["flows"],
        reraise_conn=False,
        send_barrier=content.get("send_barrier", ENABLE_BARRIER_REQUEST),
        batch_size=content.get("batch_size", 0),
        batch_interval=content.get("batch_interval", 0),
    )

@listen_to("kytos/core.openflow.connection.error")
def on_openflow_connection_error(self, event):
"""Listen to openflow connection error and publish the flow error."""
Expand Down
10 changes: 10 additions & 0 deletions openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,16 @@ components:
type: boolean
description: The force option is for ignoring switch connection errors, and delegating the flows to be automatically sent later on via consistency check.
default: false
batch_size:
type: integer
description: This is for batching, represents how many flows to send per iteration, every batch_interval seconds. If this value is zero it will send all at once. If you are sending many flows you probably want to use batching to potentially avoid overwhelming switches. Also, if the force option is true, then it will not block the request.
minimum: 0
default: 0
batch_interval:
type: integer
description: The interval to wait for in seconds when sending a batch of flows.
minimum: 0
default: 0
FlowDoc:
type: object
properties:
Expand Down
52 changes: 50 additions & 2 deletions tests/unit/test_main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Test Main methods."""
import asyncio
import math
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
from uuid import uuid4
Expand Down Expand Up @@ -499,7 +500,12 @@ def test_event_add_flow(self, mock_install_flows, mock_flows_log):
)
self.napp.handle_flows_install_delete(event)
mock_install_flows.assert_called_with(
"add", mock_flow_dict, [switch], reraise_conn=True
"add",
mock_flow_dict,
[switch],
reraise_conn=True,
batch_size=0,
batch_interval=0,
)
mock_flows_log.assert_called()

Expand Down Expand Up @@ -545,7 +551,12 @@ def test_event_flows_install_delete(self, mock_install_flows, mock_flows_log):
)
self.napp.handle_flows_install_delete(event)
mock_install_flows.assert_called_with(
"delete", mock_flow_dict, [switch], reraise_conn=True
"delete",
mock_flow_dict,
[switch],
reraise_conn=True,
batch_size=0,
batch_interval=0,
)
mock_flows_log.assert_called()

Expand Down Expand Up @@ -627,6 +638,43 @@ def test_send_flow_mod(self):

mock_buffers_put.assert_called()

def test_send_flow_mods(self, monkeypatch):
    """Test _send_flow_mods batching: sleeps between slices, sends every FlowMod."""
    time, log, flows_log = MagicMock(), MagicMock(), MagicMock()
    monkeypatch.setattr("napps.kytos.flow_manager.main.time", time)
    monkeypatch.setattr("napps.kytos.flow_manager.main.log", log)
    monkeypatch.setattr(
        "napps.kytos.flow_manager.main.flows_to_log_info", flows_log
    )
    monkeypatch.setattr(
        "napps.kytos.flow_manager.main.build_command_from_flow_mod", MagicMock()
    )
    mock_buffers_put = MagicMock()
    mock_send_flow_mod = MagicMock()
    self.napp.controller.buffers.msg_out.put = mock_buffers_put
    # Assign the mock itself, not its return value (mock_send_flow_mod()
    # would install a child mock and record one spurious call on the parent).
    self.napp._send_flow_mod = mock_send_flow_mod
    switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
    n_flows = 5
    flow_mods = [MagicMock() for _ in range(n_flows)]
    flows = [MagicMock() for _ in range(n_flows)]
    batch_size = 2
    batch_interval = 1

    self.napp._send_flow_mods(
        [switch],
        flow_mods,
        flows,
        batch_size=batch_size,
        batch_interval=batch_interval,
    )

    mock_buffers_put.assert_called()
    log.info.assert_called()
    # One sleep before every slice except the first: ceil(5/2) - 1 == 2
    assert time.sleep.call_count == math.ceil(n_flows / batch_size) - 1
    time.sleep.assert_called_with(batch_interval)
    # One log call per slice
    assert flows_log.call_count == math.ceil(n_flows / batch_size)
    # Every FlowMod must have been sent exactly once
    assert self.napp._send_flow_mod.call_count == len(flow_mods)

@patch("kytos.core.buffers.KytosEventBuffer.put")
def test_send_flow_mod_error(self, mock_buffers_put):
"""Test _send_flow_mod method error."""
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,6 @@ def test_get_min_wait_diff(self):
@patch("napps.kytos.flow_manager.utils.log")
def test_flows_to_log_info(self, mock_log):
"""Test flows_to_log_info"""
flows = {"flows": list(range(500))}
flows_to_log_info("", flows)
flows = list(range(500))
flows_to_log_info("", flows, maximum=200)
assert mock_log.info.call_count == 3
13 changes: 6 additions & 7 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,12 @@ def validate_cookies_del(flows: list[dict]) -> None:
)


def flows_to_log_info(message: str, flow_dict: dict[str, list]) -> None:
def flows_to_log_info(message: str, flows: list[dict], maximum=200) -> None:
"""Log flows, at most ``maximum`` flows per log call (default 200)"""
maximun = 200
flows_n = len(flow_dict["flows"])
i, j = 0, maximun
while flow_dict["flows"][i:j]:
flows_n = len(flows)
i, j = 0, maximum
while flows[i:j]:
log.info(
f"{message} flows[{i}, {(j if j < flows_n else flows_n)}]: {flow_dict['flows'][i:j]}"
f"{message} flows[{i}, {(j if j < flows_n else flows_n)}]: {flows[i:j]}"
)
i, j = i + maximun, j + maximun
i, j = i + maximum, j + maximum