Skip to content

Commit

Permalink
Move logic from AsyncResult.from_publisher to Task._get_result
Browse files Browse the repository at this point in the history
  • Loading branch information
imranariffin committed Jul 28, 2024
1 parent ed611a8 commit 146f8f6
Showing 1 changed file with 12 additions and 17 deletions.
29 changes: 12 additions & 17 deletions src/aiotaskq/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,6 @@ def __init__(
self.result = result
self.error = error

@classmethod
async def from_publisher(cls, task_id: str) -> "AsyncResult":
"""Return the result of the task once finished."""
from aiotaskq.serde import Serialization # pylint: disable=import-outside-toplevel

pubsub_ = PubSub.get(url=Config.broker_url(), poll_interval_s=0.01)
async with pubsub_ as pubsub: # pylint: disable=not-async-context-manager
await pubsub.subscribe(Constants.results_channel_template().format(task_id=task_id))
message: PollResponse = await pubsub.poll()

logger.debug("Message: %s", message)

result_serialized: bytes = message["data"]
result_: "AsyncResult" = Serialization.deserialize(cls, result_serialized)
return result_

def get(self) -> RT | Exception:
"""Return the result of the task once finished."""
if self.error is not None:
Expand Down Expand Up @@ -197,8 +181,19 @@ async def publish(self) -> None:
await pubsub.publish(Constants.tasks_channel(), message=message)

async def _get_result(self) -> RT:
from aiotaskq.serde import Serialization # pylint: disable=import-outside-toplevel

logger.debug("Retrieving result for task [task_id=%s]", self.id)
async_result: AsyncResult[RT] = await AsyncResult.from_publisher(task_id=self.id)
pubsub_ = PubSub.get(url=Config.broker_url(), poll_interval_s=0.01)
async with pubsub_ as pubsub: # pylint: disable=not-async-context-manager
await pubsub.subscribe(Constants.results_channel_template().format(task_id=self.id))
message: PollResponse = await pubsub.poll()

logger.debug("Message: %s", message)

result_serialized: bytes = message["data"]
async_result: AsyncResult[RT] = Serialization.deserialize(AsyncResult, result_serialized)

result: RT | Exception = async_result.get()
if isinstance(result, Exception):
raise result
Expand Down

0 comments on commit 146f8f6

Please sign in to comment.