# example.py
# Source: a fork of mjwestcott/runnel.
from datetime import datetime, timezone

from runnel import App, Record
# Application object for this example, configured with a name and the URL of
# a Redis server on localhost.
# Run this app from the CLI via `$ runnel worker example:app`
app = App(
    name="myapp",
    redis_url="redis://127.0.0.1",
)
# Specify event types using the Record class.
class Order(Record):
    """Event type describing a single order in this example."""

    # Field order is kept exactly as declared.
    order_id: int  # integer identifier of the order
    created_at: datetime  # timestamp supplied when the record is built
    amount: float  # order amount
# Stream named "orders" that carries Order records, partitioned by the
# record's `order_id` field.
orders = app.stream(
    "orders",
    record=Order,
    partition_by="order_id",
)
# Every 4 seconds, send an example record to the stream.
@app.timer(interval=4)
async def sender():
    """Send one example ``Order`` record to the ``orders`` stream.

    Registered via ``app.timer(interval=4)``. Every record uses
    ``order_id=1`` and ``amount=9.99``; ``created_at`` is the current time
    as a timezone-aware UTC datetime.
    """
    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # datetime. datetime.now(timezone.utc) gives the same instant with an
    # explicit UTC offset attached.
    created_at = datetime.now(timezone.utc)
    await orders.send(Order(order_id=1, created_at=created_at, amount=9.99))
# Iterate over a continuous stream of events in your processors.
@app.processor(orders)
async def printer(events):
    """Print the ``amount`` of each record read from ``events``.

    Registered as a processor for the ``orders`` stream via
    ``app.processor(orders)``.

    Args:
        events: Object whose ``records()`` method returns an asynchronous
            iterable of records; each record is expected to expose an
            ``amount`` attribute.
    """
    incoming = events.records()
    async for order in incoming:
        print(f"processed {order.amount}")