enhance: Enable bulkwriter to support import v2 (#2295) (#2296)
pr: #2295

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored Oct 12, 2024
1 parent 663405d commit 09acaee
Showing 2 changed files with 76 additions and 39 deletions.
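As orientation before the diff, here is a hedged contrast of the two flows this commit switches between; the URL, collection name, and file paths are placeholders, and the import path for the RESTful wrappers follows the module path shown in this diff (pymilvus/bulk_writer/bulk_import.py), not a documented public API.

# Hedged sketch only; assumes a local Milvus is reachable at the placeholder URL.
from pymilvus import connections, utility
from pymilvus.bulk_writer.bulk_import import bulk_import, get_import_progress

url = "http://localhost:19530"
collection_name = "all_types_collection"
batch_files = [["folder/1/data.parquet"]]
connections.connect("default", host="localhost", port="19530")

# Before (import v1): one task per batch, created and polled through utility.
task_id = utility.do_bulk_insert(collection_name=collection_name, files=batch_files[0])
state = utility.get_bulk_insert_state(task_id=task_id)

# After (import v2): one job for all batches, created and polled over the RESTful API.
resp = bulk_import(url=url, collection_name=collection_name, files=batch_files)
job_id = resp.json()['data']['jobId']
progress = get_import_progress(url=url, job_id=job_id).json()['data']['progress']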
67 changes: 43 additions & 24 deletions examples/example_bulkwriter.py
@@ -20,6 +20,9 @@
import tensorflow as tf

import logging

from typing import List

logging.basicConfig(level=logging.INFO)

from pymilvus import (
@@ -273,7 +276,7 @@ def _append_row(writer: LocalBulkWriter, begin: int, end: int):
print("Data is correct")


def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)->list:
def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)-> List[List[str]]:
print(f"\n===================== all field types ({file_type.name}) ====================")
with RemoteBulkWriter(
schema=schema,
@@ -347,31 +350,47 @@ def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)->list:
return remote_writer.batch_files


def call_bulkinsert(schema: CollectionSchema, batch_files: list):
print(f"\n===================== call bulkinsert ====================")
def call_bulkinsert(schema: CollectionSchema, batch_files: List[List[str]]):
if utility.has_collection(ALL_TYPES_COLLECTION_NAME):
utility.drop_collection(ALL_TYPES_COLLECTION_NAME)

collection = Collection(name=ALL_TYPES_COLLECTION_NAME, schema=schema)
print(f"Collection '{collection.name}' created")

task_ids = []
for files in batch_files:
task_id = utility.do_bulk_insert(collection_name=ALL_TYPES_COLLECTION_NAME, files=files)
task_ids.append(task_id)
print(f"Create a bulkinert task, task id: {task_id}")
url = f"http://{HOST}:{PORT}"

while len(task_ids) > 0:
print("Wait 1 second to check bulkinsert tasks state...")
print(f"\n===================== import files to milvus ====================")
resp = bulk_import(
url=url,
collection_name=ALL_TYPES_COLLECTION_NAME,
files=batch_files,
)
print(resp.json())
job_id = resp.json()['data']['jobId']
print(f"Create a bulkinsert job, job id: {job_id}")

while True:
print("Wait 1 second to check bulkinsert job state...")
time.sleep(1)
for id in task_ids:
state = utility.get_bulk_insert_state(task_id=id)
if state.state == BulkInsertState.ImportFailed or state.state == BulkInsertState.ImportFailedAndCleaned:
print(f"The task {state.task_id} failed, reason: {state.failed_reason}")
task_ids.remove(id)
elif state.state == BulkInsertState.ImportCompleted:
print(f"The task {state.task_id} completed")
task_ids.remove(id)

print(f"\n===================== get import job progress ====================")
resp = get_import_progress(
url=url,
job_id=job_id,
)

state = resp.json()['data']['state']
progress = resp.json()['data']['progress']
if state == "Importing":
print(f"The job {job_id} is importing... {progress}%")
continue
if state == "Failed":
reason = resp.json()['data']['reason']
print(f"The job {job_id} failed, reason: {reason}")
break
if state == "Completed" and progress == 100:
print(f"The job {job_id} completed")
break

print(f"Collection row number: {collection.num_entities}")

@@ -427,31 +446,31 @@ def cloud_bulkinsert():
object_url_secret_key = "_your_object_storage_service_secret_key_"
resp = bulk_import(
url=url,
api_key=api_key,
collection_name=collection_name,
partition_name=partition_name,
object_url=object_url,
cluster_id=cluster_id,
api_key=api_key,
access_key=object_url_access_key,
secret_key=object_url_secret_key,
cluster_id=cluster_id,
collection_name=collection_name,
partition_name=partition_name,
)
print(resp.json())

print(f"\n===================== get import job progress ====================")
job_id = resp.json()['data']['jobId']
resp = get_import_progress(
url=url,
api_key=api_key,
job_id=job_id,
cluster_id=cluster_id,
api_key=api_key,
)
print(resp.json())

print(f"\n===================== list import jobs ====================")
resp = list_import_jobs(
url=url,
api_key=api_key,
cluster_id=cluster_id,
api_key=api_key,
page_size=10,
current_page=1,
)
48 changes: 33 additions & 15 deletions pymilvus/bulk_writer/bulk_import.py
@@ -12,6 +12,7 @@

import json
import logging
from typing import List, Optional

import requests

@@ -77,36 +78,43 @@ def _get_request(
## bulkinsert RESTful api wrapper
def bulk_import(
url: str,
api_key: str,
object_url: str,
access_key: str,
secret_key: str,
cluster_id: str,
collection_name: str,
files: Optional[List[List[str]]] = None,
object_url: str = "",
cluster_id: str = "",
api_key: str = "",
access_key: str = "",
secret_key: str = "",
**kwargs,
) -> requests.Response:
"""call bulkinsert restful interface to import files
Args:
url (str): url of the server
object_url (str): data files url
access_key (str): access key to access the object storage
secret_key (str): secret key to access the object storage
cluster_id (str): id of a milvus instance(for cloud)
collection_name (str): name of the target collection
partition_name (str): name of the target partition
files (list of list of str): The files that contain the data to import.
A sub-list contains a single JSON or Parquet file, or a set of Numpy files.
object_url (str): The URL of the object to import.
This URL should be accessible to the S3-compatible
object storage service, such as AWS S3, GCS, Azure blob storage.
cluster_id (str): id of a milvus instance(for cloud)
api_key (str): API key to authenticate your requests.
access_key (str): access key to access the object storage
secret_key (str): secret key to access the object storage
Returns:
json: response of the restful interface
response of the restful interface
"""
request_url = url + "/v2/vectordb/jobs/import/create"

partition_name = kwargs.pop("partition_name", "")
params = {
"clusterId": cluster_id,
"collectionName": collection_name,
"partitionName": partition_name,
"files": files,
"objectUrl": object_url,
"clusterId": cluster_id,
"accessKey": access_key,
"secretKey": secret_key,
}
@@ -117,17 +125,18 @@ def bulk_import(


def get_import_progress(
url: str, api_key: str, job_id: str, cluster_id: str, **kwargs
url: str, job_id: str, cluster_id: str = "", api_key: str = "", **kwargs
) -> requests.Response:
"""get job progress
Args:
url (str): url of the server
job_id (str): a job id
cluster_id (str): id of a milvus instance(for cloud)
api_key (str): API key to authenticate your requests.
Returns:
json: response of the restful interface
response of the restful interface
"""
request_url = url + "/v2/vectordb/jobs/import/describe"
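Based on the fields the updated example reads from the describe response (state, progress, and reason on failure), a small polling helper might look like the sketch below; the field names come from the example script in this commit rather than a formal API spec, and the URL is a placeholder.

import time
from pymilvus.bulk_writer.bulk_import import get_import_progress

def wait_for_import(url: str, job_id: str, interval: float = 1.0) -> str:
    """Poll an import job until it completes or fails; returns the final state."""
    while True:
        data = get_import_progress(url=url, job_id=job_id).json()['data']
        state, progress = data['state'], data['progress']
        if state == "Failed":
            print(f"Job {job_id} failed: {data.get('reason')}")
            return state
        if state == "Completed" and progress == 100:
            print(f"Job {job_id} completed")
            return state
        print(f"Job {job_id}: {state} {progress}%")
        time.sleep(interval)

# Example: wait_for_import("http://localhost:19530", job_id)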

@@ -142,22 +151,31 @@


def list_import_jobs(
url: str, api_key: str, cluster_id: str, page_size: int, current_page: int, **kwargs
url: str,
collection_name: str = "",
cluster_id: str = "",
api_key: str = "",
page_size: int = 10,
current_page: int = 1,
**kwargs,
) -> requests.Response:
"""list jobs in a cluster
Args:
url (str): url of the server
collection_name (str): name of the target collection
cluster_id (str): id of a milvus instance(for cloud)
api_key (str): API key to authenticate your requests.
page_size (int): pagination size
current_page (int): pagination
Returns:
json: response of the restful interface
response of the restful interface
"""
request_url = url + "/v2/vectordb/jobs/import/list"

params = {
"collectionName": collection_name,
"clusterId": cluster_id,
"pageSize": page_size,
"currentPage": current_page,
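Since page_size and current_page now default to 10 and 1 and collection_name/cluster_id are optional, listing recent jobs on a self-hosted deployment reduces to a single call. A hedged sketch, with a placeholder URL:

from pymilvus.bulk_writer.bulk_import import list_import_jobs

# With the new defaults, only the server URL is required for a self-hosted Milvus.
resp = list_import_jobs(url="http://localhost:19530")
print(resp.json())

# Cloud usage still passes cluster_id and api_key explicitly, as in the example file above.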
