From c7440c4558de9a989653258458870c1ad83007af Mon Sep 17 00:00:00 2001 From: Victor Heorhiadi Date: Tue, 17 Sep 2019 12:53:48 -0700 Subject: [PATCH] Add Flow to HeaderConstraints converter (#464) --- pybatfish/datamodel/flow.py | 35 ++++++++++++++- tests/datamodel/test_flow.py | 86 ++++++++++++++++++++++++++++++++++-- 2 files changed, 117 insertions(+), 4 deletions(-) diff --git a/pybatfish/datamodel/flow.py b/pybatfish/datamodel/flow.py index e884a9a3..1cea3886 100644 --- a/pybatfish/datamodel/flow.py +++ b/pybatfish/datamodel/flow.py @@ -491,7 +491,8 @@ class FilterStepDetail(DataModelElement): @classmethod def from_dict(cls, json_dict): # type: (Dict) -> FilterStepDetail - return FilterStepDetail(json_dict.get("filter", ""), json_dict.get("type", "")) + return FilterStepDetail(json_dict.get("filter", ""), + json_dict.get("type", "")) def __str__(self): # type: () -> str @@ -952,6 +953,38 @@ def dict(self): del d['firewallClassifications'] return d + @classmethod + def of(cls, flow): + # type: (Flow) -> HeaderConstraints + """Create header constraints from an existing flow.""" + srcPorts = dstPorts = icmpCodes = icmpTypes = tcpFlags = None + if flow._has_ports(): + srcPorts = str(flow.srcPort) + dstPorts = str(flow.dstPort) + if flow.ipProtocol.lower() == 'icmp': + icmpCodes = flow.icmpCode + icmpTypes = flow.icmpVar + if flow.ipProtocol.lower() == 'tcp': + tcpFlags = MatchTcpFlags( + tcpFlags=TcpFlags(bool(flow.tcpFlagsAck), + bool(flow.tcpFlagsCwr), + bool(flow.tcpFlagsEce), + bool(flow.tcpFlagsFin), + bool(flow.tcpFlagsPsh), + bool(flow.tcpFlagsRst), + bool(flow.tcpFlagsSyn), + bool(flow.tcpFlagsUrg))) + return HeaderConstraints(srcIps=flow.srcIp, dstIps=flow.dstIp, + ipProtocols=[str(flow.ipProtocol)], + srcPorts=srcPorts, + dstPorts=dstPorts, + icmpCodes=icmpCodes, + icmpTypes=icmpTypes, + tcpFlags=tcpFlags, + firewallClassifications=flow.state, + fragmentOffsets=flow.fragmentOffset, + packetLengths=flow.packetLength) + @attr.s(frozen=True) class PathConstraints(DataModelElement): diff --git a/tests/datamodel/test_flow.py b/tests/datamodel/test_flow.py index 10bf03f9..0a4b913a 100644 --- a/tests/datamodel/test_flow.py +++ b/tests/datamodel/test_flow.py @@ -37,7 +37,8 @@ def test_exit_output_iface_step_detail_str(): def test_transformation_step_detail_str(): no_diffs = TransformationStepDetail("type", []) - one_diff = TransformationStepDetail("type", [FlowDiff("field", "old", "new")]) + one_diff = TransformationStepDetail("type", + [FlowDiff("field", "old", "new")]) two_diffs = TransformationStepDetail("type", [FlowDiff("field1", "old1", "new1"), FlowDiff("field2", "old2", "new2")]) @@ -49,7 +50,8 @@ def test_transformation_step_detail_str(): assert str(step) == "ACTION(type field: old -> new)" step = Step(two_diffs, "ACTION") - assert str(step) == "ACTION(type field1: old1 -> new1, field2: old2 -> new2)" + assert str( + step) == "ACTION(type field1: old1 -> new1, field2: old2 -> new2)" def test_flow_deserialization(): @@ -327,7 +329,8 @@ def test_flow_repr_html_start_location(): flow_dict['ingressInterface'] = "ingressIface" flow = Flow.from_dict(flow_dict) - assert ("Start Location: ingressNode interface=ingressIface" in flow._repr_html_lines()) + assert ( + "Start Location: ingressNode interface=ingressIface" in flow._repr_html_lines()) def test_flow_repr_html_state(): @@ -420,5 +423,82 @@ def test_MatchSessionStepDetail_str(): assert str(MatchSessionStepDetail()) == "" +def test_header_constraints_of(): + hc = HeaderConstraints.of( + Flow(ipProtocol='ICMP', icmpCode=7, + srcIp="1.1.1.1", + dstIp="2.2.2.2", + dscp=0, + dstPort=0, + srcPort=0, + ecn=0, + fragmentOffset=0, + icmpVar=0, + ingressInterface='', + ingressNode='', + ingressVrf='', + packetLength=0, + state='new', + tag='tag', + tcpFlagsAck=0, + tcpFlagsCwr=0, + tcpFlagsEce=0, + tcpFlagsFin=0, + tcpFlagsPsh=0, + tcpFlagsRst=0, + tcpFlagsSyn=0, + tcpFlagsUrg=0, + )) + assert hc.srcIps == "1.1.1.1" + assert hc.dstIps == "2.2.2.2" + assert hc.ipProtocols == ['ICMP'] + assert hc.icmpCodes == '7' + assert hc.srcPorts is None + assert hc.dstPorts is None + assert hc.tcpFlags is None + + hc = HeaderConstraints.of( + Flow(ipProtocol='TCP', srcPort=1000, dstPort=2000, tcpFlagsAck=True, + srcIp="1.1.1.1", + dstIp="2.2.2.2", + dscp=0, + ecn=0, + fragmentOffset=0, + icmpCode=0, + icmpVar=0, + ingressInterface='', + ingressNode='', + ingressVrf='', + packetLength=0, + state='new', + tag='tag', + tcpFlagsCwr=0, + tcpFlagsEce=0, + tcpFlagsFin=0, + tcpFlagsPsh=0, + tcpFlagsRst=0, + tcpFlagsSyn=0, + tcpFlagsUrg=0, + )) + assert hc.srcIps == "1.1.1.1" + assert hc.dstIps == "2.2.2.2" + assert hc.ipProtocols == ['TCP'] + assert hc.icmpCodes is None + assert hc.icmpTypes is None + assert hc.srcPorts == '1000' + assert hc.dstPorts == '2000' + assert hc.tcpFlags == [MatchTcpFlags( + TcpFlags(ack=True), + useAck=True, + useCwr=True, + useEce=True, + useFin=True, + usePsh=True, + useRst=True, + useSyn=True, + useUrg=True, + )] + + if __name__ == "__main__": pytest.main()