Skip to content

Commit

Permalink
Implement concurrent.futures interface for PineconeGrpcFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
jhamon committed Oct 31, 2024
1 parent 36373a1 commit edfe31d
Show file tree
Hide file tree
Showing 2 changed files with 557 additions and 21 deletions.
84 changes: 63 additions & 21 deletions pinecone/grpc/future.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,76 @@
from grpc._channel import _MultiThreadedRendezvous
from concurrent.futures import Future as ConcurrentFuture
from typing import Optional
from grpc import Future as GrpcFuture, RpcError
from pinecone.exceptions.exceptions import PineconeException


class PineconeGrpcFuture:
def __init__(self, delegate):
self._delegate = delegate
class PineconeGrpcFuture(ConcurrentFuture):
def __init__(self, grpc_future: GrpcFuture, timeout: Optional[int] = 10):
super().__init__()
self._grpc_future = grpc_future
self.default_timeout = timeout # seconds

def cancel(self):
return self._delegate.cancel()
# Add callback to subscribe to updates from the gRPC future
self._grpc_future.add_done_callback(self._sync_state)

# Sync initial state, in case the gRPC future is already done
self._sync_state(self._grpc_future)

def _sync_state(self, grpc_future):
# Sync the gRPC future completion to the wrapper future
if self.done():
# Future already done, nothing to do
return

def cancelled(self):
return self._delegate.cancelled()
if grpc_future.cancelled():
self.cancel()
elif grpc_future.exception(timeout=self.default_timeout):
self.set_exception(grpc_future.exception())
elif grpc_future.done():
try:
result = grpc_future.result(timeout=self.default_timeout)
self.set_result(result)
except Exception as e:
self.set_exception(e)
elif grpc_future.running():
self.set_running_or_notify_cancel()

def running(self):
return self._delegate.running()
def cancel(self):
self._grpc_future.cancel()
return super().cancel()

def done(self):
return self._delegate.done()
def exception(self, timeout=None):
exception = super().exception(timeout=self._timeout(timeout))
if isinstance(exception, RpcError):
return self._wrap_rpc_exception(exception)
return exception

def add_done_callback(self, fun):
return self._delegate.add_done_callback(fun)
def traceback(self, timeout=None):
# This is not part of the ConcurrentFuture interface, but keeping it for
# backward compatibility
return self._grpc_future.traceback(timeout=self._timeout(timeout))

def result(self, timeout=None):
try:
return self._delegate.result(timeout=timeout)
except _MultiThreadedRendezvous as e:
raise PineconeException(e._state.debug_error_string) from e
return super().result(timeout=self._timeout(timeout))
except RpcError as e:
raise self._wrap_rpc_exception(e) from e

def exception(self, timeout=None):
return self._delegate.exception(timeout=timeout)
def _timeout(self, timeout: Optional[int]) -> int:
return timeout if timeout is not None else self.default_timeout

def traceback(self, timeout=None):
return self._delegate.traceback(timeout=timeout)
def _wrap_rpc_exception(self, e):
if e._state and e._state.debug_error_string:
return PineconeException(e._state.debug_error_string)
else:
return PineconeException("Unknown GRPC error")

# def __repr__(self):
# return super().__repr__()

# def __str__(self) -> str:
# return super().__repr__()

def __del__(self):
self._grpc_future.cancel()
self = None # release the reference to the grpc future
Loading

0 comments on commit edfe31d

Please sign in to comment.