Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: TypeError on calling parallel_bulk. #601

Merged
merged 2 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Deprecated
### Removed
### Fixed
- Fix `TypeError` on `parallel_bulk` ([#601](https://github.com/opensearch-project/opensearch-py/pull/601))
### Security

## [2.4.1]
Expand Down
58 changes: 58 additions & 0 deletions guides/bulk.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
- [Bulk Indexing](#bulk-indexing)
- [Line-Delimited JSON](#line-delimited-json)
- [Bulk Helper](#bulk-helper)
- [Parallel Bulk](#parallel-bulk)
- [Data Generator](#data-generator)

# Bulk Indexing

Expand Down Expand Up @@ -46,6 +48,8 @@ data = [
response = client.bulk(data)
if response["errors"]:
print(f"There were errors!")
for item in response["items"]:
print(f"{item['index']['status']}: {item['index']['error']['type']}")
else:
print(f"Bulk-inserted {len(rc['items'])} items.")
```
Expand All @@ -69,3 +73,57 @@ response = helpers.bulk(client, docs, max_retries=3)
print(response)
```

## Parallel Bulk

Bulk helpers support `parallel_bulk` which has options to turn off exceptions, chunk size, etc.

```python
succeeded = []
failed = []
for success, item in helpers.parallel_bulk(client,
actions=data,
chunk_size=10,
raise_on_error=False,
raise_on_exception=False,
max_chunk_bytes=20 * 1024 * 1024,
request_timeout=60):

if success:
succeeded.append(item)
else:
failed.append(item)

if len(failed) > 0:
print(f"There were {len(failed)} errors:")
for item in failed:
print(f"{item['index']['error']}: {item['index']['exception']}")

if len(succeeded) > 0:
print(f"Bulk-inserted {len(succeeded)} items.")
```

## Data Generator

Use a data generator function with bulk helpers instead of building arrays.

```python
def _generate_data():
for i in range(100):
yield {"_index": index_name, "_id": i, "value": i}

succeeded = []
failed = []
for success, item in helpers.parallel_bulk(client, actions=_generate_data()):
if success:
succeeded.append(item)
else:
failed.append(item)

if len(failed) > 0:
print(f"There were {len(failed)} errors:")
for item in failed:
print(item["index"]["error"])

if len(succeeded) > 0:
print(f"Bulk-inserted {len(succeeded)} items (streaming_bulk).")
```
11 changes: 10 additions & 1 deletion opensearchpy/helpers/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ def parallel_bulk(
max_chunk_bytes: int = 100 * 1024 * 1024,
queue_size: int = 4,
expand_action_callback: Any = expand_action,
raise_on_exception: bool = True,
raise_on_error: bool = True,
ignore_status: Any = (),
*args: Any,
**kwargs: Any
Expand Down Expand Up @@ -485,7 +487,14 @@ def _setup_queues(self) -> None:
for result in pool.imap(
lambda bulk_chunk: list(
_process_bulk_chunk(
client, bulk_chunk[1], bulk_chunk[0], ignore_status, *args, **kwargs
client,
bulk_chunk[1],
bulk_chunk[0],
raise_on_exception,
raise_on_error,
ignore_status,
*args,
**kwargs
)
),
_chunk_actions(
Expand Down
52 changes: 51 additions & 1 deletion samples/bulk/bulk-helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@


import os
from typing import Any

from opensearchpy import OpenSearch, helpers

Expand Down Expand Up @@ -49,8 +50,57 @@
for i in range(100):
data.append({"_index": index_name, "_id": i, "value": i})

# serialized bulk raising an exception on error
rc = helpers.bulk(client, data)
print(f"Bulk-inserted {rc[0]} items.")
print(f"Bulk-inserted {rc[0]} items (bulk).")

# parallel bulk with explicit error checking
succeeded = []
failed = []
for success, item in helpers.parallel_bulk(
client,
actions=data,
chunk_size=10,
raise_on_error=False,
raise_on_exception=False,
max_chunk_bytes=20 * 1024 * 1024,
request_timeout=60,
):
if success:
succeeded.append(item)
else:
failed.append(item)

if len(failed) > 0:
print(f"There were {len(failed)} errors:")
for item in failed:
print(item["index"]["error"])

if len(succeeded) > 0:
print(f"Bulk-inserted {len(succeeded)} items (parallel_bulk).")


# streaming bulk with a data generator
def _generate_data() -> Any:
for i in range(100):
yield {"_index": index_name, "_id": i, "value": i}


succeeded = []
failed = []
for success, item in helpers.streaming_bulk(client, actions=_generate_data()):
if success:
succeeded.append(item)
else:
failed.append(item)

if len(failed) > 0:
print(f"There were {len(failed)} errors:")
for item in failed:
print(item["index"]["error"])

if len(succeeded) > 0:
print(f"Bulk-inserted {len(succeeded)} items (streaming_bulk).")

# delete index
client.indices.delete(index=index_name)
53 changes: 52 additions & 1 deletion test_opensearchpy/test_helpers/test_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,62 @@ def test_all_chunks_sent(self, _process_bulk_chunk: Any) -> None:

self.assertEqual(50, mock_process_bulk_chunk.call_count) # type: ignore

@mock.patch("opensearchpy.OpenSearch.bulk")
def test_with_all_options(self, _bulk: Any) -> None:
actions = ({"x": i} for i in range(100))
list(
helpers.parallel_bulk(
OpenSearch(),
actions=actions,
chunk_size=2,
raise_on_error=False,
raise_on_exception=False,
max_chunk_bytes=20 * 1024 * 1024,
request_timeout=160,
ignore_status=(123),
)
)

self.assertEqual(50, _bulk.call_count)
_bulk.assert_called_with(
'{"index":{}}\n{"x":98}\n{"index":{}}\n{"x":99}\n', request_timeout=160
)

@mock.patch("opensearchpy.helpers.actions._process_bulk_chunk")
def test_process_bulk_chunk_with_all_options(
self, _process_bulk_chunk: Any
) -> None:
actions = ({"x": i} for i in range(100))
client = OpenSearch()
list(
helpers.parallel_bulk(
client,
actions=actions,
chunk_size=2,
raise_on_error=True,
raise_on_exception=True,
max_chunk_bytes=20 * 1024 * 1024,
request_timeout=160,
ignore_status=(123),
)
)

self.assertEqual(50, _process_bulk_chunk.call_count)
_process_bulk_chunk.assert_called_with(
client,
['{"index":{}}', '{"x":98}', '{"index":{}}', '{"x":99}'],
[({"index": {}}, {"x": 98}), ({"index": {}}, {"x": 99})],
True,
True,
123,
request_timeout=160,
)

@pytest.mark.skip # type: ignore
@mock.patch(
"opensearchpy.helpers.actions._process_bulk_chunk",
# make sure we spend some time in the thread
side_effect=lambda *a: [
side_effect=lambda *args, **kwargs: [
(True, time.sleep(0.001) or threading.current_thread().ident) # type: ignore
],
)
Expand Down
Loading