forked from opensearch-project/opensearch-py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbench_async.py
124 lines (92 loc) · 3.29 KB
/
bench_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
import asyncio
import os
import uuid
from typing import Any
from opensearchpy import AsyncHttpConnection, AsyncOpenSearch
async def index_records(client: Any, index_name: str, item_count: int) -> None:
    """
    concurrently index item_count sample records into the index (index_name)

    One ``client.index`` request is created per record, each with a freshly
    generated UUID as its document id, and all requests are awaited together.
    """
    pending = []
    for _ in range(item_count):
        request = client.index(
            index=index_name,
            body={
                "title": "Moneyball",
                "director": "Bennett Miller",
                "year": "2011",
            },
            id=uuid.uuid4(),
        )
        pending.append(request)
    await asyncio.gather(*pending)
async def test_async(client_count: int = 1, item_count: int = 1) -> None:
    """
    asynchronously index item_count records from each of client_count
    clients. This function can be used to test balancing the number of
    clients against the number of items indexed per client.

    The clients connect to localhost:9200 over SSL (certificate verification
    disabled) as user "admin", with the password taken from the
    OPENSEARCH_PASSWORD environment variable (default "admin"). The index
    "test-index-async" is recreated, filled by all clients concurrently,
    refreshed, counted (the count response is printed) and finally deleted.

    :param client_count: number of AsyncOpenSearch clients to create; also
        used as each client's pool_maxsize
    :param item_count: number of records each client indexes
    :raises IndexError: if client_count is less than 1, because the first
        client is used for all index management calls
    """
    host = "localhost"
    port = 9200
    auth = ("admin", os.getenv("OPENSEARCH_PASSWORD", "admin"))
    index_name = "test-index-async"

    clients = [
        AsyncOpenSearch(
            hosts=[{"host": host, "port": port}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            connection_class=AsyncHttpConnection,
            pool_maxsize=client_count,
        )
        for _ in range(client_count)
    ]

    try:
        # the first client doubles as the one doing index housekeeping
        admin = clients[0]

        # start from an empty index so the printed count reflects this run only
        if await admin.indices.exists(index=index_name):
            await admin.indices.delete(index=index_name)
        await admin.indices.create(index=index_name)

        await asyncio.gather(
            *[index_records(client, index_name, item_count) for client in clients]
        )

        # make the freshly indexed documents visible to count()
        await admin.indices.refresh(index=index_name)
        print(await admin.count(index=index_name))

        await admin.indices.delete(index=index_name)
    finally:
        # close every client even when a request above failed, so no
        # connections are left open
        await asyncio.gather(*[client.close() for client in clients])
def test(item_count: int = 1, client_count: int = 1) -> None:
    """
    sets up and executes the asynchronous tests on a fresh event loop

    NOTE: both arguments are forwarded positionally to test_async, whose
    parameter order is (client_count, item_count). The first argument
    therefore ends up as the number of clients and the second as the number
    of records per client, despite the parameter names used here. Names and
    forwarding are kept unchanged so existing callers behave as before.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(test_async(item_count, client_count))
    finally:
        # always release the loop, even when the benchmark raises
        loop.close()
# Base record count: every test_N function below passes a multiple of this
# value as the second argument of test().
ITEM_COUNT = 100
def test_1() -> None:
    """run a test with 1 client indexing 32*ITEM_COUNT records"""
    # test() forwards positionally to test_async(client_count, item_count)
    client_total = 1
    records_per_client = 32 * ITEM_COUNT
    test(client_total, records_per_client)
def test_2() -> None:
    """run a test with 2 clients, each indexing 16*ITEM_COUNT records"""
    # test() forwards positionally to test_async(client_count, item_count)
    client_total = 2
    records_per_client = 16 * ITEM_COUNT
    test(client_total, records_per_client)
def test_4() -> None:
    """run a test with 4 clients, each indexing 8*ITEM_COUNT records"""
    # test() forwards positionally to test_async(client_count, item_count)
    client_total = 4
    records_per_client = 8 * ITEM_COUNT
    test(client_total, records_per_client)
def test_8() -> None:
    """run a test with 8 clients, each indexing 4*ITEM_COUNT records"""
    # test() forwards positionally to test_async(client_count, item_count)
    client_total = 8
    records_per_client = 4 * ITEM_COUNT
    test(client_total, records_per_client)
def test_16() -> None:
    """run a test with 16 clients, each indexing 2*ITEM_COUNT records"""
    # test() forwards positionally to test_async(client_count, item_count)
    client_total = 16
    records_per_client = 2 * ITEM_COUNT
    test(client_total, records_per_client)
def test_32() -> None:
    """run a test with 32 clients, each indexing ITEM_COUNT records"""
    # test() forwards positionally to test_async(client_count, item_count)
    client_total = 32
    records_per_client = ITEM_COUNT
    test(client_total, records_per_client)
# Benchmark pairs to compare; each entry is (function, function, label).
# NOTE(review): presumably read by an external benchmark runner — confirm
# which tool consumes this and the meaning it gives to the tuple order.
__benchmarks__ = [(test_1, test_8, "1 client vs. more clients (async)")]