Skip to content

Commit

Permalink
Add Flow to HeaderConstraints converter (#464)
Browse files Browse the repository at this point in the history
  • Loading branch information
progwriter authored Sep 17, 2019
1 parent 61fa0cd commit c7440c4
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 4 deletions.
35 changes: 34 additions & 1 deletion pybatfish/datamodel/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,8 @@ class FilterStepDetail(DataModelElement):
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> FilterStepDetail
return FilterStepDetail(json_dict.get("filter", ""), json_dict.get("type", ""))
return FilterStepDetail(json_dict.get("filter", ""),
json_dict.get("type", ""))

def __str__(self):
# type: () -> str
Expand Down Expand Up @@ -952,6 +953,38 @@ def dict(self):
del d['firewallClassifications']
return d

@classmethod
def of(cls, flow):
# type: (Flow) -> HeaderConstraints
"""Create header constraints from an existing flow."""
srcPorts = dstPorts = icmpCodes = icmpTypes = tcpFlags = None
if flow._has_ports():
srcPorts = str(flow.srcPort)
dstPorts = str(flow.dstPort)
if flow.ipProtocol.lower() == 'icmp':
icmpCodes = flow.icmpCode
icmpTypes = flow.icmpVar
if flow.ipProtocol.lower() == 'tcp':
tcpFlags = MatchTcpFlags(
tcpFlags=TcpFlags(bool(flow.tcpFlagsAck),
bool(flow.tcpFlagsCwr),
bool(flow.tcpFlagsEce),
bool(flow.tcpFlagsFin),
bool(flow.tcpFlagsPsh),
bool(flow.tcpFlagsRst),
bool(flow.tcpFlagsSyn),
bool(flow.tcpFlagsUrg)))
return HeaderConstraints(srcIps=flow.srcIp, dstIps=flow.dstIp,
ipProtocols=[str(flow.ipProtocol)],
srcPorts=srcPorts,
dstPorts=dstPorts,
icmpCodes=icmpCodes,
icmpTypes=icmpTypes,
tcpFlags=tcpFlags,
firewallClassifications=flow.state,
fragmentOffsets=flow.fragmentOffset,
packetLengths=flow.packetLength)


@attr.s(frozen=True)
class PathConstraints(DataModelElement):
Expand Down
86 changes: 83 additions & 3 deletions tests/datamodel/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def test_exit_output_iface_step_detail_str():

def test_transformation_step_detail_str():
no_diffs = TransformationStepDetail("type", [])
one_diff = TransformationStepDetail("type", [FlowDiff("field", "old", "new")])
one_diff = TransformationStepDetail("type",
[FlowDiff("field", "old", "new")])
two_diffs = TransformationStepDetail("type",
[FlowDiff("field1", "old1", "new1"),
FlowDiff("field2", "old2", "new2")])
Expand All @@ -49,7 +50,8 @@ def test_transformation_step_detail_str():
assert str(step) == "ACTION(type field: old -> new)"

step = Step(two_diffs, "ACTION")
assert str(step) == "ACTION(type field1: old1 -> new1, field2: old2 -> new2)"
assert str(
step) == "ACTION(type field1: old1 -> new1, field2: old2 -> new2)"


def test_flow_deserialization():
Expand Down Expand Up @@ -327,7 +329,8 @@ def test_flow_repr_html_start_location():
flow_dict['ingressInterface'] = "ingressIface"

flow = Flow.from_dict(flow_dict)
assert ("Start Location: ingressNode interface=ingressIface" in flow._repr_html_lines())
assert (
"Start Location: ingressNode interface=ingressIface" in flow._repr_html_lines())


def test_flow_repr_html_state():
Expand Down Expand Up @@ -420,5 +423,82 @@ def test_MatchSessionStepDetail_str():
assert str(MatchSessionStepDetail()) == ""


def test_header_constraints_of():
hc = HeaderConstraints.of(
Flow(ipProtocol='ICMP', icmpCode=7,
srcIp="1.1.1.1",
dstIp="2.2.2.2",
dscp=0,
dstPort=0,
srcPort=0,
ecn=0,
fragmentOffset=0,
icmpVar=0,
ingressInterface='',
ingressNode='',
ingressVrf='',
packetLength=0,
state='new',
tag='tag',
tcpFlagsAck=0,
tcpFlagsCwr=0,
tcpFlagsEce=0,
tcpFlagsFin=0,
tcpFlagsPsh=0,
tcpFlagsRst=0,
tcpFlagsSyn=0,
tcpFlagsUrg=0,
))
assert hc.srcIps == "1.1.1.1"
assert hc.dstIps == "2.2.2.2"
assert hc.ipProtocols == ['ICMP']
assert hc.icmpCodes == '7'
assert hc.srcPorts is None
assert hc.dstPorts is None
assert hc.tcpFlags is None

hc = HeaderConstraints.of(
Flow(ipProtocol='TCP', srcPort=1000, dstPort=2000, tcpFlagsAck=True,
srcIp="1.1.1.1",
dstIp="2.2.2.2",
dscp=0,
ecn=0,
fragmentOffset=0,
icmpCode=0,
icmpVar=0,
ingressInterface='',
ingressNode='',
ingressVrf='',
packetLength=0,
state='new',
tag='tag',
tcpFlagsCwr=0,
tcpFlagsEce=0,
tcpFlagsFin=0,
tcpFlagsPsh=0,
tcpFlagsRst=0,
tcpFlagsSyn=0,
tcpFlagsUrg=0,
))
assert hc.srcIps == "1.1.1.1"
assert hc.dstIps == "2.2.2.2"
assert hc.ipProtocols == ['TCP']
assert hc.icmpCodes is None
assert hc.icmpTypes is None
assert hc.srcPorts == '1000'
assert hc.dstPorts == '2000'
assert hc.tcpFlags == [MatchTcpFlags(
TcpFlags(ack=True),
useAck=True,
useCwr=True,
useEce=True,
useFin=True,
usePsh=True,
useRst=True,
useSyn=True,
useUrg=True,
)]


if __name__ == "__main__":
pytest.main()

0 comments on commit c7440c4

Please sign in to comment.