Skip to content

Commit

Permalink
revert timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
matt-bernstein committed Apr 19, 2024
1 parent 58c686a commit 825ce54
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion adala/environments/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ async def restore(self):
async def save(self):
raise NotImplementedError("Save is not supported in Kafka environment")

async def message_receiver(self, consumer: AIOKafkaConsumer, timeout: int = 30):
async def message_receiver(self, consumer: AIOKafkaConsumer, timeout: int = 3):
await consumer.start()
try:
while True:
try:
# Wait for the next message with a timeout
msg = await asyncio.wait_for(consumer.getone(), timeout=timeout)
print_text(f"Received message: {msg.value}")
yield msg.value
except asyncio.TimeoutError:
print_text(
Expand All @@ -77,8 +78,10 @@ async def message_sender(
try:
for record in data:
await producer.send_and_wait(topic, value=record)
print_text(f"Sent message: {record} to {topic=}")
finally:
await producer.stop()
print_text(f"No more messages for {topic=}")

async def get_next_batch(self, data_iterator, batch_size: int) -> List[Dict]:
batch = []
Expand Down

0 comments on commit 825ce54

Please sign in to comment.