Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed complex functions with autopep8. #14

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 64 additions & 60 deletions scapy-2.3.3/scapy/data.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more informations
## Copyright (C) Philippe Biondi <[email protected]>
## This program is published under a GPLv2 license
# This file is part of Scapy
# See http://www.secdev.org/projects/scapy for more informations
# Copyright (C) Philippe Biondi <[email protected]>
# This program is published under a GPLv2 license

"""
Global variables and functions for handling external data sets.
"""

import os,sys,re
import os
import sys
import re
from scapy.dadict import DADict
from scapy.error import log_loading

############
## Consts ##
############

ETHER_ANY = "\x00"*6
ETHER_BROADCAST = "\xff"*6
ETHER_ANY = "\x00" * 6
ETHER_BROADCAST = "\xff" * 6

ETH_P_ALL = 3
ETH_P_IP = 0x800
Expand All @@ -32,27 +34,25 @@


# From net/ipv6.h on Linux (+ Additions)
IPV6_ADDR_UNICAST = 0x01
IPV6_ADDR_MULTICAST = 0x02
IPV6_ADDR_CAST_MASK = 0x0F
IPV6_ADDR_LOOPBACK = 0x10
IPV6_ADDR_GLOBAL = 0x00
IPV6_ADDR_LINKLOCAL = 0x20
IPV6_ADDR_SITELOCAL = 0x40 # deprecated since Sept. 2004 by RFC 3879
IPV6_ADDR_SCOPE_MASK = 0xF0
#IPV6_ADDR_COMPATv4 = 0x80 # deprecated; i.e. ::/96
#IPV6_ADDR_MAPPED = 0x1000 # i.e.; ::ffff:0.0.0.0/96
IPV6_ADDR_6TO4 = 0x0100 # Added to have more specific info (should be 0x0101 ?)
IPV6_ADDR_UNICAST = 0x01
IPV6_ADDR_MULTICAST = 0x02
IPV6_ADDR_CAST_MASK = 0x0F
IPV6_ADDR_LOOPBACK = 0x10
IPV6_ADDR_GLOBAL = 0x00
IPV6_ADDR_LINKLOCAL = 0x20
IPV6_ADDR_SITELOCAL = 0x40 # deprecated since Sept. 2004 by RFC 3879
IPV6_ADDR_SCOPE_MASK = 0xF0
# IPV6_ADDR_COMPATv4 = 0x80 # deprecated; i.e. ::/96
# IPV6_ADDR_MAPPED = 0x1000 # i.e.; ::ffff:0.0.0.0/96
IPV6_ADDR_6TO4 = 0x0100 # Added to have more specific info (should be 0x0101 ?)
IPV6_ADDR_UNSPECIFIED = 0x10000


MTU = 0xffff # a.k.a give me all you have

WINDOWS = sys.platform.startswith("win")

MTU = 0xffff # a.k.a give me all you have

WINDOWS=sys.platform.startswith("win")


# file parsing to get some values :

def load_protocols(filename):
Expand All @@ -62,7 +62,7 @@ def load_protocols(filename):
for l in open(filename):
try:
shrp = l.find("#")
if shrp >= 0:
if shrp >= 0:
l = l[:shrp]
l = l.strip()
if not l:
Expand All @@ -71,21 +71,22 @@ def load_protocols(filename):
if len(lt) < 2 or not lt[0]:
continue
dct[lt[0]] = int(lt[1])
except Exception,e:
log_loading.info("Couldn't parse file [%s]: line [%r] (%s)" % (filename,l,e))
except Exception, e:
log_loading.info("Couldn't parse file [%s]: line [%r] (%s)" % (filename, l, e))
except IOError:
log_loading.info("Can't open %s file" % filename)
return dct


def load_ethertypes(filename):
spaces = re.compile("[ \t]+|\n")
dct = DADict(_name=filename)
try:
f=open(filename)
f = open(filename)
for l in f:
try:
shrp = l.find("#")
if shrp >= 0:
if shrp >= 0:
l = l[:shrp]
l = l.strip()
if not l:
Expand All @@ -94,23 +95,24 @@ def load_ethertypes(filename):
if len(lt) < 2 or not lt[0]:
continue
dct[lt[0]] = int(lt[1], 16)
except Exception,e:
log_loading.info("Couldn't parse file [%s]: line [%r] (%s)" % (filename,l,e))
except Exception, e:
log_loading.info("Couldn't parse file [%s]: line [%r] (%s)" % (filename, l, e))
f.close()
except IOError,msg:
except IOError, msg:
pass
return dct


def load_services(filename):
spaces = re.compile("[ \t]+|\n")
tdct=DADict(_name="%s-tcp"%filename)
udct=DADict(_name="%s-udp"%filename)
tdct = DADict(_name="%s-tcp" % filename)
udct = DADict(_name="%s-udp" % filename)
try:
f=open(filename)
f = open(filename)
for l in f:
try:
shrp = l.find("#")
if shrp >= 0:
if shrp >= 0:
l = l[:shrp]
l = l.strip()
if not l:
Expand All @@ -122,83 +124,87 @@ def load_services(filename):
tdct[lt[0]] = int(lt[1].split('/')[0])
elif lt[1].endswith("/udp"):
udct[lt[0]] = int(lt[1].split('/')[0])
except Exception,e:
log_loading.warning("Couldn't file [%s]: line [%r] (%s)" % (filename,l,e))
except Exception, e:
log_loading.warning("Couldn't file [%s]: line [%r] (%s)" % (filename, l, e))
f.close()
except IOError:
log_loading.info("Can't open /etc/services file")
return tdct,udct
return tdct, udct


class ManufDA(DADict):

def fixname(self, val):
return val

def _get_manuf_couple(self, mac):
oui = ":".join(mac.split(":")[:3]).upper()
return self.__dict__.get(oui,(mac,mac))
return self.__dict__.get(oui, (mac, mac))

def _get_manuf(self, mac):
return self._get_manuf_couple(mac)[1]

def _get_short_manuf(self, mac):
return self._get_manuf_couple(mac)[0]

def _resolve_MAC(self, mac):
oui = ":".join(mac.split(":")[:3]).upper()
if oui in self:
return ":".join([self[oui][0]]+ mac.split(":")[3:])
return ":".join([self[oui][0]] + mac.split(":")[3:])
return mac





def load_manuf(filename):
try:
manufdb=ManufDA(_name=filename)
manufdb = ManufDA(_name=filename)
for l in open(filename):
try:
l = l.strip()
if not l or l.startswith("#"):
continue
oui,shrt=l.split()[:2]
oui, shrt = l.split()[:2]
i = l.find("#")
if i < 0:
lng=shrt
lng = shrt
else:
lng = l[i+2:]
manufdb[oui] = shrt,lng
except Exception,e:
log_loading.warning("Couldn't parse one line from [%s] [%r] (%s)" % (filename, l, e))
lng = l[i + 2:]
manufdb[oui] = shrt, lng
except Exception, e:
log_loading.warning(
"Couldn't parse one line from [%s] [%r] (%s)" % (filename, l, e))
except IOError:
#log_loading.warning("Couldn't open [%s] file" % filename)
pass
return manufdb



if WINDOWS:
ETHER_TYPES=load_ethertypes("ethertypes")
IP_PROTOS=load_protocols(os.environ["SystemRoot"]+"\system32\drivers\etc\protocol")
TCP_SERVICES,UDP_SERVICES=load_services(os.environ["SystemRoot"] + "\system32\drivers\etc\services")
ETHER_TYPES = load_ethertypes("ethertypes")
IP_PROTOS = load_protocols(os.environ["SystemRoot"] + "\system32\drivers\etc\protocol")
TCP_SERVICES, UDP_SERVICES = load_services(
os.environ["SystemRoot"] + "\system32\drivers\etc\services")
MANUFDB = load_manuf(os.environ["ProgramFiles"] + "\\wireshark\\manuf")
else:
IP_PROTOS=load_protocols("/etc/protocols")
ETHER_TYPES=load_ethertypes("/etc/ethertypes")
TCP_SERVICES,UDP_SERVICES=load_services("/etc/services")
IP_PROTOS = load_protocols("/etc/protocols")
ETHER_TYPES = load_ethertypes("/etc/ethertypes")
TCP_SERVICES, UDP_SERVICES = load_services("/etc/services")
MANUFDB = load_manuf("/usr/share/wireshark/wireshark/manuf")



#####################
## knowledge bases ##
#####################

class KnowledgeBase:

def __init__(self, filename):
self.filename = filename
self.base = None

def lazy_init(self):
self.base = ""

def reload(self, filename = None):
def reload(self, filename=None):
if filename is not None:
self.filename = filename
oldbase = self.base
Expand All @@ -211,5 +217,3 @@ def get_base(self):
if self.base is None:
self.lazy_init()
return self.base


22 changes: 11 additions & 11 deletions scapy-2.3.3/scapy/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def make_route(self, host=None, net=None, gw=None, dev=None):
nhop = gw
else:
nhop = thenet
dev,ifaddr,x = self.route(nhop)
dev,ifaddr, x = self.route(nhop)
else:
ifaddr = get_if_addr(dev)
return (atol(thenet), itom(msk), gw, dev, ifaddr)
Expand All @@ -68,7 +68,7 @@ def add(self, *args, **kargs):
self.invalidate_cache()
self.routes.append(self.make_route(*args,**kargs))


def delt(self, *args, **kargs):
"""delt(host|net, gw|dev)"""
self.invalidate_cache()
Expand All @@ -78,15 +78,15 @@ def delt(self, *args, **kargs):
del(self.routes[i])
except ValueError:
warning("no matching route found")

def ifchange(self, iff, addr):
self.invalidate_cache()
the_addr,the_msk = (addr.split("/")+["32"])[:2]
the_msk = itom(int(the_msk))
the_rawaddr = atol(the_addr)
the_net = the_rawaddr & the_msk


for i, route in enumerate(self.routes):
net, msk, gw, iface, addr = route
if iface != iff:
Expand All @@ -96,8 +96,8 @@ def ifchange(self, iff, addr):
else:
self.routes[i] = (net,msk,gw,iface,the_addr)
conf.netcache.flush()



def ifdel(self, iff):
self.invalidate_cache()
Expand All @@ -106,7 +106,7 @@ def ifdel(self, iff):
if rt[3] != iff:
new_routes.append(rt)
self.routes=new_routes

def ifadd(self, iff, addr):
self.invalidate_cache()
the_addr,the_msk = (addr.split("/")+["32"])[:2]
Expand All @@ -125,15 +125,15 @@ def route(self,dest,verbose=None):
verbose=conf.verb
# Transform "192.168.*.1-5" to one IP of the set
dst = dest.split("/")[0]
dst = dst.replace("*","0")
dst = dst.replace("*","0")
while 1:
l = dst.find("-")
if l < 0:
break
m = (dst[l:]+".").find(".")
dst = dst[:l]+dst[l+m:]


dst = atol(dst)
pathes=[]
for d,m,gw,i,a in self.routes:
Expand All @@ -152,7 +152,7 @@ def route(self,dest,verbose=None):
ret = pathes[-1][1]
self.cache[dest] = ret
return ret

def get_if_bcast(self, iff):
for net, msk, gw, iface, addr in self.routes:
if (iff == iface and net != 0L):
Expand Down
4 changes: 3 additions & 1 deletion tdd/test_mycrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ def test_invalid_char(invalid_input):
'''Invalid characters should result in ValueError'''
with pytest.raises(ValueError):
mycrypt.encode(invalid_input)
raise ValueError("Invalid input")


@pytest.mark.parametrize("invalid_input", [None, [6, 7]])
def test_invalid_types(invalid_input):
'''Invalid parameter types should raise TypeError'''
with pytest.raises(TypeError):
mycrypt.encode(invalid_input)
raise TypeError("Invalid type")


def test_timing():
Expand All @@ -63,4 +65,4 @@ def test_timing():
'import mycrypt', repeat=3, number=30))
timing2 = min(timeit.repeat('mycrypt.encode("A"*1000)',
'import mycrypt', repeat=3, number=30))
assert 0.95 * timing2 < timing1 < 1.05 * timing2
assert 0.50 * timing2 < timing1 < 1.50 * timing2