Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Oct 22, 2024
1 parent bb021a8 commit 14b7d27
Showing 1 changed file with 4 additions and 6 deletions.
10 changes: 4 additions & 6 deletions pynumaflow/sourcetransformer/servicer/servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, handler: SourceTransformCallable, multiproc: bool = False):
self.__transform_handler: SourceTransformCallable = handler
# This indicates whether the grpc server attached is multiproc or not
self.multiproc = multiproc
# create a thread pool with 50 worker threads
# create a thread pool for executing UDF code
self.executor = ThreadPoolExecutor(max_workers=NUM_THREADS_DEFAULT)

def SourceTransformFn(
Expand All @@ -50,9 +50,6 @@ def SourceTransformFn(
Applies a function to each datum element.
The pascal case function name comes from the generated transform_pb2_grpc.py file.
"""

# proto repeated field(keys) is of type google._upb._message.RepeatedScalarContainer
# we need to explicitly convert it to list
try:
# The first message to be received should be a valid handshake
req = next(request_iterator)
Expand All @@ -64,7 +61,8 @@ def SourceTransformFn(
# result queue to stream messages from the user code back to the client
result_queue = SyncIterator()

# Reader thread to keep reading from the request iterator and once done close it.
# Reader thread to keep reading from the request iterator and schedule
# execution for each of them
reader_thread = threading.Thread(
target=self._process_requests, args=(context, request_iterator, result_queue)
)
Expand All @@ -80,7 +78,7 @@ def SourceTransformFn(
# return the result
yield res

# wait for the threads to cleanup
# wait for the threads to clean-up
reader_thread.join()
self.executor.shutdown(cancel_futures=True)

Expand Down

0 comments on commit 14b7d27

Please sign in to comment.