Skip to content

Commit

Permalink
Merge pull request #270 from sk2/thesis16
Browse files Browse the repository at this point in the history
rework to python3
  • Loading branch information
sk2 authored Apr 28, 2021
2 parents e35e6ca + a1b0f84 commit b161475
Show file tree
Hide file tree
Showing 83 changed files with 6,486 additions and 0 deletions.
Empty file added autonetkit/compile/__init__.py
Empty file.
Empty file.
23 changes: 23 additions & 0 deletions autonetkit/compile/node/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import abc
from typing import Dict

from autonetkit.network_model.network_model import NetworkModel
from autonetkit.network_model.node import Node


class BaseCompiler:
"""
"""

def __init__(self):
pass

@abc.abstractmethod
def compile(self, network_model: NetworkModel, node: Node) -> Dict:
"""
@param network_model:
@param node:
"""
pass
27 changes: 27 additions & 0 deletions autonetkit/compile/node/linux_host.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Dict

from autonetkit.compile.node.base import BaseCompiler
from autonetkit.network_model.network_model import NetworkModel
from autonetkit.network_model.node import Node


class LinuxHostCompiler(BaseCompiler):
"""
"""

def __init__(self):
super().__init__()

# TODO: assign this at topology load time in pre-process step
self.lo0_id = "lo:1"

def compile(self, network_model: NetworkModel, node: Node) -> Dict:
"""
@param network_model:
@param node:
@return:
"""
# Note: can expand depending on services defined on host, l7 topologies, etc
return {}
160 changes: 160 additions & 0 deletions autonetkit/compile/node/quagga.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from typing import Dict, List

from autonetkit.compile.node.base import BaseCompiler
from autonetkit.design.utils import filters
from autonetkit.network_model.exceptions import NodeNotFound
from autonetkit.network_model.network_model import NetworkModel
from autonetkit.network_model.node import Node


class QuaggaCompiler(BaseCompiler):
"""
"""

def __init__(self):
super().__init__()

# TODO: assign this at topology load time in pre-process step
self.lo0_id = "lo:1"

def compile(self, network_model: NetworkModel, node: Node) -> Dict:
"""
@param network_model:
@param node:
@return:
"""
# ensure node is from phy
t_phy = network_model.get_topology("physical")
node = t_phy.get_node_by_id(node.id)

result = {}

result["zebra"] = self._zebra(node)
result["interfaces"] = self._interfaces(network_model, node)
result["ospf"] = self._ospf(network_model, node)
result["bgp"] = self._bgp(network_model, node)

return result

def _zebra(self, node):
return {
"hostname": node.label,
"password": "zebra",
"enable_password": "zebra",
}

def _interfaces(self, network_model: NetworkModel, node: Node) -> List:
result = []

t_ipv4 = network_model.get_topology("ipv4")

physical_ports = filters.physical_ports(node)
logical_ports = [p for p in node.ports()
if p not in physical_ports]

physical_ports.sort(key=lambda x: x.label)
logical_ports.sort(key=lambda x: x.label)
for port in physical_ports:
try:
description = port.peer_ports()[0].node.label
except IndexError:
description = ""

data = {
"label": port.label,
"description": description,
"connected": port.connected
}

if port.connected:
ipv4_port = t_ipv4.get_port_by_id(port.id)
data["broadcast"] = ipv4_port.get("network").broadcast_address
data["network"] = ipv4_port.get("network").network_address
data["ip"] = ipv4_port.get("ip")

result.append(data)

return result

def _ospf(self, network_model: NetworkModel, node: Node) -> Dict:
t_ospf = network_model.get_topology("ospf")
try:
ospf_node = t_ospf.get_node_by_id(node.id)
except NodeNotFound:
# ospf not configured for this node
return {}

result = {}
asn = node.get("asn")
t_ipv4 = network_model.get_topology("ipv4")

infrastructure_blocks = t_ipv4.get("infrastructure_by_asn").get(asn) or []
networks = []
for network in infrastructure_blocks:
networks.append({
"network": network.network_address,
"area": ospf_node.get("area")
})

result["networks"] = networks

return result

def _bgp(self, network_model: NetworkModel, node: Node) -> Dict:
result = {}
asn = node.get("asn")
result["asn"] = asn

t_ipv4 = network_model.get_topology("ipv4")
ipv4_node = t_ipv4.get_node_by_id(node.id)
lo0_ipv4_address = ipv4_node.loopback_zero().get("ip")

infrastructure_blocks = t_ipv4.get("infrastructure_by_asn").get(asn) or []

result["networks"] = infrastructure_blocks

t_ibgp = network_model.get_topology("ibgp")
ibgp_node = t_ibgp.get_node_by_id(node.id)

ibgp_neighbors = []
for peer_port in ibgp_node.peer_ports():
peer_node = peer_port.node
peer_ip_port = t_ipv4.get_port_by_id(peer_port.id)
peer_ipv4 = peer_ip_port.get("ip")
neighbor = f"{peer_port.label}.{peer_node.label}"
description = "link to " + neighbor
ibgp_neighbors.append({
"update_source": lo0_ipv4_address,
"desc": neighbor,
"asn": peer_node.get("asn"),
"neigh_ip": peer_ipv4,
"neighbor": neighbor,
"description": description
})

result["ibgp_neighbors"] = ibgp_neighbors

t_ebgp = network_model.get_topology("ebgp")
ebgp_node = t_ebgp.get_node_by_id(node.id)

ebgp_neighbors = []
for peer_port in ebgp_node.peer_ports():
peer_node = peer_port.node
peer_ip_port = t_ipv4.get_port_by_id(peer_port.id)
peer_ipv4 = peer_ip_port.get("ip")
neighbor = f"{peer_port.label}.{peer_node.label}"
description = "link to " + neighbor
ebgp_neighbors.append({
"update_source": lo0_ipv4_address,
"desc": neighbor,
"asn": peer_node.get("asn"),
"neigh_ip": peer_ipv4,
"neighbor": neighbor,
"description": description
})

result["ebgp_neighbors"] = ebgp_neighbors

return result
40 changes: 40 additions & 0 deletions autonetkit/compile/node_compiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import typing
from typing import Dict

from autonetkit.compile.node.linux_host import LinuxHostCompiler
from autonetkit.compile.node.quagga import QuaggaCompiler

if typing.TYPE_CHECKING:
from autonetkit.network_model.network_model import NetworkModel
from autonetkit.network_model.node import Node


class TargetCompilerNotFound(Exception):
pass


class NodeCompiler:
"""
"""

def __init__(self):
self.node_mapping = {
"quagga": QuaggaCompiler(),
"linux": LinuxHostCompiler()
}

def compile_node(self, network_model: 'NetworkModel', node: 'Node') -> Dict:
"""
@param network_model:
@param node:
@return:
"""
target = node.get("target")
try:
compiler = self.node_mapping[target]
except KeyError:
raise TargetCompilerNotFound(target)

return compiler.compile(network_model, node)
Empty file.
21 changes: 21 additions & 0 deletions autonetkit/compile/platform/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import abc
from typing import Dict

from autonetkit.network_model.network_model import NetworkModel


class BasePlatformCompiler:
"""
"""

def __init__(self):
pass

@abc.abstractmethod
def compile(self, network_model: NetworkModel) -> Dict:
"""
@param network_model:
"""
pass
101 changes: 101 additions & 0 deletions autonetkit/compile/platform/kathara.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import logging
from typing import Dict, List

from autonetkit.compile.platform.base import BasePlatformCompiler
from autonetkit.design.utils import filters
from autonetkit.network_model.exceptions import PortNotFound
from autonetkit.network_model.network_model import NetworkModel
from autonetkit.network_model.node import Node
from autonetkit.network_model.types import DeviceType

logger = logging.getLogger(__name__)


class KatharaCompiler(BasePlatformCompiler):
def compile(self, network_model: NetworkModel) -> Dict:
"""
@param network_model:
@return:
"""
result = {}
t_phy = network_model.get_topology("physical")

result["nodes"] = self._compile_node_startup(network_model, t_phy)
result["lab"] = self._compile_lab(t_phy)

return result

def _compile_lab(self, t_phy) -> Dict:
result = {
"nodes": []
}

for node in t_phy.nodes():
interfaces = []
for port in filters.physical_ports(node):
if port.degree() > 0:
if port.degree() > 1:
logger.warning("Port %s on %s has multiple physical links: only using first",
port.label, node.label)
# TODO: note this assumes single physical link per port
link = port.links()[0]
link_id = link.id

interfaces.append({
"slot": port.slot,
"link_id": link_id
})

entry = {
"label": node.label,
"interfaces": interfaces
}
result["nodes"].append(entry)

return result

def _compile_node_startup(self, network_model, t_phy) -> List:
result = []
t_ipv4 = network_model.get_topology("ipv4")
for node in t_phy.nodes():
interfaces = []
for port in filters.physical_ports(node):
data = {
"label": port.label,
"connected": port.connected
}

if port.connected:
try:
ipv4_port = t_ipv4.get_port_by_id(port.id)
except PortNotFound:
# no ip for this port
continue

data["ip"] = ipv4_port.get("ip")
data["netmask"] = ipv4_port.get("network").netmask

interfaces.append(data)

services = {}
if node.type == DeviceType.ROUTER:
services["zebra"] = True

result.append({
"label": node.label,
"interfaces": interfaces,
"services": services
})

return result

def compile_node(self, node: Node) -> Dict:
"""
@param node:
@return:
"""
return {
"hostname": node.label
}
Empty file added autonetkit/design/__init__.py
Empty file.
Loading

0 comments on commit b161475

Please sign in to comment.