From 68a80ecbbd34c63f76c4f7953973b69d9ce54a0e Mon Sep 17 00:00:00 2001 From: hhaensel <31985040+hhaensel@users.noreply.github.com> Date: Tue, 28 Nov 2023 13:44:40 +0100 Subject: [PATCH] refactor broadcasting to support `except` and `restrict` (also accepting lists) (#692) --- src/WebChannels.jl | 11 ++++++++--- src/WebThreads.jl | 45 +++++++++++++++++---------------------------- 2 files changed, 25 insertions(+), 31 deletions(-) diff --git a/src/WebChannels.jl b/src/WebChannels.jl index 17fca608c..3fe691e31 100755 --- a/src/WebChannels.jl +++ b/src/WebChannels.jl @@ -216,7 +216,8 @@ Pushes `msg` (and `payload`) to all the clients subscribed to the channels in `c function broadcast(channels::Union{ChannelName,Vector{ChannelName}}, msg::String, payload::Union{Dict,Nothing} = nothing; - except::Union{HTTP.WebSockets.WebSocket,Nothing,UInt} = nothing) :: Bool + except::Union{Nothing,UInt,Vector{UInt}} = nothing, + restrict::Union{Nothing,UInt,Vector{UInt}} = nothing) :: Bool isa(channels, Array) || (channels = ChannelName[channels]) isempty(SUBSCRIPTIONS) && return false @@ -225,8 +226,12 @@ function broadcast(channels::Union{ChannelName,Vector{ChannelName}}, for channel in channels haskey(SUBSCRIPTIONS, channel) || throw(ChannelNotFoundException(channel)) - for client in SUBSCRIPTIONS[channel] - except !== nothing && client == id(except) && continue + ids = restrict === nothing ? SUBSCRIPTIONS[channel] : intersect(SUBSCRIPTIONS[channel], restrict) + for client in ids + if except !== nothing + except isa UInt && client == except && continue + except isa Vector{UInt} && client ∈ except && continue + end HTTP.WebSockets.isclosed(CLIENTS[client].client) && continue try diff --git a/src/WebThreads.jl b/src/WebThreads.jl index 5c3ee06fd..097b69bab 100644 --- a/src/WebThreads.jl +++ b/src/WebThreads.jl @@ -216,9 +216,10 @@ end """ Pushes `msg` (and `payload`) to all the clients subscribed to the channels in `channels`. """ -function broadcast(channels::Union{ChannelName,Vector{ChannelName}}, msg::String; - except::Union{HTTP.WebSockets.WebSocket,Nothing,UInt} = nothing) :: Bool - isa(channels, Array) || (channels = ChannelName[channels]) +function broadcast(channels::Union{ChannelName,Vector{ChannelName}}, msg::String, payload::Union{Dict,Nothing} = nothing; + except::Union{Nothing,UInt,Vector{UInt}} = nothing, + restrict::Union{Nothing,UInt,Vector{UInt}} = nothing) :: Bool + isa(channels, Array) || (channels = [channels]) for channel in channels if ! haskey(SUBSCRIPTIONS, channel) @@ -226,31 +227,17 @@ function broadcast(channels::Union{ChannelName,Vector{ChannelName}}, msg::String continue end - for client in SUBSCRIPTIONS[channel] - except !== nothing && client == except && continue - - try - message(client, msg) - catch ex - @error ex + ids = restrict === nothing ? SUBSCRIPTIONS[channel] : intersect(SUBSCRIPTIONS[channel], restrict) + for client in ids + if except !== nothing + except isa UInt && client == except && continue + except isa Vector{UInt} && client ∈ except && continue end - end - end - - true -end -function broadcast(channels::Union{ChannelName,Vector{ChannelName}}, msg::String, payload::Dict) :: Bool - isa(channels, Array) || (channels = [channels]) - - for channel in channels - if ! haskey(SUBSCRIPTIONS, channel) - @debug(Genie.WebChannels.ChannelNotFoundException(channel)) - continue - end - for client in SUBSCRIPTIONS[channel] try - message(client, ChannelMessage(channel, client, msg, payload) |> Renderer.Json.JSONParser.json) + payload !== nothing ? + message(client, ChannelMessage(channel, client, msg, payload) |> Renderer.Json.JSONParser.json) : + message(client, msg) catch ex @error ex end @@ -264,10 +251,12 @@ end """ Pushes `msg` (and `payload`) to all the clients subscribed to all the channels. """ -function broadcast(msg::String, payload::Union{Dict,Nothing} = nothing) :: Bool +function broadcast(msg::String, payload::Union{Dict,Nothing} = nothing; + except::Union{Nothing,UInt,Vector{UInt}} = nothing, + restrict::Union{Nothing,UInt,Vector{UInt}} = nothing) :: Bool payload === nothing ? - broadcast(collect(keys(SUBSCRIPTIONS)), msg) : - broadcast(collect(keys(SUBSCRIPTIONS)), msg, payload) + broadcast(collect(keys(SUBSCRIPTIONS)), msg; except, restrict) : + broadcast(collect(keys(SUBSCRIPTIONS)), msg, payload; except, restrict) end