forked from kytos/of_stats
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstats.py
203 lines (162 loc) · 7.96 KB
/
stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""Module with Classes to handle statistics."""
import time
from abc import ABCMeta, abstractmethod
import pyof.v0x01.controller2switch.common as v0x01
# pylint: disable=C0411,C0412
from pyof.v0x01.common.phy_port import Port
from pyof.v0x01.controller2switch.common import AggregateStatsRequest
from pyof.v0x01.controller2switch.stats_request import StatsRequest, StatsType
from pyof.v0x04.controller2switch import multipart_request as v0x04
from pyof.v0x04.controller2switch.common import MultipartType
from pyof.v0x04.controller2switch.multipart_request import MultipartRequest
from kytos.core import KytosEvent, log
# v0x01 and v0x04 PortStats are version independent
from napps.kytos.of_core.flow import FlowFactory
from napps.kytos.of_core.flow import PortStats as OFCorePortStats
class Stats(metaclass=ABCMeta):
    """Abstract class for Statistics implementation.

    Concrete subclasses implement :meth:`request` to send a statistics
    request through a connection and :meth:`listen` to handle the replies.
    """

    def __init__(self, msg_out_buffer, msg_app_buffer):
        """Store a reference to the controller's buffers.

        Args:
            msg_out_buffer: Where to send events.
            msg_app_buffer: Where to send events to other NApps.
        """
        self._buffer = msg_out_buffer
        self._app_buffer = msg_app_buffer

    @abstractmethod
    def request(self, conn):
        """Request statistics."""

    @abstractmethod
    def listen(self, switch, stats):
        """Listen statistic replies."""

    def _send_event(self, req, conn):
        """Put a stats-request event in the outgoing message buffer.

        Args:
            req: Message to be carried by the event.
            conn: Connection set as the event's destination.
        """
        event = KytosEvent(
            name='kytos/of_stats.messages.out.ofpt_stats_request',
            content={'message': req, 'destination': conn})
        self._buffer.put(event)

    @classmethod
    def _save_event_callback(cls, _event, data, error):
        """Execute the callback to handle with kronos event to save data.

        Args:
            _event: Unused.
            data: Logged at debug level.
            error: When truthy, it is reported at error level.
        """
        if error:
            # Lazy %-style arguments, like every other log call in this
            # module; the rendered message is unchanged.
            log.error("Can't save stats in kytos/kronos: %s", error)
        # NOTE(review): assumed to run regardless of ``error`` - confirm.
        log.debug(data)
class PortStats(Stats):
    """Deal with PortStats messages."""

    def request(self, conn):
        """Send a port stats request through ``conn``."""
        self._send_event(
            self._get_versioned_request(conn.protocol.version), conn)
        log.debug('PortStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the port stats request message for ``of_version``.

        Version 0x01 gets a ``StatsRequest``; any other version gets a
        ``MultipartRequest``.
        """
        if of_version != 0x01:
            return MultipartRequest(
                multipart_type=MultipartType.OFPMP_PORT_STATS,
                body=v0x04.PortStatsRequest())
        all_ports = v0x01.PortStatsRequest(Port.OFPP_NONE)  # All ports
        return StatsRequest(body_type=StatsType.OFPST_PORT, body=all_ports)

    def listen(self, switch, ports_stats):
        """Handle port stats replies.

        For each entry: update the matching interface of ``switch``, put a
        ``kytos.kronos.save`` event in the app buffer and log the counters.
        """
        template = ('Received port %s stats of switch %s: rx_bytes %s,'
                    ' tx_bytes %s, rx_dropped %s, tx_dropped %s,'
                    ' rx_errors %s, tx_errors %s')
        # Same order as the placeholders that follow the switch id above.
        counters = ('rx_bytes', 'tx_bytes', 'rx_dropped', 'tx_dropped',
                    'rx_errors', 'tx_errors')

        for entry in ports_stats:
            self._update_controller_interface(switch, entry)

            values = {name: getattr(entry, name).value for name in counters}
            number = entry.port_no.value

            save_event = KytosEvent(
                name='kytos.kronos.save',
                content={
                    'namespace': f'kytos.kronos.{switch.id}.port_no.{number}',
                    'value': values,
                    'callback': self._save_event_callback,
                    'timestamp': time.time(),
                })
            self._app_buffer.put(save_event)

            log.debug(template, number, switch.id, *values.values())

    @staticmethod
    def _update_controller_interface(switch, port_stats):
        """Update the stats of the switch interface matching ``port_stats``.

        Nothing is done when the switch has no interface for that port
        number.
        """
        iface = switch.get_interface_by_port_no(port_stats.port_no.value)
        if iface is None:
            return
        if iface.stats is None:
            iface.stats = OFCorePortStats()
        iface.stats.update(port_stats)
class AggregateStats(Stats):
    """Deal with AggregateStats message."""

    def request(self, conn):
        """Ask for aggregate stats."""
        body = AggregateStatsRequest()  # Port.OFPP_NONE and All Tables
        req = StatsRequest(body_type=StatsType.OFPST_AGGREGATE, body=body)
        self._send_event(req, conn)
        log.debug('Aggregate Stats request for switch %s sent.',
                  conn.switch.dpid)

    def listen(self, switch, aggregate_stats):
        """Receive aggregate stats.

        For each entry, log its counters and put a ``kytos.kronos.save``
        event carrying them in the app buffer.
        """
        # The logger interpolates positional arguments with ``%``, so the
        # placeholders must be ``%s``; ``{}`` would fail when the record is
        # emitted.
        debug_msg = 'Received aggregate stats from switch %s:' \
                    ' packet_count %s, byte_count %s, flow_count %s'

        for aggregate in aggregate_stats:
            # need to choose the _id to aggregate_stats
            # this class isn't used yet.
            log.debug(debug_msg, switch.id, aggregate.packet_count.value,
                      aggregate.byte_count.value, aggregate.flow_count.value)

            # Save aggregate stats using kytos/kronos
            namespace = f'kytos.kronos.aggregated_stats.{switch.id}'
            # NOTE(review): nothing visible here defines ``aggregate.id``;
            # confirm the attribute exists before putting this class to use.
            stats_to_send = {'aggregate_id': aggregate.id,
                             'packet_count': aggregate.packet_count.value,
                             'byte_count': aggregate.byte_count.value,
                             'flow_count': aggregate.flow_count.value}

            content = {'namespace': namespace,
                       'value': stats_to_send,
                       'callback': self._save_event_callback,
                       'timestamp': time.time()}

            event = KytosEvent(name='kytos.kronos.save', content=content)
            self._app_buffer.put(event)
class FlowStats(Stats):
    """Deal with FlowStats message."""

    def request(self, conn):
        """Send a flow stats request through ``conn``."""
        message = self._get_versioned_request(conn.protocol.version)
        self._send_event(message, conn)
        log.debug('FlowStats request for switch %s sent.', conn.switch.id)

    @staticmethod
    def _get_versioned_request(of_version):
        """Build the flow stats request message for ``of_version``.

        Version 0x01 gets a ``StatsRequest``; any other version gets a
        ``MultipartRequest``.
        """
        if of_version != 0x01:
            return MultipartRequest(
                multipart_type=MultipartType.OFPMP_FLOW,
                body=v0x04.FlowStatsRequest())
        return StatsRequest(
            body_type=StatsType.OFPST_FLOW,
            body=v0x01.FlowStatsRequest())

    def listen(self, switch, flows_stats):
        """Handle flow stats replies.

        For each entry: refresh the stats of the matching flow held by
        ``switch`` (if any) and put two ``kytos.kronos.save`` events in the
        app buffer, one for ``packet_count`` and one for ``byte_count``.
        """
        flow_cls = FlowFactory.get_class(switch)

        for entry in flows_stats:
            flow = flow_cls.from_of_flow_stats(entry, switch)

            # Update controller's flow
            known_flow = switch.get_flow_by_id(flow.id)
            if known_flow:
                known_flow.stats = flow.stats

            # Save packet_count, then byte_count, using kytos/kronos; each
            # counter goes in its own event under the same namespace.
            # NOTE(review): assumed to run even when the switch does not
            # know the flow - confirm.
            for counter in ('packet_count', 'byte_count'):
                save_event = KytosEvent(
                    name='kytos.kronos.save',
                    content={
                        'namespace':
                            f'kytos.kronos.{switch.id}.flow_id.{flow.id}',
                        'value': {counter: getattr(flow.stats, counter)},
                        'callback': self._save_event_callback,
                        'timestamp': time.time(),
                    })
                self._app_buffer.put(save_event)