-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
153 lines (137 loc) · 5.88 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import os
import tempfile
import unittest
import bson
from collections import OrderedDict as odict
from subprocess import check_call, check_output, CalledProcessError, STDOUT
from math import log
def bytes_needed(n):
    """Return how many bytes are needed to hold the magnitude of integer ``n``.

    The sign is ignored (``abs(n)`` is measured), and zero is reported as
    needing one byte.

    The count is computed with exact integer arithmetic via
    ``int.bit_length``. A floating-point ``log(abs(n), 256)`` loses precision
    once the value no longer fits in a double's mantissa and can then
    over-count by one near byte boundaries (for example ``2**64 - 1``).

    :param n: integer value to measure (may be negative).
    :return: number of bytes, always at least 1.
    """
    # bit_length() is 0 for zero, so clamp to a minimum of one byte.
    return max(1, (abs(n).bit_length() + 7) // 8)
class EncodeTest(unittest.TestCase):
    """Compile generated C programs and compare them with the ``bson`` module.

    Every test renders a C source file from a template, compiles it with each
    compiler listed in ``self.compilers`` and then checks either the
    program's output or its exit status.
    """

    # NOTE(review): presumably the nose multiprocess-plugin flag -- confirm.
    _multiprocess_can_split_ = True

    def setUp(self):
        """Create a scratch directory and the shared test fixtures."""
        self._tempdir = tempfile.TemporaryDirectory()
        self.dir = self._tempdir.name
        # Path of the compiled executable; the C source is ``target + '.c'``.
        self.target = os.path.join(self.dir, "test")
        self.data = [
            odict([("asas", 20)]),
            odict([("abaiksj", 200), ("kddsokn", 900), ("dsdsojok", 250)]),
            odict([("k4", 0xDEADBEEFFEED)]),
            odict([("k5", -0x0EADBEEFFEED)]),
        ]
        self.compilers = ["gcc", "g++", "clang", "clang++"]

    def tearDown(self):
        """Remove the scratch directory created by ``setUp``."""
        self._tempdir.cleanup()

    @staticmethod
    def _source_dir():
        """Return the directory containing this test file."""
        return os.path.dirname(os.path.abspath(__file__))

    @classmethod
    def _template_path(cls, name):
        """Return the path of template ``name`` next to this test file.

        The same directory is handed to the compiler as the include path, so
        resolving templates here keeps the suite independent of the current
        working directory.
        """
        return os.path.join(cls._source_dir(), name)

    @staticmethod
    def _int_types(value):
        """Return the ``(macro tag, C type)`` pair used for ``value``.

        NOTE(review): the choice is based on ``bytes_needed``, which measures
        the magnitude only; a value in ``[2**31, 2**32)`` is therefore tagged
        INT32 -- confirm that this is intended.
        """
        if bytes_needed(value) <= 4:
            return "INT32", "int32_t"
        return "INT64", "int64_t"

    @staticmethod
    def _escaped(d):
        """Return ``bson.dumps(d)`` rendered as a run of ``\\x<hex>`` escapes."""
        return "".join("\\x{:x}".format(x) for x in bson.dumps(d))

    def test_encode(self):
        """The compiled program prints the same bytes as ``bson.dumps``."""
        for cc in self.compilers:
            for d in self.data:
                # subTest reports every compiler/data combination separately
                # instead of stopping at the first failing one.
                with self.subTest(cc=cc, d=d):
                    self.make_exe(
                        self.target, lambda: self.generate_string(d), cc=cc)
                    encoded = self._escaped(d)
                    encoded_c = check_output([self.target]).decode("utf-8")
                    self.assertEqual(
                        encoded, encoded_c,
                        msg="test encode on {cc} data: {d}".format(cc=cc, d=d))

    def test_encode_two(self):
        """Two comma-separated documents in one program are both emitted."""
        for cc in self.compilers:
            for d in self.data:
                with self.subTest(cc=cc, d=d):
                    g = lambda: (self.generate_string(d) + ", "
                                 + self.generate_string(d))
                    self.make_exe(self.target, g, cc=cc)
                    encoded = self._escaped(d)
                    encoded_c = check_output([self.target]).decode("utf-8")
                    self.assertEqual(
                        encoded + encoded, encoded_c,
                        msg="test two on {cc} data: {d}".format(cc=cc, d=d))

    def test_decode(self):
        """The program's output decodes back to the original mapping."""
        for cc in self.compilers:
            for d in self.data:
                with self.subTest(cc=cc, d=d):
                    self.make_exe(
                        self.target, lambda: self.generate_string(d), cc=cc)
                    output = check_output([self.target]).decode("utf-8")
                    # The program prints unpadded hex ("\x5"); pad each piece
                    # to two digits so bytearray.fromhex accepts it. The
                    # empty piece before the first "\x" is left untouched.
                    pieces = [x.zfill(2) if x != '' else ''
                              for x in output.split("\\x")]
                    encoded_c = bytes(bytearray.fromhex(" ".join(pieces)))
                    self.assertEqual(
                        d, bson.loads(encoded_c),
                        msg="test decode on {cc} data: {d}".format(cc=cc, d=d))

    def test_verify_decode(self):
        """The verify program exits successfully for a matching document."""
        for cc in self.compilers:
            for d in self.data:
                with self.subTest(cc=cc, d=d):
                    self.make_exe(
                        self.target, lambda: self.generate_mapping(d), cc=cc)
                    check_call([self.target])

    def test_verify_decode_bson(self):
        """Like ``test_verify_decode`` but fed bytes from ``bson.dumps``."""
        for cc in self.compilers:
            # g++ is deliberately skipped; the reason is not recorded here.
            if cc == 'g++':
                continue
            for d in self.data:
                with self.subTest(cc=cc, d=d):
                    self.make_exe(
                        self.target,
                        lambda: self.generate_mapping(d, own_bson=True),
                        cc=cc)
                    check_call([self.target])

    def test_err_verify_decode_bson(self):
        """The verify program fails when handed a different document."""
        for cc in self.compilers:
            # g++ is deliberately skipped; the reason is not recorded here.
            if cc == 'g++':
                continue
            for ind, d in enumerate(self.data):
                # Pair every document with the next one, wrapping around.
                other = self.data[(ind + 1) % len(self.data)]
                with self.subTest(cc=cc, d=d):
                    self.make_exe(
                        self.target,
                        lambda: self.generate_mapping(d, another_d=other),
                        cc=cc)
                    with self.assertRaises(CalledProcessError):
                        with open(os.devnull, "w") as devnull:
                            check_call([self.target],
                                       stdout=devnull, stderr=STDOUT)

    def generate_mapping(self, d, own_bson=False, another_d=None):
        """Build the tag -> replacement text mapping for the verify template.

        :param d: ordered mapping of key -> integer that the generated
            verifier, variables and assertions are derived from.
        :param own_bson: when true the document bytes come from
            ``bson.dumps``; otherwise from ``generate_string``.
        :param another_d: optional mapping used for the document instead of
            ``d``, so that verification is expected to fail.
        :return: dict keyed by the ``//EDIT...`` template tags.
        """
        verify_args = []
        declarations = []
        pointers = []
        asserts = []
        for k, v in reversed(d.items()):
            tag, ctype = self._int_types(v)
            verify_args.append("{0}, {1}, {2}, ".format(tag, k, len(k)))
            declarations.append("{0} {1} = 0;".format(ctype, k))
            pointers.append("&{0} ".format(k))
            asserts.append("assert({} == {});".format(k, v))
        fcn = "BSON_VERIFY(msg, {0}, {1})".format(len(d), "".join(verify_args))
        # The pointers were collected in reverse key order; flip them back so
        # the call lists them in the mapping's own order.
        call = "verify_msg(a, sizeof(a), {});".format(
            ",".join(reversed(pointers)))
        # Compare against None so that an empty replacement mapping is still
        # honoured rather than silently ignored.
        doc = d if another_d is None else another_d
        if own_bson:
            document = ', '.join(
                "'\\x{:02x}'".format(x) for x in bson.dumps(doc))
        else:
            document = self.generate_string(doc)
        return {
            "//EDITVERFCN": fcn,
            "//EDITDOC": document,
            "//EDITVAR": "".join(declarations),
            "//EDITVERIFY": call,
            "//EDITASSERT": "".join(asserts),
        }

    def make_exe(self, target, d, cc="gcc"):
        """Render the C source for ``d()`` and compile it to ``target``.

        :param target: path of the executable; the source is written to
            ``target + '.c'``.
        :param d: zero-argument callable returning either a string (rendered
            with ``make_file``) or a mapping (rendered with
            ``make_file_from_mapping``).
        :param cc: compiler executable to invoke.
        :raises CalledProcessError: if the compiler exits with an error.
        """
        targetc = target + '.c'
        macro = d()
        if isinstance(macro, str):
            self.make_file(targetc, macro)
        else:
            self.make_file_from_mapping(targetc, macro)
        command = [cc]
        if cc == 'clang++':
            command.extend(['-x', 'c++'])  # silence annoying warning
        command.extend(["-I", self._source_dir(), "-o", target, targetc])
        check_call(command)

    def generate_string(self, d):
        """Return the ``BSON_DOCUMENT(...)`` invocation describing ``d``.

        Fields are emitted in reverse key order as
        ``TYPE, key, len(key), value, `` groups.
        """
        fields = "".join(
            "{0}, {1}, {2}, {3}, ".format(
                self._int_types(v)[0], k, len(k), v)
            for k, v in reversed(d.items()))
        return "BSON_DOCUMENT({0}, {1})".format(len(d), fields)

    def make_file(self, of, test_line):
        """Copy ``test.ctemplate`` to ``of``, substituting the edit marker.

        Every line starting with ``//EDITHERE`` is replaced by ``test_line``
        (written as-is, without an added newline).
        """
        template = self._template_path("test.ctemplate")
        with open(template) as base, open(of, "w") as out:
            for line in base:
                if line.startswith("//EDITHERE"):
                    out.write(test_line)
                else:
                    out.write(line)

    def make_file_from_mapping(self, of, mapping):
        """Copy ``test_verify.ctemplate`` to ``of``, substituting tagged lines.

        A line starting with one of the keys of ``mapping`` is replaced by
        that key's value (written as-is, without an added newline); every
        other line is copied unchanged.
        """
        template = self._template_path("test_verify.ctemplate")
        with open(template) as base, open(of, "w") as out:
            for line in base:
                # First tag that prefixes the line wins; None means no match.
                replacement = next(
                    (mapping[tag] for tag in mapping if line.startswith(tag)),
                    None)
                out.write(line if replacement is None else replacement)
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()