diff --git a/netconf/asmodeus/constants.py b/netconf/asmodeus/constants.py new file mode 100644 index 00000000..6d7078c1 --- /dev/null +++ b/netconf/asmodeus/constants.py @@ -0,0 +1,20 @@ +from .utils import NetGroup + + +dn42_as = 4242421846 + +dn42_allowed_transit = NetGroup( + base_name="dn42-allowed-transit", + v4=[ + "10.0.0.0/8", + "172.20.0.0/14", + "172.31.0.0/16", + ], + v6=["fd00::/8"], +) +ifd3f_dn42 = NetGroup( + base_name="ifd3f-dn42", + v4=["172.23.7.176/28"], + v6=["fd00:ca7:b015::/48"], +) + diff --git a/netconf/asmodeus/firewall.py b/netconf/asmodeus/firewall.py index 28e750c5..fab0c9d2 100644 --- a/netconf/asmodeus/firewall.py +++ b/netconf/asmodeus/firewall.py @@ -1,16 +1,91 @@ -from typing import List +from typing import List, NamedTuple, Optional + +from .constants import dn42_allowed_transit, ifd3f_dn42 +from .utils import NetGroup, afs + + +DN42_ALLOWED_TRANSIT = "dn42-allowed-transit" +IFD3F_DN42 = "ifd3f-dn42" + + +class Rule(NamedTuple): + description: str + cmds: List[str] + src: Optional[NetGroup] = None + dst: Optional[NetGroup] = None + + def render(self, firewall_name: str, rule_num: int) -> List[str]: + cmds = [] + for af in afs: + srcname = af.suffix(self.src.base_name) if self.src else None + dstname = af.suffix(self.dst.base_name) if self.dst else None + cmds.append( + f"set firewall name {firewall_name} rule {rule_num} description {self.description}" + ) + if srcname: + cmds.append( + f"set firewall {af.firewall_name} name {firewall_name} rule {rule_num} source group network-group {srcname}" + ) + if dstname: + cmds.append( + f"set firewall {af.firewall_name} name {firewall_name} rule {rule_num} destination group network-group {dstname}" + ) + for cmd in self.cmds: + cmds.append( + f"set firewall {af.firewall_name} name {firewall_name} rule {rule_num} {cmd}" + ) + return cmds def make_firewall() -> List[str]: - return [ + cmds = [] + cmds += [ "delete firewall", "set firewall global-options state-policy established action 'accept'", "set firewall global-options state-policy related action 'accept'", "set firewall global-options state-policy invalid action 'accept'", - "set firewall group ipv6-network-group dn42-allowed-transit-v6 network 'fd00::/8'", - "set firewall group network-group dn42-allowed-transit-v4 network '10.0.0.0/8'", - "set firewall group network-group dn42-allowed-transit-v4 network '172.20.0.0/14'", - "set firewall group network-group dn42-allowed-transit-v4 network '172.31.0.0/16'", - "set firewall group ipv6-network-group ifd3f-dn42-v6 network 'fd00:ca7:b015::/48'", - "set firewall group network-group ifd3f-dn42-v4 network '172.23.7.176/28'", ] + + cmds += dn42_allowed_transit.make_firewall_groups() + cmds += ifd3f_dn42.make_firewall_groups() + cmds += inbound_connections() + + return cmds + + +def inbound_connections() -> List[str]: + return make_named_firewall( + name="dn42-tunnels-in", + description="'DN42 peering tunnels, inbound traffic'", + rules=[ + Rule( + description="Block Traffic to Operator Assigned IP Space", + dst=ifd3f_dn42, + cmds=["action drop"], + ), + Rule( + description="Allow Peer Transit", + src=dn42_allowed_transit, + dst=dn42_allowed_transit, + cmds=["action accept"], + ), + ], + ) + + +def make_named_firewall( + name: str, description: str, rules: List[Rule], default_action: str = "drop" +) -> List[str]: + cmds = [] + + for af in afs: + cmds.extend( + [ + f"set firewall {af.firewall_name} name {af.suffix(name)} default-action {default_action}", + f"set firewall {af.firewall_name} name {af.suffix(name)} description {description}", + ] + ) + + for i, r in enumerate(rules): + cmds.extend(r.render(name, i + 1)) + return cmds diff --git a/netconf/asmodeus/utils.py b/netconf/asmodeus/utils.py index 844d7cbf..b5edcbe4 100644 --- a/netconf/asmodeus/utils.py +++ b/netconf/asmodeus/utils.py @@ -66,3 +66,47 @@ def render(self) -> List[str]: f"set protocols bgp neighbor {self.peer_address} remote-as {self.peer_asn}", f"set protocols bgp neighbor {self.peer_address} update-source {self.ifname}", ] + + +class AddressFamily(NamedTuple): + group_suffix: str + network_group_key: str + firewall_name: str + + def suffix(self, group_name: str): + """Suffix the given string with -v4 or -v6""" + return f"{group_name}-{self.group_suffix}" + + def extract(self, obj): + """Given an object, extracts attribute .v4 or .v6""" + if self == v4: + return obj.v4 + if self == v6: + return obj.v6 + + +v4 = AddressFamily( + group_suffix="v4", network_group_key="network-group", firewall_name="ipv4" +) +v6 = AddressFamily( + group_suffix="v6", network_group_key="ipv6-network-group", firewall_name="ipv6" +) +afs = [v4, v6] + + +class NetGroup(NamedTuple): + base_name: str + v4: List[str] + v6: List[str] + + def make_firewall_groups(self): + cmds = [] + for af in afs: + cmds += [ + f"set firewall group {af.network_group_key} {af.suffix(self.base_name)}" + ] + for addr in af.extract(self): + cmds += [ + f"set firewall group {af.network_group_key} {af.suffix(self.base_name)} network {addr}" + ] + return cmds