Added a sample that uses a bulk data generator.
Signed-off-by: dblock <[email protected]>
dblock committed Nov 17, 2023
1 parent b3da091 commit 331c684
Showing 2 changed files with 56 additions and 2 deletions.
30 changes: 30 additions & 0 deletions guides/bulk.md
@@ -1,6 +1,8 @@
- [Bulk Indexing](#bulk-indexing)
- [Line-Delimited JSON](#line-delimited-json)
- [Bulk Helper](#bulk-helper)
- [Parallel Bulk](#parallel-bulk)
- [Data Generator](#data-generator)

# Bulk Indexing

@@ -71,6 +73,8 @@ response = helpers.bulk(client, docs, max_retries=3)
print(response)
```
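
For reference, `helpers.bulk` returns a tuple of the number of successfully executed actions and a list of errors (or an error count when `stats_only=True`). A minimal sketch of unpacking it, passing `raise_on_error=False` so failures are collected rather than raised:

```python
# helpers.bulk returns (success_count, errors). With the default
# raise_on_error=True, a failed item raises BulkIndexError instead.
success_count, errors = helpers.bulk(client, docs, max_retries=3, raise_on_error=False)
print(f"Bulk-inserted {success_count} items, {len(errors)} errors.")
```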

## Parallel Bulk

Bulk helpers also support `parallel_bulk`, which distributes bulk requests across threads and accepts options such as `chunk_size` and flags to disable raising exceptions (see the sketch after the example below).

```python
@@ -97,3 +101,29 @@ if len(failed) > 0:
if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items.")
```
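
As a sketch, here are some of those options spelled out. The parameter names (`thread_count`, `chunk_size`, `raise_on_exception`, `raise_on_error`) come from the helper's signature; the values below are illustrative, not defaults:

```python
for success, item in helpers.parallel_bulk(
    client,
    actions=docs,
    thread_count=4,            # worker threads issuing bulk requests concurrently
    chunk_size=500,            # number of actions sent per bulk request
    raise_on_exception=False,  # report transport errors as failed items instead of raising
    raise_on_error=False,      # report per-item failures instead of raising
):
    if not success:
        print(item)
```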

## Data Generator

Use a data generator function with bulk helpers instead of building the entire list of actions in memory.

```python
def _generate_data():
    for i in range(100):
        yield {"_index": index_name, "_id": i, "value": i}

succeeded = []
failed = []
for success, item in helpers.parallel_bulk(client, actions=_generate_data()):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

if len(failed) > 0:
    print(f"There were {len(failed)} errors:")
    for item in failed:
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items.")
```
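
Because bulk helpers consume the generator lazily, the same pattern can stream documents from disk without materializing them all at once. A minimal sketch, assuming a hypothetical newline-delimited JSON file `data.ndjson` and using `streaming_bulk`, which accepts the same kind of generator of actions:

```python
import json

def _generate_from_file():
    # Hypothetical source file; lines are parsed one at a time, so memory
    # use stays flat regardless of file size.
    with open("data.ndjson") as f:
        for i, line in enumerate(f):
            yield {"_index": index_name, "_id": i, "_source": json.loads(line)}

for success, item in helpers.streaming_bulk(client, actions=_generate_from_file()):
    if not success:
        print(item["index"]["error"])
```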
28 changes: 26 additions & 2 deletions samples/bulk/bulk-helpers.py
@@ -12,6 +12,7 @@


import os
from typing import Any

from opensearchpy import OpenSearch, helpers

@@ -51,7 +52,7 @@

# serialized bulk raising an exception on error
rc = helpers.bulk(client, data)
print(f"Bulk-inserted {rc[0]} items.")
print(f"Bulk-inserted {rc[0]} items (bulk).")

# parallel bulk with explicit error checking
succeeded = []
@@ -76,7 +77,30 @@
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items.")
    print(f"Bulk-inserted {len(succeeded)} items (parallel_bulk).")


# streaming bulk with a data generator
def _generate_data() -> Any:
    for i in range(100):
        yield {"_index": index_name, "_id": i, "value": i}


succeeded = []
failed = []
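# streaming_bulk lazily consumes the generator and yields a (success, item) tuple per action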
for success, item in helpers.streaming_bulk(client, actions=_generate_data()):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

if len(failed) > 0:
    print(f"There were {len(failed)} errors:")
    for item in failed:
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items (streaming_bulk).")

# delete index
client.indices.delete(index=index_name)
