Merge branch 'main' into bump/aiohttp_to_3.9.0

Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>
dblock authored Jan 19, 2024
2 parents 526510d + 2ab3a40 commit 9d06377
Showing 32 changed files with 168 additions and 30 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -8,15 +8,17 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added a log collection guide ([#579](https://github.com/opensearch-project/opensearch-py/pull/579))
- Added GHA release ([#614](https://github.com/opensearch-project/opensearch-py/pull/614))
### Changed
- Updated the `get_policy` API in the index_management plugin to make the policy_id argument optional ([#633](https://github.com/opensearch-project/opensearch-py/pull/633))
### Deprecated
### Removed
- Removed unnecessary `# -*- coding: utf-8 -*-` headers from .py files ([#615](https://github.com/opensearch-project/opensearch-py/pull/615), [#617](https://github.com/opensearch-project/opensearch-py/pull/617))
### Fixed
- Fix KeyError when scroll returns no hits ([#616](https://github.com/opensearch-project/opensearch-py/pull/616))
- Fix reuse of `OpenSearch` using `Urllib3HttpConnection` and `AsyncOpenSearch` after calling `close` ([#639](https://github.com/opensearch-project/opensearch-py/pull/639))
### Security
### Dependencies
- Bumps `aiohttp` from >=3,<4 to >=3.9.0,<4 ([#634](https://github.com/opensearch-project/opensearch-py/pull/634))
- Bumps `pytest-asyncio` from <=0.21.1 to <=0.23.2
- Bumps `pytest-asyncio` from <=0.21.1 to <=0.23.3

## [2.4.2]
### Added
12 changes: 12 additions & 0 deletions benchmarks/bench_async.py
@@ -17,6 +17,7 @@


async def index_records(client: Any, index_name: str, item_count: int) -> None:
"""asynchronously bulk index item_count records into the index (index_name)"""
await asyncio.gather(
*[
client.index(
@@ -34,6 +35,10 @@ async def index_records(client: Any, index_name: str, item_count: int) -> None:


async def test_async(client_count: int = 1, item_count: int = 1) -> None:
"""
asynchronously index item_count records using client_count clients. This function can be used to
test balancing the number of items indexed against the number of clients.
"""
host = "localhost"
port = 9200
auth = ("admin", "admin")
@@ -74,6 +79,7 @@ async def test_async(client_count: int = 1, item_count: int = 1) -> None:


def test(item_count: int = 1, client_count: int = 1) -> None:
"""sets up and executes the asynchronous tests"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(test_async(item_count, client_count))
@@ -84,26 +90,32 @@ def test(item_count: int = 1, client_count: int = 1) -> None:


def test_1() -> None:
"""run a test for one item and 32*ITEM_COUNT clients"""
test(1, 32 * ITEM_COUNT)


def test_2() -> None:
"""run a test for two items and 16*ITEM_COUNT clients"""
test(2, 16 * ITEM_COUNT)


def test_4() -> None:
"""run a test for two items and 8*ITEM_COUNT clients"""
test(4, 8 * ITEM_COUNT)


def test_8() -> None:
"""run a test for four items and 4*ITEM_COUNT clients"""
test(8, 4 * ITEM_COUNT)


def test_16() -> None:
"""run a test for 16 items and 2*ITEM_COUNT clients"""
test(16, 2 * ITEM_COUNT)


def test_32() -> None:
"""run a test for 32 items and ITEM_COUNT clients"""
test(32, ITEM_COUNT)


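For context on the pattern these async benchmark helpers exercise, here is a minimal, self-contained sketch of gather-based concurrent indexing. It assumes a local cluster on localhost:9200 with the default admin credentials and the async extra (aiohttp) installed; the index name and document shape are illustrative.

```python
import asyncio
from typing import Any

from opensearchpy import AsyncOpenSearch


async def index_records(client: Any, index_name: str, item_count: int) -> None:
    # Fire all index requests concurrently and wait for every one to complete.
    await asyncio.gather(
        *[
            client.index(index=index_name, id=str(i), body={"value": i})
            for i in range(item_count)
        ]
    )


async def main() -> None:
    client = AsyncOpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],
        http_auth=("admin", "admin"),
        use_ssl=True,
        verify_certs=False,
    )
    try:
        await index_records(client, "test-index", 100)
    finally:
        await client.close()


asyncio.run(main())
```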
7 changes: 7 additions & 0 deletions benchmarks/bench_info_sync.py
@@ -21,6 +21,7 @@


def get_info(client: Any, request_count: int) -> float:
"""get info from client"""
tt: float = 0
for n in range(request_count):
start = time.time() * 1000
@@ -31,6 +32,7 @@ def get_info(client: Any, request_count: int) -> float:


def test(thread_count: int = 1, request_count: int = 1, client_count: int = 1) -> None:
"""test to index with thread_count threads, item_count records and run client_count clients"""
host = "localhost"
port = 9200
auth = ("admin", "admin")
@@ -79,22 +81,27 @@ def test(thread_count: int = 1, request_count: int = 1, client_count: int = 1) -


def test_1() -> None:
"""testing 1 threads"""
test(1, 32 * REQUEST_COUNT, 1)


def test_2() -> None:
"""testing 2 threads"""
test(2, 16 * REQUEST_COUNT, 2)


def test_4() -> None:
"""testing 4 threads"""
test(4, 8 * REQUEST_COUNT, 3)


def test_8() -> None:
"""testing 8 threads"""
test(8, 4 * REQUEST_COUNT, 8)


def test_32() -> None:
"""testing 32 threads"""
test(32, REQUEST_COUNT, 32)


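As a rough illustration of the timing loop sketched in get_info above, the following measures client.info() latency in milliseconds; the host, port, and credentials are placeholders for a local test cluster.

```python
import time
from typing import Any

from opensearchpy import OpenSearch


def get_info(client: Any, request_count: int) -> float:
    """call client.info() request_count times and return the total latency in milliseconds"""
    total_ms: float = 0
    for _ in range(request_count):
        start = time.time() * 1000
        client.info()
        total_ms += time.time() * 1000 - start
    return total_ms


client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("admin", "admin"),
    use_ssl=True,
    verify_certs=False,
)
print(get_info(client, 10))
```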
7 changes: 7 additions & 0 deletions benchmarks/bench_sync.py
@@ -22,6 +22,7 @@


def index_records(client: Any, index_name: str, item_count: int) -> Any:
"""bulk index item_count records into index_name"""
tt = 0
for n in range(10):
data: Any = []
@@ -48,6 +49,7 @@ def index_records(client: Any, index_name: str, item_count: int) -> Any:


def test(thread_count: int = 1, item_count: int = 1, client_count: int = 1) -> None:
"""test to index with thread_count threads, item_count records and run client_count clients"""
host = "localhost"
port = 9200
auth = ("admin", "admin")
@@ -118,22 +120,27 @@ def test(thread_count: int = 1, item_count: int = 1, client_count: int = 1) -> N


def test_1() -> None:
"""testing 1 threads"""
test(1, 32 * ITEM_COUNT, 1)


def test_2() -> None:
"""testing 2 threads"""
test(2, 16 * ITEM_COUNT, 2)


def test_4() -> None:
"""testing 4 threads"""
test(4, 8 * ITEM_COUNT, 3)


def test_8() -> None:
"""testing 8 threads"""
test(8, 4 * ITEM_COUNT, 8)


def test_32() -> None:
"""testing 32 threads"""
test(32, ITEM_COUNT, 32)


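The synchronous benchmark builds a raw bulk body by hand; a minimal sketch of that action/document pairing is shown below, assuming a local cluster and an illustrative index name.

```python
from opensearchpy import OpenSearch

client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("admin", "admin"),
    use_ssl=True,
    verify_certs=False,
)

# The bulk body alternates an action line with the document it applies to.
data = []
for i in range(100):
    data.append({"index": {"_index": "test-index", "_id": str(i)}})
    data.append({"value": i})

response = client.bulk(body=data)
print(response["errors"])
```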
2 changes: 1 addition & 1 deletion dev-requirements.txt
@@ -21,5 +21,5 @@ twine

# Requirements for testing [async] extra
aiohttp
pytest-asyncio<=0.23.2
pytest-asyncio<=0.23.3
unasync
1 change: 1 addition & 0 deletions opensearchpy/_async/http_aiohttp.py
@@ -361,6 +361,7 @@ async def close(self) -> Any:
"""
if self.session:
await self.session.close()
self.session = None

async def _create_aiohttp_session(self) -> Any:
"""Creates an aiohttp.ClientSession(). This is delayed until
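Clearing self.session on close is what lets an AsyncOpenSearch client be used again after close(), since the aiohttp session is created lazily on the next request. A hedged sketch of that round trip, assuming a local cluster with placeholder credentials:

```python
import asyncio

from opensearchpy import AsyncOpenSearch


async def main() -> None:
    client = AsyncOpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],
        http_auth=("admin", "admin"),
        use_ssl=True,
        verify_certs=False,
    )
    print(await client.info())
    await client.close()        # the aiohttp session is closed and discarded
    print(await client.info())  # a fresh session is created lazily for the next request
    await client.close()


asyncio.run(main())
```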
6 changes: 2 additions & 4 deletions opensearchpy/_async/plugins/index_management.py
@@ -56,15 +56,13 @@ async def add_policy(

@query_params()
async def get_policy(
self, policy: Any, params: Any = None, headers: Any = None
self, policy: Any = None, params: Any = None, headers: Any = None
) -> Any:
"""
Gets the policy by `policy_id`.
Gets the policy by `policy_id`; returns all policies if no policy_id is provided.
:arg policy: The name of the policy
"""
if policy in SKIP_IN_PATH:
raise ValueError("Empty value passed for a required argument 'policy'.")

return await self.transport.perform_request(
"GET",
1 change: 1 addition & 0 deletions opensearchpy/connection/http_async.py
@@ -277,6 +277,7 @@ async def close(self) -> Any:
"""
if self.session:
await self.session.close()
self.session = None

async def _create_aiohttp_session(self) -> Any:
"""Creates an aiohttp.ClientSession(). This is delayed until
14 changes: 12 additions & 2 deletions opensearchpy/connection/http_urllib3.py
@@ -214,9 +214,13 @@ def __init__(
if pool_maxsize and isinstance(pool_maxsize, int):
kw["maxsize"] = pool_maxsize

self.pool = pool_class(
self._urllib3_pool_factory = lambda: pool_class(
self.hostname, port=self.port, timeout=self.timeout, **kw
)
self._create_urllib3_pool()

def _create_urllib3_pool(self) -> None:
self.pool = self._urllib3_pool_factory() # type: ignore

def perform_request(
self,
@@ -228,6 +232,10 @@ def perform_request(
ignore: Collection[int] = (),
headers: Optional[Mapping[str, str]] = None,
) -> Any:
if self.pool is None:
self._create_urllib3_pool()
assert self.pool is not None

url = self.url_prefix + url
if params:
url = "%s?%s" % (url, urlencode(params))
@@ -305,4 +313,6 @@ def close(self) -> None:
"""
Explicitly closes connection
"""
self.pool.close()
if self.pool:
self.pool.close()
self.pool = None
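Similarly, the urllib3 connection now keeps a pool factory and rebuilds the pool when perform_request finds it missing, so a client using Urllib3HttpConnection can be reused after close(). A minimal sketch with placeholder host and credentials:

```python
from opensearchpy import OpenSearch, Urllib3HttpConnection

client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("admin", "admin"),
    use_ssl=True,
    verify_certs=False,
    connection_class=Urllib3HttpConnection,
)

print(client.info())
client.close()        # the urllib3 pool is closed and set to None
print(client.info())  # perform_request recreates the pool from the stored factory
```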
8 changes: 4 additions & 4 deletions opensearchpy/plugins/index_management.py
@@ -55,14 +55,14 @@ def add_policy(
)

@query_params()
def get_policy(self, policy: Any, params: Any = None, headers: Any = None) -> Any:
def get_policy(
self, policy: Any = None, params: Any = None, headers: Any = None
) -> Any:
"""
Gets the policy by `policy_id`.
Gets the policy by `policy_id`; returns all policies if no policy_id is provided.
:arg policy: The name of the policy
"""
if policy in SKIP_IN_PATH:
raise ValueError("Empty value passed for a required argument 'policy'.")

return self.transport.perform_request(
"GET",
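With policy now optional in both the sync and async clients, the two call shapes look roughly as follows. This is a sketch only: the client setup and policy id are illustrative, and it assumes the index_management plugin client is reachable via client.plugins.

```python
from opensearchpy import OpenSearch

client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("admin", "admin"),
    use_ssl=True,
    verify_certs=False,
)

# Fetch a single ISM policy by id ("my_policy" is a hypothetical name).
one = client.plugins.index_management.get_policy("my_policy")

# Omit the id to retrieve all policies.
all_policies = client.plugins.index_management.get_policy()
```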
@@ -18,6 +18,10 @@


def main() -> None:
"""
demonstrates various index operations (e.g. clearing different levels of cache and refreshing the
index)
"""
# Set up
client = OpenSearch(
hosts=["https://localhost:9200"],
6 changes: 6 additions & 0 deletions samples/aws/search_requests.py
@@ -20,6 +20,12 @@


def main() -> None:
"""
connects to a cluster specified in environment variables, creates an index, inserts documents,
searches the index, deletes the document, and deletes the index.
The environment variables are ENDPOINT for the cluster endpoint, AWS_REGION for the region in which the cluster
is hosted, and SERVICE to indicate whether this is an ES 7.10.2-compatible cluster.
"""
# verbose logging
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)

10 changes: 9 additions & 1 deletion samples/aws/search_urllib3.py
@@ -20,10 +20,18 @@


def main() -> None:
"""
1. connects to an OpenSearch cluster on AWS defined by environment variables (i.e. ENDPOINT - cluster endpoint like
my-test-domain.us-east-1.es.amazonaws.com; AWS_REGION like us-east-1, us-west-2; and SERVICE like es, which
differentiates between serverless and the managed service).
2. creates an index called "movies" and adds a single document
3. queries for that document
4. deletes the document
5. deletes the index
"""
# verbose logging
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)

# cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
url = urlparse(environ["ENDPOINT"])
region = environ.get("AWS_REGION", "us-east-1")
service = environ.get("SERVICE", "es")
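A compressed sketch of the connection setup that docstring describes, using the urllib3 AWS V4 signer; the environment variables are read the same way, and the defaults shown here are assumptions.

```python
from os import environ

import boto3
from opensearchpy import OpenSearch, Urllib3AWSV4SignerAuth, Urllib3HttpConnection

# e.g. ENDPOINT=my-test-domain.us-east-1.es.amazonaws.com
host = environ["ENDPOINT"]
region = environ.get("AWS_REGION", "us-east-1")
service = environ.get("SERVICE", "es")  # "es" for the managed service, "aoss" for serverless

credentials = boto3.Session().get_credentials()
auth = Urllib3AWSV4SignerAuth(credentials, region, service)

client = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=Urllib3HttpConnection,
)
print(client.info())
```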
1 change: 1 addition & 0 deletions samples/bulk/bulk_array.py
@@ -17,6 +17,7 @@


def main() -> None:
"""demonstrates how to bulk load data into an index"""
# connect to an instance of OpenSearch

host = os.getenv("HOST", default="localhost")
4 changes: 4 additions & 0 deletions samples/bulk/bulk_helpers.py
@@ -17,6 +17,10 @@


def main() -> None:
"""
demonstrates how to bulk load data using opensearchpy.helpers, including examples of serial, parallel, and streaming
bulk load
"""
# connect to an instance of OpenSearch

host = os.getenv("HOST", default="localhost")
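To complement the helpers sample above, a short sketch of the helpers-based approach; helpers.parallel_bulk and helpers.streaming_bulk accept the same action format. The host, credentials, and index name are placeholders.

```python
from opensearchpy import OpenSearch, helpers

client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("admin", "admin"),
    use_ssl=True,
    verify_certs=False,
)

# Each action carries its target index and id alongside the document fields.
actions = ({"_index": "test-index", "_id": str(i), "value": i} for i in range(100))

# helpers.bulk returns the number of successful actions and a list of errors.
succeeded, errors = helpers.bulk(client, actions)
print(succeeded, errors)
```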
3 changes: 3 additions & 0 deletions samples/bulk/bulk_ld.py
@@ -17,6 +17,9 @@


def main() -> None:
"""
bulk index 100 items and then delete the index
"""
# connect to an instance of OpenSearch

host = os.getenv("HOST", default="localhost")
3 changes: 3 additions & 0 deletions samples/document_lifecycle/document_lifecycle_sample.py
@@ -17,6 +17,9 @@


def main() -> None:
"""
provides samples for different ways to handle documents, including indexing, searching, updating, and deleting
"""
# Connect to OpenSearch
client = OpenSearch(
hosts=["https://localhost:9200"],
4 changes: 4 additions & 0 deletions samples/hello/hello.py
@@ -16,6 +16,10 @@


def main() -> None:
"""
an example showing how to create a synchronous connection to OpenSearch, create an index, index a document
and search to return the document
"""
host = "localhost"
port = 9200
auth = ("admin", "admin") # For testing only. Don't store credentials in code.
4 changes: 4 additions & 0 deletions samples/hello/hello_async.py
@@ -16,6 +16,10 @@


async def main() -> None:
"""
an example showing how to create an asynchronous connection to OpenSearch, create an index, index a document
and search to return the document
"""
# connect to OpenSearch
host = "localhost"
port = 9200