Skip to content

Commit

Permalink
refactor broadcasting to support except and restrict (also accept…
Browse files Browse the repository at this point in the history
…ing lists) (#692)
  • Loading branch information
hhaensel authored Nov 28, 2023
1 parent e0259ac commit 68a80ec
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 31 deletions.
11 changes: 8 additions & 3 deletions src/WebChannels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ Pushes `msg` (and `payload`) to all the clients subscribed to the channels in `c
function broadcast(channels::Union{ChannelName,Vector{ChannelName}},
msg::String,
payload::Union{Dict,Nothing} = nothing;
except::Union{HTTP.WebSockets.WebSocket,Nothing,UInt} = nothing) :: Bool
except::Union{Nothing,UInt,Vector{UInt}} = nothing,
restrict::Union{Nothing,UInt,Vector{UInt}} = nothing) :: Bool
isa(channels, Array) || (channels = ChannelName[channels])

isempty(SUBSCRIPTIONS) && return false
Expand All @@ -225,8 +226,12 @@ function broadcast(channels::Union{ChannelName,Vector{ChannelName}},
for channel in channels
haskey(SUBSCRIPTIONS, channel) || throw(ChannelNotFoundException(channel))

for client in SUBSCRIPTIONS[channel]
except !== nothing && client == id(except) && continue
ids = restrict === nothing ? SUBSCRIPTIONS[channel] : intersect(SUBSCRIPTIONS[channel], restrict)
for client in ids
if except !== nothing
except isa UInt && client == except && continue
except isa Vector{UInt} && client except && continue
end
HTTP.WebSockets.isclosed(CLIENTS[client].client) && continue

try
Expand Down
45 changes: 17 additions & 28 deletions src/WebThreads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -216,41 +216,28 @@ end
"""
Pushes `msg` (and `payload`) to all the clients subscribed to the channels in `channels`.
"""
function broadcast(channels::Union{ChannelName,Vector{ChannelName}}, msg::String;
except::Union{HTTP.WebSockets.WebSocket,Nothing,UInt} = nothing) :: Bool
isa(channels, Array) || (channels = ChannelName[channels])
function broadcast(channels::Union{ChannelName,Vector{ChannelName}}, msg::String, payload::Union{Dict,Nothing} = nothing;
except::Union{Nothing,UInt,Vector{UInt}} = nothing,
restrict::Union{Nothing,UInt,Vector{UInt}} = nothing) :: Bool
isa(channels, Array) || (channels = [channels])

for channel in channels
if ! haskey(SUBSCRIPTIONS, channel)
@debug(Genie.WebChannels.ChannelNotFoundException(channel))
continue
end

for client in SUBSCRIPTIONS[channel]
except !== nothing && client == except && continue

try
message(client, msg)
catch ex
@error ex
ids = restrict === nothing ? SUBSCRIPTIONS[channel] : intersect(SUBSCRIPTIONS[channel], restrict)
for client in ids
if except !== nothing
except isa UInt && client == except && continue
except isa Vector{UInt} && client except && continue
end
end
end

true
end
function broadcast(channels::Union{ChannelName,Vector{ChannelName}}, msg::String, payload::Dict) :: Bool
isa(channels, Array) || (channels = [channels])

for channel in channels
if ! haskey(SUBSCRIPTIONS, channel)
@debug(Genie.WebChannels.ChannelNotFoundException(channel))
continue
end

for client in SUBSCRIPTIONS[channel]
try
message(client, ChannelMessage(channel, client, msg, payload) |> Renderer.Json.JSONParser.json)
payload !== nothing ?
message(client, ChannelMessage(channel, client, msg, payload) |> Renderer.Json.JSONParser.json) :
message(client, msg)
catch ex
@error ex
end
Expand All @@ -264,10 +251,12 @@ end
"""
Pushes `msg` (and `payload`) to all the clients subscribed to all the channels.
"""
function broadcast(msg::String, payload::Union{Dict,Nothing} = nothing) :: Bool
function broadcast(msg::String, payload::Union{Dict,Nothing} = nothing;
except::Union{Nothing,UInt,Vector{UInt}} = nothing,
restrict::Union{Nothing,UInt,Vector{UInt}} = nothing) :: Bool
payload === nothing ?
broadcast(collect(keys(SUBSCRIPTIONS)), msg) :
broadcast(collect(keys(SUBSCRIPTIONS)), msg, payload)
broadcast(collect(keys(SUBSCRIPTIONS)), msg; except, restrict) :
broadcast(collect(keys(SUBSCRIPTIONS)), msg, payload; except, restrict)
end


Expand Down

0 comments on commit 68a80ec

Please sign in to comment.