-
Notifications
You must be signed in to change notification settings - Fork 49
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
simonknight
authored and
simonknight
committed
Apr 28, 2021
1 parent
e5b390c
commit a1b0f84
Showing
83 changed files
with
6,486 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import abc | ||
from typing import Dict | ||
|
||
from autonetkit.network_model.network_model import NetworkModel | ||
from autonetkit.network_model.node import Node | ||
|
||
|
||
class BaseCompiler: | ||
""" | ||
""" | ||
|
||
def __init__(self): | ||
pass | ||
|
||
@abc.abstractmethod | ||
def compile(self, network_model: NetworkModel, node: Node) -> Dict: | ||
""" | ||
@param network_model: | ||
@param node: | ||
""" | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from typing import Dict | ||
|
||
from autonetkit.compile.node.base import BaseCompiler | ||
from autonetkit.network_model.network_model import NetworkModel | ||
from autonetkit.network_model.node import Node | ||
|
||
|
||
class LinuxHostCompiler(BaseCompiler): | ||
""" | ||
""" | ||
|
||
def __init__(self): | ||
super().__init__() | ||
|
||
# TODO: assign this at topology load time in pre-process step | ||
self.lo0_id = "lo:1" | ||
|
||
def compile(self, network_model: NetworkModel, node: Node) -> Dict: | ||
""" | ||
@param network_model: | ||
@param node: | ||
@return: | ||
""" | ||
# Note: can expand depending on services defined on host, l7 topologies, etc | ||
return {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
from typing import Dict, List | ||
|
||
from autonetkit.compile.node.base import BaseCompiler | ||
from autonetkit.design.utils import filters | ||
from autonetkit.network_model.exceptions import NodeNotFound | ||
from autonetkit.network_model.network_model import NetworkModel | ||
from autonetkit.network_model.node import Node | ||
|
||
|
||
class QuaggaCompiler(BaseCompiler): | ||
""" | ||
""" | ||
|
||
def __init__(self): | ||
super().__init__() | ||
|
||
# TODO: assign this at topology load time in pre-process step | ||
self.lo0_id = "lo:1" | ||
|
||
def compile(self, network_model: NetworkModel, node: Node) -> Dict: | ||
""" | ||
@param network_model: | ||
@param node: | ||
@return: | ||
""" | ||
# ensure node is from phy | ||
t_phy = network_model.get_topology("physical") | ||
node = t_phy.get_node_by_id(node.id) | ||
|
||
result = {} | ||
|
||
result["zebra"] = self._zebra(node) | ||
result["interfaces"] = self._interfaces(network_model, node) | ||
result["ospf"] = self._ospf(network_model, node) | ||
result["bgp"] = self._bgp(network_model, node) | ||
|
||
return result | ||
|
||
def _zebra(self, node): | ||
return { | ||
"hostname": node.label, | ||
"password": "zebra", | ||
"enable_password": "zebra", | ||
} | ||
|
||
def _interfaces(self, network_model: NetworkModel, node: Node) -> List: | ||
result = [] | ||
|
||
t_ipv4 = network_model.get_topology("ipv4") | ||
|
||
physical_ports = filters.physical_ports(node) | ||
logical_ports = [p for p in node.ports() | ||
if p not in physical_ports] | ||
|
||
physical_ports.sort(key=lambda x: x.label) | ||
logical_ports.sort(key=lambda x: x.label) | ||
for port in physical_ports: | ||
try: | ||
description = port.peer_ports()[0].node.label | ||
except IndexError: | ||
description = "" | ||
|
||
data = { | ||
"label": port.label, | ||
"description": description, | ||
"connected": port.connected | ||
} | ||
|
||
if port.connected: | ||
ipv4_port = t_ipv4.get_port_by_id(port.id) | ||
data["broadcast"] = ipv4_port.get("network").broadcast_address | ||
data["network"] = ipv4_port.get("network").network_address | ||
data["ip"] = ipv4_port.get("ip") | ||
|
||
result.append(data) | ||
|
||
return result | ||
|
||
def _ospf(self, network_model: NetworkModel, node: Node) -> Dict: | ||
t_ospf = network_model.get_topology("ospf") | ||
try: | ||
ospf_node = t_ospf.get_node_by_id(node.id) | ||
except NodeNotFound: | ||
# ospf not configured for this node | ||
return {} | ||
|
||
result = {} | ||
asn = node.get("asn") | ||
t_ipv4 = network_model.get_topology("ipv4") | ||
|
||
infrastructure_blocks = t_ipv4.get("infrastructure_by_asn").get(asn) or [] | ||
networks = [] | ||
for network in infrastructure_blocks: | ||
networks.append({ | ||
"network": network.network_address, | ||
"area": ospf_node.get("area") | ||
}) | ||
|
||
result["networks"] = networks | ||
|
||
return result | ||
|
||
def _bgp(self, network_model: NetworkModel, node: Node) -> Dict: | ||
result = {} | ||
asn = node.get("asn") | ||
result["asn"] = asn | ||
|
||
t_ipv4 = network_model.get_topology("ipv4") | ||
ipv4_node = t_ipv4.get_node_by_id(node.id) | ||
lo0_ipv4_address = ipv4_node.loopback_zero().get("ip") | ||
|
||
infrastructure_blocks = t_ipv4.get("infrastructure_by_asn").get(asn) or [] | ||
|
||
result["networks"] = infrastructure_blocks | ||
|
||
t_ibgp = network_model.get_topology("ibgp") | ||
ibgp_node = t_ibgp.get_node_by_id(node.id) | ||
|
||
ibgp_neighbors = [] | ||
for peer_port in ibgp_node.peer_ports(): | ||
peer_node = peer_port.node | ||
peer_ip_port = t_ipv4.get_port_by_id(peer_port.id) | ||
peer_ipv4 = peer_ip_port.get("ip") | ||
neighbor = f"{peer_port.label}.{peer_node.label}" | ||
description = "link to " + neighbor | ||
ibgp_neighbors.append({ | ||
"update_source": lo0_ipv4_address, | ||
"desc": neighbor, | ||
"asn": peer_node.get("asn"), | ||
"neigh_ip": peer_ipv4, | ||
"neighbor": neighbor, | ||
"description": description | ||
}) | ||
|
||
result["ibgp_neighbors"] = ibgp_neighbors | ||
|
||
t_ebgp = network_model.get_topology("ebgp") | ||
ebgp_node = t_ebgp.get_node_by_id(node.id) | ||
|
||
ebgp_neighbors = [] | ||
for peer_port in ebgp_node.peer_ports(): | ||
peer_node = peer_port.node | ||
peer_ip_port = t_ipv4.get_port_by_id(peer_port.id) | ||
peer_ipv4 = peer_ip_port.get("ip") | ||
neighbor = f"{peer_port.label}.{peer_node.label}" | ||
description = "link to " + neighbor | ||
ebgp_neighbors.append({ | ||
"update_source": lo0_ipv4_address, | ||
"desc": neighbor, | ||
"asn": peer_node.get("asn"), | ||
"neigh_ip": peer_ipv4, | ||
"neighbor": neighbor, | ||
"description": description | ||
}) | ||
|
||
result["ebgp_neighbors"] = ebgp_neighbors | ||
|
||
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import typing | ||
from typing import Dict | ||
|
||
from autonetkit.compile.node.linux_host import LinuxHostCompiler | ||
from autonetkit.compile.node.quagga import QuaggaCompiler | ||
|
||
if typing.TYPE_CHECKING: | ||
from autonetkit.network_model.network_model import NetworkModel | ||
from autonetkit.network_model.node import Node | ||
|
||
|
||
class TargetCompilerNotFound(Exception): | ||
pass | ||
|
||
|
||
class NodeCompiler: | ||
""" | ||
""" | ||
|
||
def __init__(self): | ||
self.node_mapping = { | ||
"quagga": QuaggaCompiler(), | ||
"linux": LinuxHostCompiler() | ||
} | ||
|
||
def compile_node(self, network_model: 'NetworkModel', node: 'Node') -> Dict: | ||
""" | ||
@param network_model: | ||
@param node: | ||
@return: | ||
""" | ||
target = node.get("target") | ||
try: | ||
compiler = self.node_mapping[target] | ||
except KeyError: | ||
raise TargetCompilerNotFound(target) | ||
|
||
return compiler.compile(network_model, node) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
import abc | ||
from typing import Dict | ||
|
||
from autonetkit.network_model.network_model import NetworkModel | ||
|
||
|
||
class BasePlatformCompiler: | ||
""" | ||
""" | ||
|
||
def __init__(self): | ||
pass | ||
|
||
@abc.abstractmethod | ||
def compile(self, network_model: NetworkModel) -> Dict: | ||
""" | ||
@param network_model: | ||
""" | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
import logging | ||
from typing import Dict, List | ||
|
||
from autonetkit.compile.platform.base import BasePlatformCompiler | ||
from autonetkit.design.utils import filters | ||
from autonetkit.network_model.exceptions import PortNotFound | ||
from autonetkit.network_model.network_model import NetworkModel | ||
from autonetkit.network_model.node import Node | ||
from autonetkit.network_model.types import DeviceType | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class KatharaCompiler(BasePlatformCompiler): | ||
def compile(self, network_model: NetworkModel) -> Dict: | ||
""" | ||
@param network_model: | ||
@return: | ||
""" | ||
result = {} | ||
t_phy = network_model.get_topology("physical") | ||
|
||
result["nodes"] = self._compile_node_startup(network_model, t_phy) | ||
result["lab"] = self._compile_lab(t_phy) | ||
|
||
return result | ||
|
||
def _compile_lab(self, t_phy) -> Dict: | ||
result = { | ||
"nodes": [] | ||
} | ||
|
||
for node in t_phy.nodes(): | ||
interfaces = [] | ||
for port in filters.physical_ports(node): | ||
if port.degree() > 0: | ||
if port.degree() > 1: | ||
logger.warning("Port %s on %s has multiple physical links: only using first", | ||
port.label, node.label) | ||
# TODO: note this assumes single physical link per port | ||
link = port.links()[0] | ||
link_id = link.id | ||
|
||
interfaces.append({ | ||
"slot": port.slot, | ||
"link_id": link_id | ||
}) | ||
|
||
entry = { | ||
"label": node.label, | ||
"interfaces": interfaces | ||
} | ||
result["nodes"].append(entry) | ||
|
||
return result | ||
|
||
def _compile_node_startup(self, network_model, t_phy) -> List: | ||
result = [] | ||
t_ipv4 = network_model.get_topology("ipv4") | ||
for node in t_phy.nodes(): | ||
interfaces = [] | ||
for port in filters.physical_ports(node): | ||
data = { | ||
"label": port.label, | ||
"connected": port.connected | ||
} | ||
|
||
if port.connected: | ||
try: | ||
ipv4_port = t_ipv4.get_port_by_id(port.id) | ||
except PortNotFound: | ||
# no ip for this port | ||
continue | ||
|
||
data["ip"] = ipv4_port.get("ip") | ||
data["netmask"] = ipv4_port.get("network").netmask | ||
|
||
interfaces.append(data) | ||
|
||
services = {} | ||
if node.type == DeviceType.ROUTER: | ||
services["zebra"] = True | ||
|
||
result.append({ | ||
"label": node.label, | ||
"interfaces": interfaces, | ||
"services": services | ||
}) | ||
|
||
return result | ||
|
||
def compile_node(self, node: Node) -> Dict: | ||
""" | ||
@param node: | ||
@return: | ||
""" | ||
return { | ||
"hostname": node.label | ||
} |
Empty file.
Oops, something went wrong.