diff --git a/pinecone/grpc/future.py b/pinecone/grpc/future.py index a4ea56b1..5e3b9d39 100644 --- a/pinecone/grpc/future.py +++ b/pinecone/grpc/future.py @@ -1,34 +1,76 @@ -from grpc._channel import _MultiThreadedRendezvous +from concurrent.futures import Future as ConcurrentFuture +from typing import Optional +from grpc import Future as GrpcFuture, RpcError from pinecone.exceptions.exceptions import PineconeException -class PineconeGrpcFuture: - def __init__(self, delegate): - self._delegate = delegate +class PineconeGrpcFuture(ConcurrentFuture): + def __init__(self, grpc_future: GrpcFuture, timeout: Optional[int] = 10): + super().__init__() + self._grpc_future = grpc_future + self.default_timeout = timeout # seconds - def cancel(self): - return self._delegate.cancel() + # Add callback to subscribe to updates from the gRPC future + self._grpc_future.add_done_callback(self._sync_state) + + # Sync initial state, in case the gRPC future is already done + self._sync_state(self._grpc_future) + + def _sync_state(self, grpc_future): + # Sync the gRPC future completion to the wrapper future + if self.done(): + # Future already done, nothing to do + return - def cancelled(self): - return self._delegate.cancelled() + if grpc_future.cancelled(): + self.cancel() + elif grpc_future.exception(timeout=self.default_timeout): + self.set_exception(grpc_future.exception()) + elif grpc_future.done(): + try: + result = grpc_future.result(timeout=self.default_timeout) + self.set_result(result) + except Exception as e: + self.set_exception(e) + elif grpc_future.running(): + self.set_running_or_notify_cancel() - def running(self): - return self._delegate.running() + def cancel(self): + self._grpc_future.cancel() + return super().cancel() - def done(self): - return self._delegate.done() + def exception(self, timeout=None): + exception = super().exception(timeout=self._timeout(timeout)) + if isinstance(exception, RpcError): + return self._wrap_rpc_exception(exception) + return exception - def add_done_callback(self, fun): - return self._delegate.add_done_callback(fun) + def traceback(self, timeout=None): + # This is not part of the ConcurrentFuture interface, but keeping it for + # backward compatibility + return self._grpc_future.traceback(timeout=self._timeout(timeout)) def result(self, timeout=None): try: - return self._delegate.result(timeout=timeout) - except _MultiThreadedRendezvous as e: - raise PineconeException(e._state.debug_error_string) from e + return super().result(timeout=self._timeout(timeout)) + except RpcError as e: + raise self._wrap_rpc_exception(e) from e - def exception(self, timeout=None): - return self._delegate.exception(timeout=timeout) + def _timeout(self, timeout: Optional[int]) -> int: + return timeout if timeout is not None else self.default_timeout - def traceback(self, timeout=None): - return self._delegate.traceback(timeout=timeout) + def _wrap_rpc_exception(self, e): + if e._state and e._state.debug_error_string: + return PineconeException(e._state.debug_error_string) + else: + return PineconeException("Unknown GRPC error") + + # def __repr__(self): + # return super().__repr__() + + # def __str__(self) -> str: + # return super().__repr__() + + def __del__(self): + self._grpc_future.cancel() + self = None # release the reference to the grpc future diff --git a/tests/unit_grpc/test_futures.py b/tests/unit_grpc/test_futures.py new file mode 100644 index 00000000..593c4c79 --- /dev/null +++ b/tests/unit_grpc/test_futures.py @@ -0,0 +1,494 @@ +import pytest +from pinecone.grpc.future import PineconeGrpcFuture +from pinecone.exceptions import PineconeException + +import grpc +from concurrent.futures import CancelledError, TimeoutError + + +def mock_grpc_future( + mocker, done=False, cancelled=False, exception=None, running=False, result=None +): + grpc_future = mocker.MagicMock() + grpc_future.cancelled.return_value = cancelled + grpc_future.done.return_value = done + grpc_future.exception.return_value = exception + grpc_future.running.return_value = running + grpc_future.result.return_value = result + return grpc_future + + +class FakeGrpcError(grpc.RpcError): + def __init__(self, mocker): + self._state = mocker.Mock() + self._state.debug_error_string = "Test gRPC error" + + +class TestPineconeGrpcFuture: + def test_wraps_grpc_future_already_done(self, mocker): + grpc_future = mock_grpc_future(mocker, done=True, result="final result") + + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future._state == "FINISHED" + assert future.done() + assert future.result() == "final result" + + def test_wraps_grpc_already_failed(self, mocker): + grpc_future = mock_grpc_future( + mocker, done=True, exception=Exception("Simulated gRPC error") + ) + + future = PineconeGrpcFuture(grpc_future) + + assert future._state == "FINISHED" + assert future.done() + with pytest.raises(Exception, match="Simulated gRPC error"): + future.result() + + def test_wraps_grpc_future_already_running(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert future.running() + assert future._state == "RUNNING" + + def test_wraps_grpc_future_already_cancelled(self, mocker): + grpc_future = mock_grpc_future(mocker, cancelled=True) + future = PineconeGrpcFuture(grpc_future) + + assert future.cancelled() + assert future._state == "CANCELLED" + assert future.done() + with pytest.raises(CancelledError): + future.result() + + def test_wraps_grpc_future_cancel_pending(self, mocker): + grpc_future = mock_grpc_future(mocker) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future._state == "PENDING" + assert future.cancel() + assert future._state == "CANCELLED" + + assert future.cancelled() + assert not future.running() + + # Also cancel the grpc future + grpc_future.cancel.assert_called_once() + + with pytest.raises(CancelledError): + future.result() + + def test_cancel_already_cancelled(self, mocker): + grpc_future = mock_grpc_future(mocker, cancelled=True, done=True) + future = PineconeGrpcFuture(grpc_future) + + assert future.cancelled() + assert future._state == "CANCELLED" + + # Cancel returns True even if the future is already cancelled + assert future.cancel() + + def test_cancel_already_done(self, mocker): + grpc_future = mock_grpc_future(mocker, done=True, result="final result") + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future._state == "FINISHED" + + # Can't cancel a future that is already done + assert future.cancel() is False + + assert future.result() == "final result" + + def test_cancel_already_failed(self, mocker): + grpc_future = mock_grpc_future( + mocker, done=True, exception=Exception("Simulated gRPC error") + ) + future = PineconeGrpcFuture(grpc_future) + + assert future._state == "FINISHED" + assert future.done() + + # Can't cancel a future that is already done + assert future.cancel() is False + + with pytest.raises(Exception, match="Simulated gRPC error"): + future.result() + + def test_cancel_already_running(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert future.running() + assert future._state == "RUNNING" + + # Can't cancel a future that is already running + assert future.cancel() is False + + assert future.running() + assert not future.done() + assert not future.cancelled() + assert future._state == "RUNNING" + + def test_cancel_pending(self, mocker): + grpc_future = mock_grpc_future(mocker) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert not future.running() + assert future._state == "PENDING" + + # Cancel the future + assert future.cancel() + + assert future.cancelled() + assert future.done() + assert not future.running() + assert future._state == "CANCELLED" + + # Marks underlying grpc future as cancelled + grpc_future.cancel.assert_called_once() + + def test_result_success(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert future.running() + assert future._state == "RUNNING" + + # Update the state of the grpc future + grpc_future.done.return_value = True + grpc_future.running.return_value = False + grpc_future.result.return_value = "final result" + + # Trigger the done callback to update the state of the wrapper future + future._sync_state(grpc_future) + + assert future.result() == "final result" + assert future.done() + assert not future.cancelled() + assert not future.running() + + def test_result_exception(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert future.running() + assert future._state == "RUNNING" + + # Update the state of the grpc future + grpc_future.done.return_value = True + grpc_future.running.return_value = False + grpc_future.result.side_effect = Exception("Simulated gRPC error") + + # Trigger the done callback to update the state of the wrapper future + future._sync_state(grpc_future) + + with pytest.raises(Exception, match="Simulated gRPC error"): + future.result() + + assert future.done() + assert not future.cancelled() + assert not future.running() + + def test_result_already_successful(self, mocker): + grpc_future = mock_grpc_future(mocker, done=True, result="final result") + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future.done() + assert not future.running() + assert future._state == "FINISHED" + + assert future.result() == "final result" + + def test_result_already_failed(self, mocker): + grpc_future = mock_grpc_future( + mocker, done=True, exception=Exception("Simulated gRPC error") + ) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future.done() + assert not future.running() + assert future._state == "FINISHED" + + with pytest.raises(Exception, match="Simulated gRPC error"): + future.result() + + def test_result_already_cancelled(self, mocker): + grpc_future = mock_grpc_future(mocker, cancelled=True, done=True) + future = PineconeGrpcFuture(grpc_future) + + assert future.cancelled() + assert future.done() + assert not future.running() + assert future._state == "CANCELLED" + + with pytest.raises(CancelledError): + future.result() + + def test_result_timeout_running(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert future.running() + assert future._state == "RUNNING" + + with pytest.raises(TimeoutError): + future.result(timeout=1) + + def test_result_timeout_pending(self, mocker): + grpc_future = mock_grpc_future(mocker) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.done() + assert not future.running() + assert future._state == "PENDING" + + with pytest.raises(TimeoutError): + future.result(timeout=1) + + def test_result_default_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker) + future = PineconeGrpcFuture(grpc_future, timeout=1) + + assert not future.cancelled() + assert not future.done() + assert not future.running() + assert future._state == "PENDING" + + with pytest.raises(TimeoutError): + future.result() + + assert not future.cancelled() + assert not future.done() + assert not future.running() + assert future._state == "PENDING" + + def test_result_catch_grpc_exceptions(self, mocker): + grpc_future = mock_grpc_future(mocker) + grpc_future.result.side_effect = FakeGrpcError(mocker) + + future = PineconeGrpcFuture(grpc_future) + + grpc_future.done.return_value = True + future._sync_state(grpc_future) + + assert not future.cancelled() + assert not future.running() + assert future.done() + assert future._state == "FINISHED" + + with pytest.raises(PineconeException, match="Test gRPC error"): + future.result() + + assert isinstance(future.exception(), PineconeException) + + assert not future.cancelled() + assert not future.running() + assert future.done() + assert future._state == "FINISHED" + + def test_exception_when_done_maps_grpc_exception(self, mocker): + grpc_future = mock_grpc_future(mocker, done=True) + grpc_future.exception.return_value = FakeGrpcError(mocker) + + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.running() + assert future.done() + assert future._state == "FINISHED" + + assert isinstance(future.exception(), PineconeException) + + def test_exception_when_done_no_exceptions(self, mocker): + grpc_future = mock_grpc_future(mocker, done=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.running() + assert future.done() + assert future._state == "FINISHED" + + assert future.exception() is None + + def test_exception_when_running_default_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future, timeout=1) + + assert not future.cancelled() + assert future.running() + assert not future.done() + assert future._state == "RUNNING" + + with pytest.raises(TimeoutError): + future.exception() + + def test_exception_when_running_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert future.running() + assert not future.done() + assert future._state == "RUNNING" + + with pytest.raises(TimeoutError): + future.exception(timeout=1) + + def test_exception_when_pending_default_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker) + future = PineconeGrpcFuture(grpc_future, timeout=1) + + assert not future.cancelled() + assert not future.running() + assert not future.done() + assert future._state == "PENDING" + + with pytest.raises(TimeoutError): + future.exception() + + def test_exception_when_pending_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker) + + future = PineconeGrpcFuture(grpc_future) + + assert not future.cancelled() + assert not future.running() + assert not future.done() + assert future._state == "PENDING" + + with pytest.raises(TimeoutError): + future.exception(timeout=1) + + def test_concurrent_futures_as_completed(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + + future = PineconeGrpcFuture(grpc_future, timeout=1) + + # Trigger the done callback + grpc_future.done.return_value = True + grpc_future.result.return_value = "success" + future._sync_state(grpc_future) + + from concurrent.futures import as_completed + + for future in as_completed([future], timeout=1): + assert future.result() == "success" + assert future.done() + assert not future.cancelled() + + def test_concurrent_futures_as_completed_timeout(self, mocker): + grpc_future = mock_grpc_future(mocker, running=True) + future1 = PineconeGrpcFuture(grpc_future, timeout=3) + + grpc_future2 = mock_grpc_future(mocker, done=True, result="success") + future2 = PineconeGrpcFuture(grpc_future2, timeout=3) + + grpc_future3 = mock_grpc_future(mocker, done=True, cancelled=True) + future3 = PineconeGrpcFuture(grpc_future3, timeout=3) + + from concurrent.futures import as_completed + + completed_count = 0 + with pytest.raises(TimeoutError): + for f in as_completed([future1, future2, future3], timeout=1): + completed_count += 1 + + assert completed_count == 1 + + def test_concurrent_futures_wait_first_completed(self, mocker): + grpc_future1 = mock_grpc_future(mocker, done=True, result="success") + future1 = PineconeGrpcFuture(grpc_future1) + + grpc_future2 = mock_grpc_future(mocker, running=True) + future2 = PineconeGrpcFuture(grpc_future2) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, not_done = wait([future1, future2], timeout=1, return_when=FIRST_COMPLETED) + assert len(done) == 1 + assert len(not_done) == 1 + assert done.pop().result() == "success" + + # order should not matter + done, not_done = wait([future2, future1], timeout=1, return_when=FIRST_COMPLETED) + assert len(done) == 1 + assert len(not_done) == 1 + assert done.pop().result() == "success" + + def test_concurrent_futures_wait_all_completed(self, mocker): + grpc_future1 = mock_grpc_future(mocker, done=True, result="success") + future1 = PineconeGrpcFuture(grpc_future1) + + grpc_future2 = mock_grpc_future(mocker, done=True, result="success") + future2 = PineconeGrpcFuture(grpc_future2) + + from concurrent.futures import wait, ALL_COMPLETED + + done, not_done = wait([future1, future2], timeout=3, return_when=ALL_COMPLETED) + assert len(done) == 2 + assert len(not_done) == 0 + assert all(f.result() == "success" for f in done) + + def test_concurrent_futures_wait_first_exception(self, mocker): + grpc_future1 = mock_grpc_future(mocker) + future1 = PineconeGrpcFuture(grpc_future1) + + grpc_future2 = mock_grpc_future(mocker, done=True) + grpc_future2.exception.return_value = Exception("Simulated gRPC error") + future2 = PineconeGrpcFuture(grpc_future2) + + from concurrent.futures import wait, FIRST_EXCEPTION + + done, not_done = wait([future1, future2], return_when=FIRST_EXCEPTION) + assert len(done) == 1 + assert len(not_done) == 1 + + failed_future = done.pop() + assert isinstance(failed_future.exception(), Exception) + assert failed_future.exception().args == ("Simulated gRPC error",) + + def test_concurrent_futures_wait_timeout(self, mocker): + grpc_future1 = mock_grpc_future(mocker, running=True) + future1 = PineconeGrpcFuture(grpc_future1) + + grpc_future2 = mock_grpc_future(mocker, running=True) + future2 = PineconeGrpcFuture(grpc_future2) + + from concurrent.futures import wait, FIRST_COMPLETED + + done, not_done = wait([future1, future2], timeout=1, return_when=FIRST_COMPLETED) + assert len(done) == 0 + assert len(not_done) == 2 + + def test_concurrent_futures_wait_all_timeout(self, mocker): + grpc_future1 = mock_grpc_future(mocker, running=True) + future1 = PineconeGrpcFuture(grpc_future1) + + grpc_future2 = mock_grpc_future(mocker, running=True) + future2 = PineconeGrpcFuture(grpc_future2) + + from concurrent.futures import wait, ALL_COMPLETED + + done, not_done = wait([future1, future2], timeout=1, return_when=ALL_COMPLETED) + assert len(done) == 0 + assert len(not_done) == 2