-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathtest.py
162 lines (127 loc) · 4.77 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from srudp import SecureReliableSocket, Packet
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import socket as s
import random
import unittest
import asyncio
import logging
import os
logger = logging.getLogger("srudp")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'[%(levelname)-6s] [%(threadName)-10s] [%(asctime)-24s] %(message)s')
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(formatter)
logger.addHandler(sh)
IS_TRAVIS = os.getenv('TRAVIS') == 'true'
IS_WINDOWS = os.name == 'nt'
class TestSocket(unittest.TestCase):
def setUp(self) -> None:
logger.info("start")
self.port = random.randint(10000, 30000)
self.executor = ThreadPoolExecutor(4, thread_name_prefix="Thread")
def tearDown(self) -> None:
self.executor.shutdown(True)
logger.info("end")
def test_basic(self):
logger.info("start test_basic()")
sock1 = SecureReliableSocket()
sock2 = SecureReliableSocket()
sock1.settimeout(5.0)
sock2.settimeout(5.0)
# connect
fut1 = self.executor.submit(sock1.connect, ("127.0.0.1", self.port))
fut2 = self.executor.submit(sock2.connect, ("127.0.0.1", self.port))
fut1.result(10.0)
fut2.result(10.0)
# connection info
assert sock1.getpeername() == sock2.getsockname(), (sock1.getpeername(), sock2.getsockname())
# normal sending
sock1.sendall(b"hello world")
assert sock2.recv(1024) == b"hello world"
# broadcast sending
sock2.broadcast(b"good man")
assert sock1.recv(1024) == b"good man"
# broadcast hook fnc
def hook_fnc(packet: Packet, _sock: SecureReliableSocket):
assert packet.data == b"broadcasting now"
sock1.broadcast_hook_fnc = hook_fnc
sock2.broadcast(b"broadcasting now")
# close
sock1.close()
sock2.close()
def test_big_size(self):
logger.info("start test_big_size()")
sock1 = SecureReliableSocket()
sock2 = SecureReliableSocket()
sock1.settimeout(5.0)
sock2.settimeout(5.0)
# connect
fut1 = self.executor.submit(sock1.connect, ("127.0.0.1", self.port))
fut2 = self.executor.submit(sock2.connect, ("127.0.0.1", self.port))
fut1.result(10.0)
fut2.result(10.0)
# 1M bytes data
data = os.urandom(1000000)
self.executor.submit(sock2.sendall, data)\
.add_done_callback(lambda fut: fut.result())
received = b""
while True:
try:
received += sock1.recv(4096)
if 1000000 <= len(received):
break
except s.timeout:
break
assert received == data, (len(received), len(data))
# close
sock1.close()
sock2.close()
def test_ipv6(self):
logger.info("start test_ipv6()")
# https://docs.travis-ci.com/user/reference/overview/#virtualisation-environment-vs-operating-system
if IS_TRAVIS:
return unittest.skip("ipv6 isn't supported")
sock1 = SecureReliableSocket(s.AF_INET6)
sock2 = SecureReliableSocket(s.AF_INET6)
sock1.settimeout(5.0)
sock2.settimeout(5.0)
# connect
fut1 = self.executor.submit(sock1.connect, ("::1", self.port))
sleep(1.0)
fut2 = self.executor.submit(sock2.connect, ("::1", self.port))
fut1.result(10.0)
fut2.result(10.0)
assert sock1.established and sock2.established, (sock1, sock2)
# close
sock1.close()
sock2.close()
def test_asyncio(self):
logger.info("start test_asyncio()")
if IS_TRAVIS and IS_WINDOWS:
return unittest.skip("travis's windows fail stream drain()")
loop = asyncio.get_event_loop()
sock1 = SecureReliableSocket()
sock2 = SecureReliableSocket()
sock1.setblocking(False)
sock2.setblocking(False)
async def coro():
fut1 = loop.run_in_executor(self.executor, sock1.connect, ("127.0.0.1", self.port))
fut2 = loop.run_in_executor(self.executor, sock2.connect, ("127.0.0.1", self.port))
await fut1
await fut2
reader1, writer1 = await asyncio.open_connection(sock=sock1)
reader2, writer2 = await asyncio.open_connection(sock=sock2)
writer1.write(b"nice world")
await writer1.drain()
received = await reader2.read(1024)
assert received == b"nice world"
writer1.close()
writer2.close()
loop.run_until_complete(coro())
sleep(1.0)
assert sock1.is_closed and sock2.is_closed, (sock1, sock2)
if __name__ == "__main__":
unittest.main()