From edfe31d00561c758503f66a8ccc3bcbf995d3c58 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Thu, 31 Oct 2024 06:55:49 -0400 Subject: [PATCH 1/8] Implement concurrent.futures interface for PineconeGrpcFuture --- pinecone/grpc/future.py | 84 ++++-- tests/unit_grpc/test_futures.py | 494 ++++++++++++++++++++++++++++++++ 2 files changed, 557 insertions(+), 21 deletions(-) create mode 100644 tests/unit_grpc/test_futures.py diff --git a/pinecone/grpc/future.py b/pinecone/grpc/future.py index a4ea56b1..5e3b9d39 100644 --- a/pinecone/grpc/future.py +++ b/pinecone/grpc/future.py @@ -1,34 +1,76 @@ -from grpc._channel import _MultiThreadedRendezvous +from concurrent.futures import Future as ConcurrentFuture +from typing import Optional +from grpc import Future as GrpcFuture, RpcError from pinecone.exceptions.exceptions import PineconeException -class PineconeGrpcFuture: - def __init__(self, delegate): - self._delegate = delegate +class PineconeGrpcFuture(ConcurrentFuture): + def __init__(self, grpc_future: GrpcFuture, timeout: Optional[int] = 10): + super().__init__() + self._grpc_future = grpc_future + self.default_timeout = timeout # seconds - def cancel(self): - return self._delegate.cancel() + # Add callback to subscribe to updates from the gRPC future + self._grpc_future.add_done_callback(self._sync_state) + + # Sync initial state, in case the gRPC future is already done + self._sync_state(self._grpc_future) + + def _sync_state(self, grpc_future): + # Sync the gRPC future completion to the wrapper future + if self.done(): + # Future already done, nothing to do + return - def cancelled(self): - return self._delegate.cancelled() + if grpc_future.cancelled(): + self.cancel() + elif grpc_future.exception(timeout=self.default_timeout): + self.set_exception(grpc_future.exception()) + elif grpc_future.done(): + try: + result = grpc_future.result(timeout=self.default_timeout) + self.set_result(result) + except Exception as e: + self.set_exception(e) + elif grpc_future.running(): + self.set_running_or_notify_cancel() - def running(self): - return self._delegate.running() + def cancel(self): + self._grpc_future.cancel() + return super().cancel() - def done(self): - return self._delegate.done() + def exception(self, timeout=None): + exception = super().exception(timeout=self._timeout(timeout)) + if isinstance(exception, RpcError): + return self._wrap_rpc_exception(exception) + return exception - def add_done_callback(self, fun): - return self._delegate.add_done_callback(fun) + def traceback(self, timeout=None): + # This is not part of the ConcurrentFuture interface, but keeping it for + # backward compatibility + return self._grpc_future.traceback(timeout=self._timeout(timeout)) def result(self, timeout=None): try: - return self._delegate.result(timeout=timeout) - except _MultiThreadedRendezvous as e: - raise PineconeException(e._state.debug_error_string) from e + return super().result(timeout=self._timeout(timeout)) + except RpcError as e: + raise self._wrap_rpc_exception(e) from e - def exception(self, timeout=None): - return self._delegate.exception(timeout=timeout) + def _timeout(self, timeout: Optional[int]) -> int: + return timeout if timeout is not None else self.default_timeout - def traceback(self, timeout=None): - return self._delegate.traceback(timeout=timeout) + def _wrap_rpc_exception(self, e): + if e._state and e._state.debug_error_string: + return PineconeException(e._state.debug_error_string) + else: + return PineconeException("Unknown GRPC error") + + # def __repr__(self): + # return super().__repr__() + + # def __str__(self) -> str: + # return super().__repr__() + + def __del__(self): + self._grpc_future.cancel() + self = None # release the reference to the grpc future diff --git a/tests/unit_grpc/test_futures.py b/tests/unit_grpc/test_futures.py new file mode 100644 index 00000000..593c4c79 --- /dev/null +++ b/tests/unit_grpc/test_futures.py @@ -0,0 +1,494 @@ +import pytest +from pinecone.grpc.future import PineconeGrpcFuture +from pinecone.exceptions import PineconeException + +import grpc +from concurrent.futures import CancelledError, TimeoutError + + +def mock_grpc_future( + mocker, done=False, cancelled=False, exception=None, running=False, result=None +): + grpc_future = mocker.MagicMock() + grpc_future.cancelled.return_value = cancelled + grpc_future.done.return_value = done + grpc_future.exception.return_value = exception + grpc_future.running.return_value = running + grpc_future.result.return_value = result + return grpc_future + + +class FakeGrpcError(grpc.RpcError): + def __init__(self, mocker): + self._state = mocker.Mock() + self._state.debug_error_string = "Test gRPC error" + + +class TestPineconeGrpcFuture: + def test_wraps_grpc_future_already_done(self, mocker): + grpc_future = mock_grpc_future(mocker, done=True, result="final result") + + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future._state == "FINISHED" + assert future.done() + assert future.result() == "final result" + + def test_wraps_grpc_already_failed(self, mocker): + grpc_future = mock_grpc_future( + mocker, done=True, exception=Exception("Simulated gRPC error") + ) + + future = PineconeGrpcFuture(grpc_future) + + assert future._state == "FINISHED" + assert future.done() + with pytest.raises(Exception, match="Simulated gRPC error"): + future.result() + + def test_wraps_grpc_future_already_running(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert future.running() + assert future._state == "RUNNING" + + def test_wraps_grpc_future_already_cancelled(self, mocker): + grpc_future = mock_grpc_future(mocker, cancelled=True) + future = PineconeGrpcFuture(grpc_future) + + assert future.cancelled() + assert future._state == "CANCELLED" + assert future.done() + with pytest.raises(CancelledError): + future.result() + + def test_wraps_grpc_future_cancel_pending(self, mocker): + grpc_future = mock_grpc_future(mocker) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future._state == "PENDING" + assert future.cancel() + assert future._state == "CANCELLED" + + assert future.cancelled() + assert not future.running() + + # Also cancel the grpc future + grpc_future.cancel.assert_called_once() + + with pytest.raises(CancelledError): + future.result() + + def test_cancel_already_cancelled(self, mocker): + grpc_future = mock_grpc_future(mocker, cancelled=True, done=True) + future = PineconeGrpcFuture(grpc_future) + + assert future.cancelled() + assert future._state == "CANCELLED" + + # Cancel returns True even if the future is already cancelled + assert future.cancel() + + def test_cancel_already_done(self, mocker): + grpc_future = mock_grpc_future(mocker, done=True, result="final result") + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future._state == "FINISHED" + + # Can't cancel a future that is already done + assert future.cancel() is False + + assert future.result() == "final result" + + def test_cancel_already_failed(self, mocker): + grpc_future = mock_grpc_future( + mocker, done=True, exception=Exception("Simulated gRPC error") + ) + future = PineconeGrpcFuture(grpc_future) + + assert future._state == "FINISHED" + assert future.done() + + # Can't cancel a future that is already done + assert future.cancel() is False + + with pytest.raises(Exception, match="Simulated gRPC error"): + future.result() + + def test_cancel_already_running(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert future.running() + assert future._state == "RUNNING" + + # Can't cancel a future that is already running + assert future.cancel() is False + + assert future.running() + assert not future.done() + assert not future.cancelled() + assert future._state == "RUNNING" + + def test_cancel_pending(self, mocker): + grpc_future = mock_grpc_future(mocker) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert not future.running() + assert future._state == "PENDING" + + # Cancel the future + assert future.cancel() + + assert future.cancelled() + assert future.done() + assert not future.running() + assert future._state == "CANCELLED" + + # Marks underlying grpc future as cancelled + grpc_future.cancel.assert_called_once() + + def test_result_success(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert future.running() + assert future._state == "RUNNING" + + # Update the state of the grpc future + grpc_future.done.return_value = True + grpc_future.running.return_value = False + grpc_future.result.return_value = "final result" + + # Trigger the done callback to update the state of the wrapper future + future._sync_state(grpc_future) + + assert future.result() == "final result" + assert future.done() + assert not future.cancelled() + assert not future.running() + + def test_result_exception(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert future.running() + assert future._state == "RUNNING" + + # Update the state of the grpc future + grpc_future.done.return_value = True + grpc_future.running.return_value = False + grpc_future.result.side_effect = Exception("Simulated gRPC error") + + # Trigger the done callback to update the state of the wrapper future + future._sync_state(grpc_future) + + with pytest.raises(Exception, match="Simulated gRPC error"): + future.result() + + assert future.done() + assert not future.cancelled() + assert not future.running() + + def test_result_already_successful(self, mocker): + grpc_future = mock_grpc_future(mocker, done=True, result="final result") + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future.done() + assert not future.running() + assert future._state == "FINISHED" + + assert future.result() == "final result" + + def test_result_already_failed(self, mocker): + grpc_future = mock_grpc_future( + mocker, done=True, exception=Exception("Simulated gRPC error") + ) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future.done() + assert not future.running() + assert future._state == "FINISHED" + + with pytest.raises(Exception, match="Simulated gRPC error"): + future.result() + + def test_result_already_cancelled(self, mocker): + grpc_future = mock_grpc_future(mocker, cancelled=True, done=True) + future = PineconeGrpcFuture(grpc_future) + + assert future.cancelled() + assert future.done() + assert not future.running() + assert future._state == "CANCELLED" + + with pytest.raises(CancelledError): + future.result() + + def test_result_timeout_running(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert future.running() + assert future._state == "RUNNING" + + with pytest.raises(TimeoutError): + future.result(timeout=1) + + def test_result_timeout_pending(self, mocker): + grpc_future = mock_grpc_future(mocker) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert not future.running() + assert future._state == "PENDING" + + with pytest.raises(TimeoutError): + future.result(timeout=1) + + def test_result_default_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker) + future = PineconeGrpcFuture(grpc_future, timeout=1) + + assert not future.cancelled() + assert not future.done() + assert not future.running() + assert future._state == "PENDING" + + with pytest.raises(TimeoutError): + future.result() + + assert not future.cancelled() + assert not future.done() + assert not future.running() + assert future._state == "PENDING" + + def test_result_catch_grpc_exceptions(self, mocker): + grpc_future = mock_grpc_future(mocker) + grpc_future.result.side_effect = FakeGrpcError(mocker) + + future = PineconeGrpcFuture(grpc_future) + + grpc_future.done.return_value = True + future._sync_state(grpc_future) + + assert not future.cancelled() + assert not future.running() + assert future.done() + assert future._state == "FINISHED" + + with pytest.raises(PineconeException, match="Test gRPC error"): + future.result() + + assert isinstance(future.exception(), PineconeException) + + assert not future.cancelled() + assert not future.running() + assert future.done() + assert future._state == "FINISHED" + + def test_exception_when_done_maps_grpc_exception(self, mocker): + grpc_future = mock_grpc_future(mocker, done=True) + grpc_future.exception.return_value = FakeGrpcError(mocker) + + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.running() + assert future.done() + assert future._state == "FINISHED" + + assert isinstance(future.exception(), PineconeException) + + def test_exception_when_done_no_exceptions(self, mocker): + grpc_future = mock_grpc_future(mocker, done=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.running() + assert future.done() + assert future._state == "FINISHED" + + assert future.exception() is None + + def test_exception_when_running_default_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future, timeout=1) + + assert not future.cancelled() + assert future.running() + assert not future.done() + assert future._state == "RUNNING" + + with pytest.raises(TimeoutError): + future.exception() + + def test_exception_when_running_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future.running() + assert not future.done() + assert future._state == "RUNNING" + + with pytest.raises(TimeoutError): + future.exception(timeout=1) + + def test_exception_when_pending_default_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker) + future = PineconeGrpcFuture(grpc_future, timeout=1) + + assert not future.cancelled() + assert not future.running() + assert not future.done() + assert future._state == "PENDING" + + with pytest.raises(TimeoutError): + future.exception() + + def test_exception_when_pending_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker) + + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.running() + assert not future.done() + assert future._state == "PENDING" + + with pytest.raises(TimeoutError): + future.exception(timeout=1) + + def test_concurrent_futures_as_completed(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + + future = PineconeGrpcFuture(grpc_future, timeout=1) + + # Trigger the done callback + grpc_future.done.return_value = True + grpc_future.result.return_value = "success" + future._sync_state(grpc_future) + + from concurrent.futures import as_completed + + for future in as_completed([future], timeout=1): + assert future.result() == "success" + assert future.done() + assert not future.cancelled() + + def test_concurrent_futures_as_completed_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future1 = PineconeGrpcFuture(grpc_future, timeout=3) + + grpc_future2 = mock_grpc_future(mocker, done=True, result="success") + future2 = PineconeGrpcFuture(grpc_future2, timeout=3) + + grpc_future3 = mock_grpc_future(mocker, done=True, cancelled=True) + future3 = PineconeGrpcFuture(grpc_future3, timeout=3) + + from concurrent.futures import as_completed + + completed_count = 0 + with pytest.raises(TimeoutError): + for f in as_completed([future1, future2, future3], timeout=1): + completed_count += 1 + + assert completed_count == 1 + + def test_concurrent_futures_wait_first_completed(self, mocker): + grpc_future1 = mock_grpc_future(mocker, done=True, result="success") + future1 = PineconeGrpcFuture(grpc_future1) + + grpc_future2 = mock_grpc_future(mocker, running=True) + future2 = PineconeGrpcFuture(grpc_future2) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, not_done = wait([future1, future2], timeout=1, return_when=FIRST_COMPLETED) + assert len(done) == 1 + assert len(not_done) == 1 + assert done.pop().result() == "success" + + # order should not matter + done, not_done = wait([future2, future1], timeout=1, return_when=FIRST_COMPLETED) + assert len(done) == 1 + assert len(not_done) == 1 + assert done.pop().result() == "success" + + def test_concurrent_futures_wait_all_completed(self, mocker): + grpc_future1 = mock_grpc_future(mocker, done=True, result="success") + future1 = PineconeGrpcFuture(grpc_future1) + + grpc_future2 = mock_grpc_future(mocker, done=True, result="success") + future2 = PineconeGrpcFuture(grpc_future2) + + from concurrent.futures import wait, ALL_COMPLETED + + done, not_done = wait([future1, future2], timeout=3, return_when=ALL_COMPLETED) + assert len(done) == 2 + assert len(not_done) == 0 + assert all(f.result() == "success" for f in done) + + def test_concurrent_futures_wait_first_exception(self, mocker): + grpc_future1 = mock_grpc_future(mocker) + future1 = PineconeGrpcFuture(grpc_future1) + + grpc_future2 = mock_grpc_future(mocker, done=True) + grpc_future2.exception.return_value = Exception("Simulated gRPC error") + future2 = PineconeGrpcFuture(grpc_future2) + + from concurrent.futures import wait, FIRST_EXCEPTION + + done, not_done = wait([future1, future2], return_when=FIRST_EXCEPTION) + assert len(done) == 1 + assert len(not_done) == 1 + + failed_future = done.pop() + assert isinstance(failed_future.exception(), Exception) + assert failed_future.exception().args == ("Simulated gRPC error",) + + def test_concurrent_futures_wait_timeout(self, mocker): + grpc_future1 = mock_grpc_future(mocker, running=True) + future1 = PineconeGrpcFuture(grpc_future1) + + grpc_future2 = mock_grpc_future(mocker, running=True) + future2 = PineconeGrpcFuture(grpc_future2) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, not_done = wait([future1, future2], timeout=1, return_when=FIRST_COMPLETED) + assert len(done) == 0 + assert len(not_done) == 2 + + def test_concurrent_futures_wait_all_timeout(self, mocker): + grpc_future1 = mock_grpc_future(mocker, running=True) + future1 = PineconeGrpcFuture(grpc_future1) + + grpc_future2 = mock_grpc_future(mocker, running=True) + future2 = PineconeGrpcFuture(grpc_future2) + + from concurrent.futures import wait, ALL_COMPLETED + + done, not_done = wait([future1, future2], timeout=1, return_when=ALL_COMPLETED) + assert len(done) == 0 + assert len(not_done) == 2 From a75805d394404c11bffab5b12c1e79de9ee49bb7 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Thu, 31 Oct 2024 09:13:44 -0400 Subject: [PATCH 2/8] Add integration tests for async_req with upsert, fetch, delete --- pinecone/grpc/__init__.py | 4 + pinecone/grpc/future.py | 20 ++- pinecone/grpc/index_grpc.py | 16 ++- pinecone/grpc/utils.py | 13 +- tests/integration/data/test_delete_future.py | 30 +++++ tests/integration/data/test_fetch_future.py | 99 +++++++++++++++ tests/integration/data/test_upsert_future.py | 121 +++++++++++++++++++ 7 files changed, 290 insertions(+), 13 deletions(-) create mode 100644 tests/integration/data/test_delete_future.py create mode 100644 tests/integration/data/test_fetch_future.py create mode 100644 tests/integration/data/test_upsert_future.py diff --git a/pinecone/grpc/__init__.py b/pinecone/grpc/__init__.py index 381f1841..a027e897 100644 --- a/pinecone/grpc/__init__.py +++ b/pinecone/grpc/__init__.py @@ -47,20 +47,24 @@ from .index_grpc import GRPCIndex from .pinecone import PineconeGRPC from .config import GRPCClientConfig +from .future import PineconeGrpcFuture from pinecone.core.grpc.protos.vector_service_pb2 import ( Vector as GRPCVector, SparseValues as GRPCSparseValues, Vector, SparseValues, + DeleteResponse as GRPCDeleteResponse, ) __all__ = [ "GRPCIndex", "PineconeGRPC", + "GRPCDeleteResponse", "GRPCClientConfig", "GRPCVector", "GRPCSparseValues", "Vector", "SparseValues", + "PineconeGrpcFuture", ] diff --git a/pinecone/grpc/future.py b/pinecone/grpc/future.py index 5e3b9d39..fa8c4d71 100644 --- a/pinecone/grpc/future.py +++ b/pinecone/grpc/future.py @@ -5,21 +5,26 @@ class PineconeGrpcFuture(ConcurrentFuture): - def __init__(self, grpc_future: GrpcFuture, timeout: Optional[int] = 10): + def __init__( + self, grpc_future: GrpcFuture, timeout: Optional[int] = 10, result_transformer=None + ): super().__init__() self._grpc_future = grpc_future self.default_timeout = timeout # seconds + self.result_transformer = result_transformer + + # Sync initial state, in case the gRPC future is already done + self._sync_state(self._grpc_future) # Add callback to subscribe to updates from the gRPC future self._grpc_future.add_done_callback(self._sync_state) - # Sync initial state, in case the gRPC future is already done - self._sync_state(self._grpc_future) + @property + def grpc_future(self): + return self._grpc_future def _sync_state(self, grpc_future): - # Sync the gRPC future completion to the wrapper future if self.done(): - # Future already done, nothing to do return if grpc_future.cancelled(): @@ -35,6 +40,11 @@ def _sync_state(self, grpc_future): elif grpc_future.running(): self.set_running_or_notify_cancel() + def set_result(self, result): + if self.result_transformer: + result = self.result_transformer(result) + return super().set_result(result) + def cancel(self): self._grpc_future.cancel() return super().cancel() diff --git a/pinecone/grpc/index_grpc.py b/pinecone/grpc/index_grpc.py index 6269c23d..eff8fafe 100644 --- a/pinecone/grpc/index_grpc.py +++ b/pinecone/grpc/index_grpc.py @@ -282,7 +282,11 @@ def delete( return self.runner.run(self.stub.Delete, request, timeout=timeout) def fetch( - self, ids: Optional[List[str]], namespace: Optional[str] = None, **kwargs + self, + ids: Optional[List[str]], + namespace: Optional[str] = None, + async_req: Optional[bool] = False, + **kwargs, ) -> FetchResponse: """ The fetch operation looks up and returns vectors, by ID, from a single namespace. @@ -304,9 +308,13 @@ def fetch( args_dict = self._parse_non_empty_args([("namespace", namespace)]) request = FetchRequest(ids=ids, **args_dict, **kwargs) - response = self.runner.run(self.stub.Fetch, request, timeout=timeout) - json_response = json_format.MessageToDict(response) - return parse_fetch_response(json_response) + + if async_req: + future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout) + return PineconeGrpcFuture(future, result_transformer=parse_fetch_response) + else: + response = self.runner.run(self.stub.Fetch, request, timeout=timeout) + return parse_fetch_response(response) def query( self, diff --git a/pinecone/grpc/utils.py b/pinecone/grpc/utils.py index 99f45460..f85568aa 100644 --- a/pinecone/grpc/utils.py +++ b/pinecone/grpc/utils.py @@ -1,4 +1,7 @@ from typing import Optional +from google.protobuf import json_format +from google.protobuf.message import Message + import uuid from pinecone.core.openapi.data.models import ( @@ -35,10 +38,12 @@ def parse_sparse_values(sparse_values: dict): ) -def parse_fetch_response(response: dict): +def parse_fetch_response(response: Message): + json_response = json_format.MessageToDict(response) + vd = {} - vectors = response.get("vectors", {}) - namespace = response.get("namespace", "") + vectors = json_response.get("vectors", {}) + namespace = json_response.get("namespace", "") for id, vec in vectors.items(): vd[id] = _Vector( @@ -52,7 +57,7 @@ def parse_fetch_response(response: dict): return FetchResponse( vectors=vd, namespace=namespace, - usage=parse_usage(response.get("usage", {})), + usage=parse_usage(json_response.get("usage", {})), _check_type=False, ) diff --git a/tests/integration/data/test_delete_future.py b/tests/integration/data/test_delete_future.py new file mode 100644 index 00000000..ade3a009 --- /dev/null +++ b/tests/integration/data/test_delete_future.py @@ -0,0 +1,30 @@ +import os +import pytest +from pinecone import Vector +from pinecone.grpc import GRPCDeleteResponse +from ..helpers import poll_stats_for_namespace + + +class TestDeleteFuture: + @pytest.mark.skipif( + os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client" + ) + def test_delete_future(self, idx, namespace): + idx.upsert( + vectors=[ + Vector(id="id1", values=[0.1, 0.2]), + Vector(id="id2", values=[0.1, 0.2]), + Vector(id="id3", values=[0.1, 0.2]), + ], + namespace=namespace, + ) + poll_stats_for_namespace(idx, namespace, 3) + + delete_one = idx.delete(ids=["id1"], namespace=namespace, async_req=True) + delete_namespace = idx.delete(namespace=namespace, delete_all=True, async_req=True) + + from concurrent.futures import as_completed + + for future in as_completed([delete_one, delete_namespace], timeout=10): + resp = future.result() + assert isinstance(resp, GRPCDeleteResponse) diff --git a/tests/integration/data/test_fetch_future.py b/tests/integration/data/test_fetch_future.py new file mode 100644 index 00000000..b20bfc8a --- /dev/null +++ b/tests/integration/data/test_fetch_future.py @@ -0,0 +1,99 @@ +import os +import pytest +from pinecone.grpc import PineconeGrpcFuture + + +@pytest.mark.skipif( + os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client" +) +class TestFetchFuture: + def setup_method(self): + self.expected_dimension = 2 + + def test_fetch_multiple_by_id(self, idx, namespace): + target_namespace = namespace + + results = idx.fetch(ids=["1", "2", "4"], namespace=target_namespace, async_req=True) + assert isinstance(results, PineconeGrpcFuture) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, _ = wait([results], return_when=FIRST_COMPLETED) + + results = done.pop().result() + assert results.usage is not None + assert results.usage["read_units"] is not None + assert results.usage["read_units"] > 0 + + assert results.namespace == target_namespace + assert len(results.vectors) == 3 + assert results.vectors["1"].id == "1" + assert results.vectors["2"].id == "2" + # Metadata included, if set + assert results.vectors["1"].metadata is None + assert results.vectors["2"].metadata is None + assert results.vectors["4"].metadata is not None + assert results.vectors["4"].metadata["genre"] == "action" + assert results.vectors["4"].metadata["runtime"] == 120 + # Values included + assert results.vectors["1"].values is not None + assert len(results.vectors["1"].values) == self.expected_dimension + + def test_fetch_single_by_id(self, idx, namespace): + target_namespace = namespace + + future = idx.fetch(ids=["1"], namespace=target_namespace, async_req=True) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, _ = wait([future], return_when=FIRST_COMPLETED) + results = done.pop().result() + + assert results.namespace == target_namespace + assert len(results.vectors) == 1 + assert results.vectors["1"].id == "1" + assert results.vectors["1"].metadata is None + assert results.vectors["1"].values is not None + assert len(results.vectors["1"].values) == self.expected_dimension + + def test_fetch_nonexistent_id(self, idx, namespace): + target_namespace = namespace + + # Fetch id that is missing + future = idx.fetch(ids=["100"], namespace=target_namespace, async_req=True) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, _ = wait([future], return_when=FIRST_COMPLETED) + results = done.pop().result() + + assert results.namespace == target_namespace + assert len(results.vectors) == 0 + + def test_fetch_nonexistent_namespace(self, idx): + target_namespace = "nonexistent-namespace" + + # Fetch from namespace with no vectors + future = idx.fetch(ids=["1"], namespace=target_namespace, async_req=True) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, _ = wait([future], return_when=FIRST_COMPLETED) + results = done.pop().result() + + assert results.namespace == target_namespace + assert len(results.vectors) == 0 + + def test_fetch_unspecified_namespace(self, idx): + # Fetch without specifying namespace gives default namespace results + future = idx.fetch(ids=["1", "4"], async_req=True) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, _ = wait([future], return_when=FIRST_COMPLETED) + results = done.pop().result() + + assert results.namespace == "" + assert results.vectors["1"].id == "1" + assert results.vectors["1"].values is not None + assert results.vectors["4"].metadata is not None diff --git a/tests/integration/data/test_upsert_future.py b/tests/integration/data/test_upsert_future.py new file mode 100644 index 00000000..724aea1d --- /dev/null +++ b/tests/integration/data/test_upsert_future.py @@ -0,0 +1,121 @@ +import pytest +import os +from pinecone import Vector, PineconeException +from ..helpers import poll_stats_for_namespace +from .utils import embedding_values + + +class TestUpsertWithAsyncReq: + @pytest.mark.skipif( + os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client" + ) + def test_upsert_to_namespace(self, idx, namespace): + target_namespace = namespace + + # Upsert with tuples + upsert1 = idx.upsert( + vectors=[ + ("1", embedding_values()), + ("2", embedding_values()), + ("3", embedding_values()), + ], + namespace=target_namespace, + async_req=True, + ) + + # Upsert with objects + upsert2 = idx.upsert( + vectors=[ + Vector(id="4", values=embedding_values()), + Vector(id="5", values=embedding_values()), + Vector(id="6", values=embedding_values()), + ], + namespace=target_namespace, + async_req=True, + ) + + # Upsert with dict + upsert3 = idx.upsert( + vectors=[ + {"id": "7", "values": embedding_values()}, + {"id": "8", "values": embedding_values()}, + {"id": "9", "values": embedding_values()}, + ], + namespace=target_namespace, + async_req=True, + ) + + poll_stats_for_namespace(idx, target_namespace, 9) + + # Check the vector count reflects some data has been upserted + stats = idx.describe_index_stats() + assert stats.total_vector_count >= 9 + assert stats.namespaces[target_namespace].vector_count == 9 + + # Use returned futures + from concurrent.futures import as_completed + + total_upserted = 0 + for future in as_completed([upsert1, upsert2, upsert3], timeout=10): + total_upserted += future.result().upserted_count + + assert total_upserted == 9 + + @pytest.mark.skipif( + os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client" + ) + def test_upsert_to_namespace_when_failed_req(self, idx, namespace): + target_namespace = namespace + + # Upsert with tuples + upsert1 = idx.upsert( + vectors=[ + ("1", embedding_values()), + ("2", embedding_values()), + ("3", embedding_values()), + ], + namespace=target_namespace, + async_req=True, + ) + + # Upsert with objects + wrong_dimension = 10 + upsert2 = idx.upsert( + vectors=[ + Vector(id="4", values=embedding_values(wrong_dimension)), + Vector(id="5", values=embedding_values(wrong_dimension)), + Vector(id="6", values=embedding_values(wrong_dimension)), + ], + namespace=target_namespace, + async_req=True, + ) + + # Upsert with dict + upsert3 = idx.upsert( + vectors=[ + {"id": "7", "values": embedding_values()}, + {"id": "8", "values": embedding_values()}, + {"id": "9", "values": embedding_values()}, + ], + namespace=target_namespace, + async_req=True, + ) + + from concurrent.futures import wait, ALL_COMPLETED + + done, not_done = wait([upsert1, upsert2, upsert3], timeout=10, return_when=ALL_COMPLETED) + + assert len(done) == 3 + assert len(not_done) == 0 + + total_upserted = 0 + for future in done: + if future.exception(): + assert future is upsert2 + assert isinstance(future.exception(), PineconeException) + assert "Vector dimension 10 does not match the dimension of the index 2" in str( + future.exception() + ) + else: + total_upserted += future.result().upserted_count + assert total_upserted == 6 From 54c2fa33d4aa98bc73007ae84928945425688eef Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Thu, 31 Oct 2024 09:44:51 -0400 Subject: [PATCH 3/8] Fixes for mypy --- pinecone/grpc/future.py | 22 ++++++++++++++-------- pinecone/grpc/index_grpc.py | 2 +- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pinecone/grpc/future.py b/pinecone/grpc/future.py index fa8c4d71..40a1b28a 100644 --- a/pinecone/grpc/future.py +++ b/pinecone/grpc/future.py @@ -6,12 +6,15 @@ class PineconeGrpcFuture(ConcurrentFuture): def __init__( - self, grpc_future: GrpcFuture, timeout: Optional[int] = 10, result_transformer=None + self, grpc_future: GrpcFuture, timeout: Optional[int] = None, result_transformer=None ): super().__init__() self._grpc_future = grpc_future - self.default_timeout = timeout # seconds - self.result_transformer = result_transformer + self._result_transformer = result_transformer + if timeout is not None: + self._default_timeout = timeout # seconds + else: + self._default_timeout = 5 # seconds # Sync initial state, in case the gRPC future is already done self._sync_state(self._grpc_future) @@ -29,11 +32,11 @@ def _sync_state(self, grpc_future): if grpc_future.cancelled(): self.cancel() - elif grpc_future.exception(timeout=self.default_timeout): + elif grpc_future.exception(timeout=self._default_timeout): self.set_exception(grpc_future.exception()) elif grpc_future.done(): try: - result = grpc_future.result(timeout=self.default_timeout) + result = grpc_future.result(timeout=self._default_timeout) self.set_result(result) except Exception as e: self.set_exception(e) @@ -41,7 +44,7 @@ def _sync_state(self, grpc_future): self.set_running_or_notify_cancel() def set_result(self, result): - if self.result_transformer: + if self._result_transformer: result = self.result_transformer(result) return super().set_result(result) @@ -66,8 +69,11 @@ def result(self, timeout=None): except RpcError as e: raise self._wrap_rpc_exception(e) from e - def _timeout(self, timeout: Optional[int]) -> int: - return timeout if timeout is not None else self.default_timeout + def _timeout(self, timeout: Optional[int] = None) -> int: + if timeout is not None: + return timeout + else: + return self._default_timeout def _wrap_rpc_exception(self, e): if e._state and e._state.debug_error_string: diff --git a/pinecone/grpc/index_grpc.py b/pinecone/grpc/index_grpc.py index eff8fafe..317a0fc5 100644 --- a/pinecone/grpc/index_grpc.py +++ b/pinecone/grpc/index_grpc.py @@ -287,7 +287,7 @@ def fetch( namespace: Optional[str] = None, async_req: Optional[bool] = False, **kwargs, - ) -> FetchResponse: + ) -> Union[FetchResponse, PineconeGrpcFuture]: """ The fetch operation looks up and returns vectors, by ID, from a single namespace. The returned vectors include the vector data and/or metadata. From b26f05e335953ac2c7156402ae144b241766ae00 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Thu, 31 Oct 2024 09:59:25 -0400 Subject: [PATCH 4/8] Fix broken unit test --- tests/unit_grpc/test_futures.py | 12 ++++++++++++ tests/unit_grpc/test_grpc_index_upsert.py | 20 +++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/tests/unit_grpc/test_futures.py b/tests/unit_grpc/test_futures.py index 593c4c79..1f8e70c0 100644 --- a/tests/unit_grpc/test_futures.py +++ b/tests/unit_grpc/test_futures.py @@ -379,6 +379,18 @@ def test_exception_when_pending_timeout(self, mocker): with pytest.raises(TimeoutError): future.exception(timeout=1) + def test_add_done_callback(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + callback = mocker.MagicMock() + future.add_done_callback(callback) + + grpc_future.done.return_value = True + future._sync_state(grpc_future) + + callback.assert_called_once_with(future) + def test_concurrent_futures_as_completed(self, mocker): grpc_future = mock_grpc_future(mocker, running=True) diff --git a/tests/unit_grpc/test_grpc_index_upsert.py b/tests/unit_grpc/test_grpc_index_upsert.py index cd65b7de..a0449323 100644 --- a/tests/unit_grpc/test_grpc_index_upsert.py +++ b/tests/unit_grpc/test_grpc_index_upsert.py @@ -19,9 +19,27 @@ class MockUpsertDelegate: def __init__(self, upsert_response: UpsertResponse): self.response = upsert_response - def result(self, timeout): + def result(self, timeout=None): return self.response + def cancelled(self): + return False + + def cancel(self): + pass + + def exception(self, timeout=None): + return None + + def done(self): + return True + + def running(self): + return False + + def add_done_callback(self, callback): + pass + @pytest.fixture def expected_vec1(vals1): From 8f3adcdc34258fc446f941c13ede99dadba47af3 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Mon, 4 Nov 2024 10:59:55 -0500 Subject: [PATCH 5/8] Test fixes --- pinecone/grpc/future.py | 2 +- tests/integration/data/test_delete_future.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pinecone/grpc/future.py b/pinecone/grpc/future.py index 40a1b28a..dc87de2d 100644 --- a/pinecone/grpc/future.py +++ b/pinecone/grpc/future.py @@ -45,7 +45,7 @@ def _sync_state(self, grpc_future): def set_result(self, result): if self._result_transformer: - result = self.result_transformer(result) + result = self._result_transformer(result) return super().set_result(result) def cancel(self): diff --git a/tests/integration/data/test_delete_future.py b/tests/integration/data/test_delete_future.py index ade3a009..33dc044d 100644 --- a/tests/integration/data/test_delete_future.py +++ b/tests/integration/data/test_delete_future.py @@ -2,14 +2,16 @@ import pytest from pinecone import Vector from pinecone.grpc import GRPCDeleteResponse -from ..helpers import poll_stats_for_namespace +from ..helpers import poll_stats_for_namespace, random_string class TestDeleteFuture: @pytest.mark.skipif( os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client" ) - def test_delete_future(self, idx, namespace): + def test_delete_future(self, idx): + namespace = random_string(10) + idx.upsert( vectors=[ Vector(id="id1", values=[0.1, 0.2]), From 28dbadd4a79346ec691c6cd5b4215fd5ff794aea Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Mon, 4 Nov 2024 11:15:03 -0500 Subject: [PATCH 6/8] Fix grpc test imports --- tests/integration/data/test_delete_future.py | 4 +++- tests/integration/data/test_fetch_future.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integration/data/test_delete_future.py b/tests/integration/data/test_delete_future.py index 33dc044d..3ebea445 100644 --- a/tests/integration/data/test_delete_future.py +++ b/tests/integration/data/test_delete_future.py @@ -1,9 +1,11 @@ import os import pytest from pinecone import Vector -from pinecone.grpc import GRPCDeleteResponse from ..helpers import poll_stats_for_namespace, random_string +if os.environ.get("USE_GRPC") == "true": + from pinecone.grpc import GRPCDeleteResponse + class TestDeleteFuture: @pytest.mark.skipif( diff --git a/tests/integration/data/test_fetch_future.py b/tests/integration/data/test_fetch_future.py index b20bfc8a..19b09ac6 100644 --- a/tests/integration/data/test_fetch_future.py +++ b/tests/integration/data/test_fetch_future.py @@ -1,6 +1,8 @@ import os import pytest -from pinecone.grpc import PineconeGrpcFuture + +if os.environ.get("USE_GRPC") == "true": + from pinecone.grpc import PineconeGrpcFuture @pytest.mark.skipif( From 4312f8ad140a499b28c3c510ebf27b1782899729 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Mon, 4 Nov 2024 12:53:11 -0500 Subject: [PATCH 7/8] Remove commented code --- pinecone/grpc/future.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pinecone/grpc/future.py b/pinecone/grpc/future.py index dc87de2d..d98e1e84 100644 --- a/pinecone/grpc/future.py +++ b/pinecone/grpc/future.py @@ -81,12 +81,6 @@ def _wrap_rpc_exception(self, e): else: return PineconeException("Unknown GRPC error") - # def __repr__(self): - # return super().__repr__() - - # def __str__(self) -> str: - # return super().__repr__() - def __del__(self): self._grpc_future.cancel() self = None # release the reference to the grpc future From ff9a9dea0fb19f0e8b2a3d80b59e41bf9c6d6365 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Mon, 4 Nov 2024 15:42:16 -0500 Subject: [PATCH 8/8] Update unit test with grpc.Future interface --- tests/unit_grpc/test_grpc_index_upsert.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/unit_grpc/test_grpc_index_upsert.py b/tests/unit_grpc/test_grpc_index_upsert.py index a0449323..2eba655e 100644 --- a/tests/unit_grpc/test_grpc_index_upsert.py +++ b/tests/unit_grpc/test_grpc_index_upsert.py @@ -13,9 +13,10 @@ SparseValues, ) from pinecone.grpc.utils import dict_to_proto_struct +from grpc import Future as GrpcFuture -class MockUpsertDelegate: +class MockUpsertDelegate(GrpcFuture): def __init__(self, upsert_response: UpsertResponse): self.response = upsert_response @@ -40,6 +41,9 @@ def running(self): def add_done_callback(self, callback): pass + def traceback(self, timeout=None): + pass + @pytest.fixture def expected_vec1(vals1):