Fix: TypeError on calling parallel_bulk. (opensearch-project#601)
* Fix: TypeError on calling parallel_bulk.

Signed-off-by: dblock <[email protected]>

* Added a sample that uses a bulk function generator.

Signed-off-by: dblock <[email protected]>

---------

Signed-off-by: dblock <[email protected]>
Signed-off-by: roma2023 <[email protected]>
dblock authored and roma2023 committed Dec 28, 2023
1 parent ad036ac commit a1ed1ab
Showing 5 changed files with 172 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Deprecated
### Removed
### Fixed
- Fix `TypeError` on `parallel_bulk` ([#601](https://github.com/opensearch-project/opensearch-py/pull/601))
### Security

## [2.4.1]
58 changes: 58 additions & 0 deletions guides/bulk.md
@@ -1,6 +1,8 @@
- [Bulk Indexing](#bulk-indexing)
- [Line-Delimited JSON](#line-delimited-json)
- [Bulk Helper](#bulk-helper)
- [Parallel Bulk](#parallel-bulk)
- [Data Generator](#data-generator)

# Bulk Indexing

@@ -46,6 +48,8 @@ data = [
response = client.bulk(data)
if response["errors"]:
print(f"There were errors!")
for item in response["items"]:
print(f"{item['index']['status']}: {item['index']['error']['type']}")
else:
print(f"Bulk-inserted {len(rc['items'])} items.")
```
@@ -69,3 +73,57 @@ response = helpers.bulk(client, docs, max_retries=3)
print(response)
```

## Parallel Bulk

The bulk helpers also provide `parallel_bulk`, which accepts options such as `raise_on_error` and `raise_on_exception` to report failures through the result iterator instead of raising, `chunk_size` to control batching, and more.

```python
succeeded = []
failed = []
for success, item in helpers.parallel_bulk(
    client,
    actions=data,
    chunk_size=10,
    raise_on_error=False,
    raise_on_exception=False,
    max_chunk_bytes=20 * 1024 * 1024,
    request_timeout=60,
):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

if len(failed) > 0:
    print(f"There were {len(failed)} errors:")
    for item in failed:
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items.")
```
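
`parallel_bulk` returns a lazy generator: no requests are sent until you iterate over the results. Throughput can also be tuned with `thread_count` (worker threads) and `queue_size` (chunks buffered ahead of the workers); a minimal sketch, reusing the `client` and `data` from the example above:

```python
# Consume the generator; chunks are only dispatched during iteration.
for success, item in helpers.parallel_bulk(
    client,
    actions=data,
    thread_count=4,  # threads sending chunks concurrently
    queue_size=4,    # chunks buffered ahead of the workers
):
    if not success:
        print(item)
```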

## Data Generator

Instead of building the full list of actions in memory, pass a generator function to the bulk helpers.

```python
def _generate_data():
    for i in range(100):
        yield {"_index": index_name, "_id": i, "value": i}


succeeded = []
failed = []
for success, item in helpers.parallel_bulk(client, actions=_generate_data()):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

if len(failed) > 0:
    print(f"There were {len(failed)} errors:")
    for item in failed:
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items.")
```
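
The same generator also works with `helpers.streaming_bulk`, which sends chunks serially from a single thread (the sample below uses it too); a minimal sketch, assuming the `client` and `_generate_data` defined above:

```python
succeeded = []
failed = []
# streaming_bulk yields one (success, item) tuple per document;
# raise_on_error=False reports failures here instead of raising.
for success, item in helpers.streaming_bulk(
    client, actions=_generate_data(), max_retries=3, raise_on_error=False
):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

print(f"Bulk-inserted {len(succeeded)} items, {len(failed)} failures.")
```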
11 changes: 10 additions & 1 deletion opensearchpy/helpers/actions.py
@@ -442,6 +442,8 @@ def parallel_bulk(
    max_chunk_bytes: int = 100 * 1024 * 1024,
    queue_size: int = 4,
    expand_action_callback: Any = expand_action,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status: Any = (),
    *args: Any,
    **kwargs: Any
@@ -485,7 +487,14 @@ def _setup_queues(self) -> None:
    for result in pool.imap(
        lambda bulk_chunk: list(
            _process_bulk_chunk(
                client, bulk_chunk[1], bulk_chunk[0], ignore_status, *args, **kwargs
                client,
                bulk_chunk[1],
                bulk_chunk[0],
                raise_on_exception,
                raise_on_error,
                ignore_status,
                *args,
                **kwargs
            )
        ),
        _chunk_actions(
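
For context: the old call above passed `ignore_status` into the positional slot of `_process_bulk_chunk` where `raise_on_exception` belongs, and `parallel_bulk` did not declare the two `raise_*` flags itself; wiring them through explicitly is what resolves the `TypeError` in the commit title. A minimal sketch of the call that now works, with a placeholder host and index name:

```python
from opensearchpy import OpenSearch, helpers

client = OpenSearch(hosts=["localhost:9200"])  # placeholder endpoint

# With the fix, both flags are consumed by parallel_bulk itself and
# forwarded positionally, so this call no longer raises a TypeError.
actions = ({"_index": "my-index", "_id": i, "value": i} for i in range(10))
for success, item in helpers.parallel_bulk(
    client,
    actions=actions,
    raise_on_error=False,
    raise_on_exception=False,
):
    print(success, item)
```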
52 changes: 51 additions & 1 deletion samples/bulk/bulk-helpers.py
@@ -12,6 +12,7 @@


import os
from typing import Any

from opensearchpy import OpenSearch, helpers

@@ -49,8 +50,57 @@
for i in range(100):
    data.append({"_index": index_name, "_id": i, "value": i})

# serialized bulk raising an exception on error
rc = helpers.bulk(client, data)
print(f"Bulk-inserted {rc[0]} items.")
print(f"Bulk-inserted {rc[0]} items (bulk).")

# parallel bulk with explicit error checking
succeeded = []
failed = []
for success, item in helpers.parallel_bulk(
    client,
    actions=data,
    chunk_size=10,
    raise_on_error=False,
    raise_on_exception=False,
    max_chunk_bytes=20 * 1024 * 1024,
    request_timeout=60,
):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

if len(failed) > 0:
    print(f"There were {len(failed)} errors:")
    for item in failed:
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items (parallel_bulk).")


# streaming bulk with a data generator
def _generate_data() -> Any:
    for i in range(100):
        yield {"_index": index_name, "_id": i, "value": i}


succeeded = []
failed = []
for success, item in helpers.streaming_bulk(client, actions=_generate_data()):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

if len(failed) > 0:
    print(f"There were {len(failed)} errors:")
    for item in failed:
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items (streaming_bulk).")

# delete index
client.indices.delete(index=index_name)
53 changes: 52 additions & 1 deletion test_opensearchpy/test_helpers/test_actions.py
@@ -67,11 +67,62 @@ def test_all_chunks_sent(self, _process_bulk_chunk: Any) -> None:

        self.assertEqual(50, mock_process_bulk_chunk.call_count)  # type: ignore

    @mock.patch("opensearchpy.OpenSearch.bulk")
    def test_with_all_options(self, _bulk: Any) -> None:
        actions = ({"x": i} for i in range(100))
        list(
            helpers.parallel_bulk(
                OpenSearch(),
                actions=actions,
                chunk_size=2,
                raise_on_error=False,
                raise_on_exception=False,
                max_chunk_bytes=20 * 1024 * 1024,
                request_timeout=160,
                ignore_status=(123),
            )
        )

        self.assertEqual(50, _bulk.call_count)
        _bulk.assert_called_with(
            '{"index":{}}\n{"x":98}\n{"index":{}}\n{"x":99}\n', request_timeout=160
        )

@mock.patch("opensearchpy.helpers.actions._process_bulk_chunk")
def test_process_bulk_chunk_with_all_options(
self, _process_bulk_chunk: Any
) -> None:
actions = ({"x": i} for i in range(100))
client = OpenSearch()
list(
helpers.parallel_bulk(
client,
actions=actions,
chunk_size=2,
raise_on_error=True,
raise_on_exception=True,
max_chunk_bytes=20 * 1024 * 1024,
request_timeout=160,
ignore_status=(123),
)
)

self.assertEqual(50, _process_bulk_chunk.call_count)
_process_bulk_chunk.assert_called_with(
client,
['{"index":{}}', '{"x":98}', '{"index":{}}', '{"x":99}'],
[({"index": {}}, {"x": 98}), ({"index": {}}, {"x": 99})],
True,
True,
123,
request_timeout=160,
)

    @pytest.mark.skip  # type: ignore
    @mock.patch(
        "opensearchpy.helpers.actions._process_bulk_chunk",
        # make sure we spend some time in the thread
        side_effect=lambda *a: [
        side_effect=lambda *args, **kwargs: [
            (True, time.sleep(0.001) or threading.current_thread().ident)  # type: ignore
        ],
    )
