Skip to content

Commit

Permalink
added tryReceiveAsync, receiveAllAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
Jack Tang committed Dec 5, 2024
1 parent 4cc1ca3 commit 0438d9d
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 13 deletions.
65 changes: 65 additions & 0 deletions examples/ex11_async_router_dealer.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import std/asyncdispatch
import std/sequtils
import std/strutils
import ../zmq

const N_WORKER = 3
const host = "tcp://localhost:5572"

proc worker(): Future[void] {.async.} =
let socket = zmq.connect(host, DEALER)
socket.sendAll("READY")
var isRunning = true
while isRunning:
let multiparts = await socket.receiveAllAsync()
echo "worker receive ", multiparts
let command = multiparts[0]
case command:
of "JOB":
let x = multiparts[1]
let y = parseInt(x)*2
socket.sendAll("DONE", $x, $y)
of "KILL":
isRunning = false
socket.sendAll("END")

proc router(): Future[void] {.async.} =
let socket = zmq.listen(host, ROUTER)
var jobs = toSeq(1..10)
var nWorker = 0
var isRunning = true
while isRunning:
let multiparts = await socket.receiveAllAsync()
echo "router receive ", multiparts
let workerId = multiparts[0]
let command = multiparts[1]
case command:
of "READY":
nWorker += 1
if jobs.len > 0:
socket.sendAll(workerId, "JOB", $jobs.pop())
else:
socket.sendAll(workerId, "KILL")
of "END":
nWorker -= 1
if nWorker == 0:
# stop router if no workers
isRunning = false
of "DONE":
let x = multiparts[2]
let y = multiparts[3]
assert parseInt(x)*2 == parseInt(y)
if jobs.len > 0:
socket.sendAll(workerId, "JOB", $jobs.pop())
else:
socket.sendAll(workerId, "KILL")
else:
raise newException(CatchableError, "unknown command")

when isMainModule:
echo "ex11_async_router_dealer.nim"
asyncCheck router()
for i in 1..N_WORKER:
asyncCheck worker()
while hasPendingOperations():
poll()
46 changes: 33 additions & 13 deletions zmq/asynczmq.nim
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,7 @@ proc pollAsync*(poller: AsyncZPoller, timeout: int = 2) : Future[int] =

result.complete(r)

proc receiveAsync*(conn: ZConnection): Future[string] =
## Similar to `receive()`, but `receiveAsync()` allows other async tasks to run.
## `receiveAsync()` allows other async tasks to run in those cases.
##
## This will not work in some case because it depends on ZMQ_FD which is not necessarily the 'true' FD of the socket
##
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
let fut = newFuture[string]("receiveAsync")
result = fut
let sock = conn.socket

template receiveAsyncCallbackTemplate(fut: Future, sock: ZSocket, recv, cb) =
proc cb(fd: AsyncFD): bool {.closure, gcsafe.} =
# the cb should work on the low level socket and not the ZConnection object
result = true
Expand All @@ -116,15 +106,45 @@ proc receiveAsync*(conn: ZConnection): Future[string] =
else:
# ready to read
unregister(fd)
fut.complete sock.receive(DONTWAIT)
fut.complete recv(sock, DONTWAIT)
except:
unregister(fd)
fut.fail getCurrentException()

proc receiveAsync*(conn: ZConnection): Future[string] =
## Similar to `receive()`, but `receiveAsync()` allows other async tasks to run.
## `receiveAsync()` allows other async tasks to run in those cases.
##
## This will not work in some case because it depends on ZMQ_FD which is not necessarily the 'true' FD of the socket
##
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
let fut = newFuture[string]("receiveAsync")
result = fut
receiveAsyncCallbackTemplate(fut, conn.socket, receive, cb)
let fd = getsockopt[cint](conn, ZSockOptions.FD).AsyncFD
register(fd)
discard cb(fd)

proc tryReceiveAsync*(conn: ZConnection): Future[tuple[msgAvailable: bool, moreAvailable: bool, msg: string]] =
## Async version of `tryReceive()`
let fut = newFuture[tuple[msgAvailable: bool, moreAvailable: bool, msg: string]]("tryReceiveAsync")
result = fut
receiveAsyncCallbackTemplate(fut, conn.socket, tryReceive, cb)
let fd = getsockopt[cint](conn, ZSockOptions.FD).AsyncFD
register(fd)
discard cb(fd)

proc receiveAllAsync*(conn: ZConnection): Future[seq[string]] {.async.} =
## async version for `receiveAll()`
var expectMessage = true
while expectMessage:
let (msgAvailable, moreAvailable, msg) = await tryReceiveAsync(conn)
if msgAvailable:
result.add msg
expectMessage = moreAvailable
else:
expectMessage = false

proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWAIT): Future[void] =
## `send()` is blocking for some connection types (e.g. PUSH, DEALER).
## `sendAsync()` allows other async tasks to run in those cases.
Expand All @@ -134,11 +154,11 @@ proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWA
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
let fut = newFuture[void]("sendAsync")
result = fut
let sock = conn.socket

let status = getsockopt[cint](conn, ZSockOptions.EVENTS)
if (status and ZMQ_POLLOUT) == 0:
# wait until queue available
let sock = conn.socket
proc cb(fd: AsyncFD): bool {.closure, gcsafe.} =
result = true

Expand Down

0 comments on commit 0438d9d

Please sign in to comment.