Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement message queue for WebChannels #703

Merged
merged 2 commits into from
Feb 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions src/WebChannels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import Genie, Genie.Renderer

const ClientId = UInt # web socket hash
const ChannelName = String
const MESSAGE_QUEUE = Dict{UInt, Tuple{
Channel{Tuple{String, Channel{Nothing}}},
Task}
}()

struct ChannelNotFoundException <: Exception
name::ChannelName
Expand Down Expand Up @@ -105,10 +109,14 @@ end
Unsubscribes a web socket client `ws` from `channel`.
"""
function unsubscribe(ws::HTTP.WebSockets.WebSocket, channel::ChannelName) :: ChannelClientsCollection
haskey(CLIENTS, id(ws)) && deleteat!(CLIENTS[id(ws)].channels, CLIENTS[id(ws)].channels .== channel)
pop_subscription(id(ws), channel)
client = id(ws)

haskey(CLIENTS, client) && deleteat!(CLIENTS[client].channels, CLIENTS[client].channels .== channel)
pop_subscription(client, channel)
delete_queue!(MESSAGE_QUEUE, client)

@debug "Unsubscribed: $(id(ws)) ($(Dates.now()))"

@debug "Unsubscribed: $(client) ($(Dates.now()))"
CLIENTS
end
function unsubscribe(channel_client::ChannelClient, channel::ChannelName) :: ChannelClientsCollection
Expand Down Expand Up @@ -285,16 +293,52 @@ end
"""
Writes `msg` to web socket for `client`.
"""
function message(client::ClientId, msg::String)
ws = Genie.WebChannels.CLIENTS[client].client
# setup a reply channel
myfuture = Channel{Nothing}(1)

# retrieve the message queue or set it up if not present
q, _ = get!(MESSAGE_QUEUE, client) do
queue = Channel{Tuple{String, Channel{Nothing}}}(10)
handler = @async while true
message, future = take!(queue)
try
Sockets.send(ws, message)
finally
put!(future, nothing)
end
end
queue, handler
end

put!(q, (msg, myfuture))

take!(myfuture) # Wait until the message is processed
end
function message(client::ChannelClient, msg::String) :: Int
message(client.client, msg)
end
function message(ws::HTTP.WebSockets.WebSocket, msg::String) :: Int
message(id(ws), msg)
end

function message_unsafe(ws::HTTP.WebSockets.WebSocket, msg::String) :: Int
Sockets.send(ws, msg)
end
function message(client::ClientId, msg::String) :: Int
message(CLIENTS[client].client, msg)
function message_unsafe(client::ClientId, msg::String) :: Int
message_unsafe(CLIENTS[client].client, msg)
end
function message(client::ChannelClient, msg::String) :: Int
message(client.client, msg)
function message_unsafe(client::ChannelClient, msg::String) :: Int
message_unsafe(client.client, msg)
end

function delete_queue!(d::Dict, client::UInt)
queue, handler = pop!(MESSAGE_QUEUE, client, (nothing, nothing))
if queue !== nothing
@async Base.throwto(handler, InterruptException())
end
end

"""
Encodes `msg` in Base64 and tags it with `Genie.config.webchannels_base64_marker`.
Expand Down
Loading