This repository has been archived by the owner on Nov 7, 2024. It is now read-only.
forked from rexzhang/aiomemcached
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_pool.py
144 lines (117 loc) · 4.29 KB
/
test_pool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import random
import asyncio
import pytest
from aiomemcached.pool import MemcachedPool, MemcachedConnection
from aiomemcached.client import acquire
@pytest.mark.asyncio
async def test_pool_creation(mcache_params):
pool = MemcachedPool(minsize=1, maxsize=5, **mcache_params)
assert pool.size() == 0
assert pool._pool_minsize == 1
@pytest.mark.asyncio
async def test_pool_acquire_release(mcache_params):
pool = MemcachedPool(minsize=1, maxsize=5, **mcache_params)
conn = await pool.acquire()
assert isinstance(conn.reader, asyncio.StreamReader)
assert isinstance(conn.writer, asyncio.StreamWriter)
await pool.release(conn)
@pytest.mark.asyncio
async def test_pool_acquire_release2(mcache_params):
pool = MemcachedPool(minsize=1, maxsize=5, **mcache_params)
reader, writer = await asyncio.open_connection(
mcache_params["host"], mcache_params["port"]
)
# put dead connection to the pool
writer.close()
reader.feed_eof()
conn = MemcachedConnection(reader, writer)
pool._pool.append(conn)
conn = await pool.acquire()
assert isinstance(conn.reader, asyncio.StreamReader)
assert isinstance(conn.writer, asyncio.StreamWriter)
@pytest.mark.asyncio
async def test_pool_clear(mcache_params):
pool = MemcachedPool(minsize=1, maxsize=5, **mcache_params)
conn = await pool.acquire()
await pool.release(conn)
assert pool.size() == 1
await pool.clear()
assert pool.size() == 0
@pytest.mark.asyncio
async def test_acquire_dont_create_new_connection_if_have_conn_in_pool(
mcache_params,
):
pool = MemcachedPool(minsize=1, maxsize=5, **mcache_params)
assert pool.size() == 0
# Add a valid connection
_conn = await pool._create_new_connection()
pool._pool.append(_conn)
assert pool.size() == 1
conn = await pool.acquire()
assert conn is _conn
assert pool.size() == 1
@pytest.mark.asyncio
async def test_acquire_limit_maxsize(mcache_params):
pool = MemcachedPool(minsize=1, maxsize=1, **mcache_params)
assert pool.size() == 0
# Create up to max connections
_conn = await pool.acquire()
assert pool.size() == 1
await pool.release(_conn)
async def acquire_wait_release():
conn = await pool.acquire()
assert conn is _conn
await asyncio.sleep(0.01)
assert pool.size() == 1
await pool.release(conn)
await asyncio.gather(*([acquire_wait_release()] * 50))
assert pool.size() == 1
assert pool.size() == 1
@pytest.mark.asyncio
async def test_acquire_task_cancellation(
mcache_params,
):
class Client:
def __init__(self, pool_size=4):
self._pool = MemcachedPool(
minsize=pool_size, maxsize=pool_size, **mcache_params
)
@acquire
async def acquire_wait_release(self, conn):
assert self._pool.size() <= pool_size
await asyncio.sleep(random.uniform(0.01, 0.02))
return "foo"
pool_size = 4
client = Client(pool_size=pool_size)
tasks = [
asyncio.wait_for(client.acquire_wait_release(), random.uniform(1, 2))
for x in range(1000)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
assert client._pool.size() <= pool_size
assert "foo" in results
@pytest.mark.asyncio
async def test_maxsize_greater_than_minsize(mcache_params):
pool = MemcachedPool(minsize=5, maxsize=1, **mcache_params)
conn = await pool.acquire()
assert isinstance(conn.reader, asyncio.StreamReader)
assert isinstance(conn.writer, asyncio.StreamWriter)
await pool.release(conn)
@pytest.mark.asyncio
async def test_0_minsize(mcache_params):
pool = MemcachedPool(minsize=0, maxsize=5, **mcache_params)
conn = await pool.acquire()
assert isinstance(conn.reader, asyncio.StreamReader)
assert isinstance(conn.writer, asyncio.StreamWriter)
await pool.release(conn)
@pytest.mark.asyncio
async def test_bad_connection(mcache_params):
pool = MemcachedPool(minsize=5, maxsize=1, **mcache_params)
pool._host = "INVALID_HOST"
assert pool.size() == 0
with pytest.raises(Exception):
conn = await pool.acquire()
assert isinstance(conn.reader, asyncio.StreamReader)
assert isinstance(conn.writer, asyncio.StreamWriter)
await pool.release(conn)
assert pool.size() == 0