Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Control of link up events by interruptions #136

Merged
merged 14 commits into from
Jun 22, 2023
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ All notable changes to the ``topology`` project will be documented in this file.
[UNRELEASED] - Under development
********************************

Added
=====
- Info on status and status_reason to UI for Switches and Interfaces
- Listener for service interruptions through ``topology.interruption.(start|end)``

Fixed
=====
- Rejected unordered late preempted interface events to avoid state inconsistencies
Expand Down
94 changes: 84 additions & 10 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def setup(self):
self.link_status_hook_link_up_timer)
self.topo_controller.bootstrap_indexes()
self.load_topology()
self.link_up = set()

@staticmethod
def get_topo_controller() -> TopoController:
Expand Down Expand Up @@ -142,6 +143,7 @@ def _load_link(self, link_att):

if link_att['enabled']:
link.enable()
self.link_up.add(link.id)
viniarck marked this conversation as resolved.
Show resolved Hide resolved
else:
link.disable()

Expand Down Expand Up @@ -1033,17 +1035,33 @@ def notify_interface_link_status(self, interface, reason):

def notify_link_status_change(self, link, reason='not given'):
"""Send an event to notify about a status change on a link."""
name = 'kytos/topology.'
if link.status == EntityStatus.UP:
status = 'link_up'
link_id = link.id
if (
(not link.status_reason or link.status == EntityStatus.UP)
viniarck marked this conversation as resolved.
Show resolved Hide resolved
and link_id not in self.link_up
):
self.link_up.add(link_id)
event = KytosEvent(
name='kytos/topology.link_up',
content={
'link': link,
'reason': reason
},
)
elif (
(link.status_reason or link.status != EntityStatus.UP)
and link_id in self.link_up
):
self.link_up.remove(link_id)
event = KytosEvent(
name='kytos/topology.link_down',
content={
'link': link,
'reason': reason
},
)
else:
status = 'link_down'
event = KytosEvent(
name=name+status,
content={
'link': link,
'reason': reason
})
return
self.controller.buffers.app.put(event)

def notify_metadata_changes(self, obj, action):
Expand Down Expand Up @@ -1152,3 +1170,59 @@ def handle_link_maintenance_end(self, event):
link.endpoint_a.enable()
link.endpoint_b.enable()
self.notify_link_status_change(link, reason='maintenance')

@listen_to('topology.interruption.start')
def on_interruption_start(self, event: KytosEvent):
"""Deals with the start of service interruption."""
with self._links_lock:
self.handle_interruption_start(event)

def handle_interruption_start(self, event: KytosEvent):
"""Deals with the start of service interruption."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ktmi considering the point that @italovalcy has also brought to the attention on yesterday's meeting, maybe on both handle_interruption_start and handle_interruption_end it would good points to centralize log.info entries, after all, we're also using log.info for port status changes, so here it'd reasonable too. Another benefit of centralizing here on topology is that it could simplify certain logs for interruption sources. Wdyt?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ktmi let me know about this one here, if you agree and also if it'll be part of this PR or a subsequent one.

interrupt_type = event.content['type']
# switches = event.content.get('switches', [])
# interfaces = event.content.get('interfaces', [])
links = event.content.get('links', [])
# for switch_id in switches:
# pass
# for interface_id in interfaces:
# pass
for link_id in links:
link = self.links.get(link_id)
if link is None:
log.error(
"Invalid link id '%s' for interruption of type '%s;",
link_id,
interrupt_type
)
else:
self.notify_link_status_change(link, interrupt_type)
self.notify_topology_update()

@listen_to('topology.interruption.end')
def on_interruption_end(self, event: KytosEvent):
"""Deals with the end of service interruption."""
with self._links_lock:
self.handle_interruption_end(event)

def handle_interruption_end(self, event: KytosEvent):
"""Deals with the end of service interruption."""
interrupt_type = event.content['type']
# switches = event.content.get('switches', [])
# interfaces = event.content.get('interfaces', [])
links = event.content.get('links', [])
# for switch_id in switches:
# pass
# for interface_id in interfaces:
# pass
for link_id in links:
link = self.links.get(link_id)
if link is None:
log.error(
"Invalid link id '%s' for interruption of type '%s;",
link_id,
interrupt_type
)
else:
self.notify_link_status_change(link, interrupt_type)
self.notify_topology_update()
48 changes: 26 additions & 22 deletions tests/integration/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,32 @@ def test_get_switches_dict(self):
def test_get_event_listeners(self):
"""Verify all event listeners registered."""
actual_events = self.napp.listeners()
expected_events = ['kytos/core.shutdown',
'kytos/core.shutdown.kytos/topology',
'kytos/maintenance.start_link',
'kytos/maintenance.end_link',
'kytos/maintenance.start_switch',
'kytos/maintenance.end_switch',
'kytos/.*.link_available_tags',
'kytos/.*.liveness.(up|down)',
'kytos/.*.liveness.disabled',
'kytos/topology.get',
'.*.topo_controller.upsert_switch',
'.*.of_lldp.network_status.updated',
'.*.interface.is.nni',
'.*.connection.lost',
'.*.switch.interfaces.created',
'.*.topology.switch.interface.created',
'.*.switch.interface.deleted',
'.*.switch.interface.link_down',
'.*.switch.interface.link_up',
'.*.switch.(new|reconnected)',
'.*.switch.port.created',
'kytos/topology.notify_link_up_if_status']
expected_events = [
'kytos/core.shutdown',
'kytos/core.shutdown.kytos/topology',
'kytos/maintenance.start_link',
'kytos/maintenance.end_link',
'kytos/maintenance.start_switch',
'kytos/maintenance.end_switch',
'kytos/.*.link_available_tags',
'kytos/.*.liveness.(up|down)',
'kytos/.*.liveness.disabled',
'kytos/topology.get',
'.*.topo_controller.upsert_switch',
'.*.of_lldp.network_status.updated',
'.*.interface.is.nni',
'.*.connection.lost',
'.*.switch.interfaces.created',
'.*.topology.switch.interface.created',
'.*.switch.interface.deleted',
'.*.switch.interface.link_down',
'.*.switch.interface.link_up',
'.*.switch.(new|reconnected)',
'.*.switch.port.created',
'kytos/topology.notify_link_up_if_status',
'topology.interruption.start',
'topology.interruption.end',
]
assert sorted(expected_events) == sorted(actual_events)

async def test_get_interfaces(self):
Expand Down
165 changes: 138 additions & 27 deletions tests/unit/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest
import time
from datetime import timedelta
from unittest.mock import MagicMock, create_autospec, patch
from unittest.mock import MagicMock, create_autospec, patch, call

from kytos.core.common import EntityStatus
from kytos.core.helpers import now
Expand Down Expand Up @@ -62,28 +62,32 @@ def setup_method(self):

def test_get_event_listeners(self):
"""Verify all event listeners registered."""
expected_events = ['kytos/core.shutdown',
'kytos/core.shutdown.kytos/topology',
'kytos/maintenance.start_link',
'kytos/maintenance.end_link',
'kytos/maintenance.start_switch',
'kytos/maintenance.end_switch',
'kytos/.*.link_available_tags',
'.*.topo_controller.upsert_switch',
'.*.of_lldp.network_status.updated',
'.*.interface.is.nni',
'.*.connection.lost',
'.*.switch.interfaces.created',
'.*.topology.switch.interface.created',
'.*.switch.interface.deleted',
'.*.switch.interface.link_down',
'.*.switch.interface.link_up',
'.*.switch.(new|reconnected)',
'kytos/.*.liveness.(up|down)',
'kytos/.*.liveness.disabled',
'kytos/topology.get',
'.*.switch.port.created',
'kytos/topology.notify_link_up_if_status']
expected_events = [
'kytos/core.shutdown',
'kytos/core.shutdown.kytos/topology',
'kytos/maintenance.start_link',
'kytos/maintenance.end_link',
'kytos/maintenance.start_switch',
'kytos/maintenance.end_switch',
'kytos/.*.link_available_tags',
'.*.topo_controller.upsert_switch',
'.*.of_lldp.network_status.updated',
'.*.interface.is.nni',
'.*.connection.lost',
'.*.switch.interfaces.created',
'.*.topology.switch.interface.created',
'.*.switch.interface.deleted',
'.*.switch.interface.link_down',
'.*.switch.interface.link_up',
'.*.switch.(new|reconnected)',
'kytos/.*.liveness.(up|down)',
'kytos/.*.liveness.disabled',
'kytos/topology.get',
'.*.switch.port.created',
'kytos/topology.notify_link_up_if_status',
'topology.interruption.start',
'topology.interruption.end',
]
actual_events = self.napp.listeners()
assert sorted(expected_events) == sorted(actual_events)

Expand Down Expand Up @@ -1343,8 +1347,10 @@ def test_add_links(self, *args):
mock_event = MagicMock()
mock_intf_a = MagicMock()
mock_intf_b = MagicMock()
mock_event.content = {"interface_a": mock_intf_a,
"interface_b": mock_intf_b}
mock_event.content = {
"interface_a": mock_intf_a,
"interface_b": mock_intf_b
}
self.napp.add_links(mock_event)
mock_link.extend_metadata.assert_called()
mock_get_link_or_create.assert_called()
Expand Down Expand Up @@ -1393,8 +1399,33 @@ def test_notify_link_status_change(self):
mock_buffers_put = MagicMock()
self.napp.controller.buffers.app.put = mock_buffers_put
mock_link = create_autospec(Link)
self.napp.notify_link_status_change(mock_link)
mock_buffers_put.assert_called()
mock_link.id = 'test_link'
mock_link.status_reason = frozenset()
mock_link.status = EntityStatus.UP

# Check when switching to up
self.napp.notify_link_status_change(mock_link, 'test')
assert mock_buffers_put.call_count == 1
args, _ = mock_buffers_put.call_args
event = args[0]
assert event.content['link'] is mock_link
assert event.content['reason'] == 'test'
assert event.name == 'kytos/topology.link_up'

# Check result when no change
self.napp.notify_link_status_change(mock_link, 'test2')
assert mock_buffers_put.call_count == 1

# Check when switching to down
mock_link.status_reason = frozenset({'disabled'})
mock_link.status = EntityStatus.DOWN
self.napp.notify_link_status_change(mock_link, 'test3')
assert mock_buffers_put.call_count == 2
args, _ = mock_buffers_put.call_args
event = args[0]
assert event.content['link'] is mock_link
assert event.content['reason'] == 'test3'
assert event.name == 'kytos/topology.link_down'

def test_notify_metadata_changes(self):
"""Test notify metadata changes."""
Expand Down Expand Up @@ -1632,3 +1663,83 @@ def test_notify_interface_link_status(self, *args):
self.napp.notify_interface_link_status(MagicMock(), "link enabled")
assert mock_get_link_from_interface.call_count == 3
assert self.napp.controller.buffers.app.put.call_count == 1

@patch('napps.kytos.topology.main.Main.notify_topology_update')
@patch('napps.kytos.topology.main.Main.notify_link_status_change')
def test_interruption_start(
self,
mock_notify_link_status_change,
mock_notify_topology_update
):
"""Tests processing of received interruption start events."""
link_a = MagicMock()
link_b = MagicMock()
link_c = MagicMock()
self.napp.links = {
'link_a': link_a,
'link_b': link_b,
'link_c': link_c,
}
event = KytosEvent(
"topology.interruption.start",
{
'type': 'test_interruption',
'switches': [
],
'interfaces': [
],
'links': [
'link_a',
'link_c',
],
}
)
self.napp.handle_interruption_start(event)
mock_notify_link_status_change.assert_has_calls(
[
call(link_a, 'test_interruption'),
call(link_c, 'test_interruption'),
]
)
assert mock_notify_link_status_change.call_count == 2
mock_notify_topology_update.assert_called_once()

@patch('napps.kytos.topology.main.Main.notify_topology_update')
@patch('napps.kytos.topology.main.Main.notify_link_status_change')
def test_interruption_end(
self,
mock_notify_link_status_change,
mock_notify_topology_update
):
"""Tests processing of received interruption end events."""
link_a = MagicMock()
link_b = MagicMock()
link_c = MagicMock()
self.napp.links = {
'link_a': link_a,
'link_b': link_b,
'link_c': link_c,
}
event = KytosEvent(
"topology.interruption.start",
{
'type': 'test_interruption',
'switches': [
],
'interfaces': [
],
'links': [
'link_a',
'link_c',
],
}
)
self.napp.handle_interruption_end(event)
mock_notify_link_status_change.assert_has_calls(
[
call(link_a, 'test_interruption'),
call(link_c, 'test_interruption'),
]
)
assert mock_notify_link_status_change.call_count == 2
mock_notify_topology_update.assert_called_once()
5 changes: 4 additions & 1 deletion ui/k-info-panel/link_info.kytos
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
'active': '',
'id': '',
'endpoint_a': '',
'endpoint_b': '',},
'endpoint_b': '',
'status': '',
viniarck marked this conversation as resolved.
Show resolved Hide resolved
'status_reason': '',
},
}
},
methods: {
Expand Down
Loading