Skip to content

Commit

Permalink
Simplify implementation
Browse files Browse the repository at this point in the history
Instead of using a priority queue, just use the event loop to schedule
retries in the future. This significantly simplifies the implementation
and makes it much more like the original.

Note that we still do have a semaphore that ensures that no more than 1K
inputs are in flight (i.e., sent to the server but not completed).
  • Loading branch information
rohansingh committed Dec 6, 2024
1 parent d18cba1 commit 6ed9306
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 242 deletions.
72 changes: 0 additions & 72 deletions modal/_utils/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,75 +760,3 @@ async def async_chain(*generators: AsyncGenerator[T, None]) -> AsyncGenerator[T,
logger.exception(f"Error closing async generator: {e}")
if first_exception is not None:
raise first_exception


class TimedPriorityQueue(asyncio.PriorityQueue[tuple[float, int, Union[T, None]]]):
    """
    Priority queue whose entries are released only once their timestamp has passed.

    Entries are stored as ``(timestamp, nonce, item)`` tuples. The nonce is
    incremented on every ``put_with_timestamp`` call, so two entries never tie
    on the first two fields and the payload itself is never compared.
    """

    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize=maxsize)
        # Counter stamped onto each entry as the tuple's second field.
        self.nonce = 0
        # Lets consumers sleep until a producer announces a new entry.
        self.condition = asyncio.Condition()

    async def put_with_timestamp(self, timestamp: float, item: Union[T, None]):
        """
        Enqueue ``item`` so that ``get_next`` hands it out no earlier than ``timestamp``.

        ``timestamp`` is compared against ``time.time()`` by ``get_next``.
        """
        self.nonce += 1
        entry = (timestamp, self.nonce, item)
        await super().put(entry)

        # Wake every consumer: the new entry may be due earlier than the one
        # they are currently sleeping on.
        async with self.condition:
            self.condition.notify_all()

    async def get_next(self) -> Union[T, None]:
        """
        Wait for the earliest entry to become due and return its item.
        """
        while True:
            async with self.condition:
                # Nothing queued: sleep until a producer notifies.
                while self.empty():
                    await self.condition.wait()

                # "Peek" by removing the head; it is put back below if not due.
                due_at, seq, payload = await super().get()
                now = time.time()

                if not due_at > now:
                    return payload

                # Head is still in the future: restore it, then sleep until it
                # is due or until a producer notifies, whichever comes first.
                delay = due_at - now
                self.put_nowait((due_at, seq, payload))
                try:
                    await asyncio.wait_for(self.condition.wait(), timeout=delay)
                except asyncio.TimeoutError:
                    # Timer expired; loop around and re-examine the head.
                    pass

    async def batch(self, max_batch_size=100, debounce_time=0.015) -> AsyncGenerator[list[T], None]:
        """
        Yield due items grouped into lists of at most ``max_batch_size``.

        A partial list is flushed whenever no item becomes available within
        ``debounce_time`` seconds. An item of ``None`` marks the end of the
        stream: any partial list is flushed and the generator finishes.
        """
        pending: list[T] = []
        while True:
            try:
                nxt: Union[T, None] = await asyncio.wait_for(self.get_next(), timeout=debounce_time)

                if nxt is None:
                    # End-of-stream sentinel.
                    if pending:
                        yield pending
                    return

                pending.append(nxt)
                if len(pending) >= max_batch_size:
                    yield pending
                    pending = []
            except asyncio.TimeoutError:
                # Quiet period elapsed: flush whatever has accumulated.
                if pending:
                    yield pending
                    pending = []
Loading

0 comments on commit 6ed9306

Please sign in to comment.