From cfbeb7e6e8f1f053b1cfeb62307e45cb8bad68f1 Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Thu, 12 Oct 2023 19:39:29 -0700 Subject: [PATCH] feat: broadcast to others --- CHANGELOG.md | 2 + docs/getting_started.md | 12 ++++- .../subscription_adapter/any_cable.rb | 7 ++- lib/anycable/rails.rb | 27 ++++++++++- .../action_cable_ext/broadcast_options.rb | 23 +++++++++ lib/anycable/rails/middlewares/executor.rb | 8 ++-- lib/anycable/rails/railtie.rb | 8 ++++ lib/anycable/rails/socket_id_tracking.rb | 19 ++++++++ .../app/controllers/broadcasts_controller.rb | 3 +- spec/integrations/broadcast_to_others_spec.rb | 43 +++++++++++++++++ spec/lib/anycable/rails/channel_spec.rb | 47 +++++++++++++++++++ 11 files changed, 188 insertions(+), 11 deletions(-) create mode 100644 lib/anycable/rails/action_cable_ext/broadcast_options.rb create mode 100644 lib/anycable/rails/socket_id_tracking.rb create mode 100644 spec/integrations/broadcast_to_others_spec.rb create mode 100644 spec/lib/anycable/rails/channel_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index c3a974a..c541b8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master +- Add support for broadcast options (e.g., `exclude_socket`). ([@palkan][]) + - Add `batch_broadcasts` option to automatically batch broadcasts for code wrapped in Rails executor. ([@palkan][]) ## 1.4.1 (2023-09-27) diff --git a/docs/getting_started.md b/docs/getting_started.md index f5c4d86..a95344e 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -1,6 +1,6 @@ # Getting Started with AnyCable on Rails -AnyCable initially was designed for Rails applications only. +AnyCable can be used as a drop-in replacement for the Action Cable server in Rails applications. It supports most Action Cable features (see [Compatibility](./compatibility.md) for more) and can be used with any Action Cable client. Moreover, AnyCable brings additional power-ups for your real-time features, such as [streams history support](../guides/reliable_streams.md) and API extensensions (see below). > See also the [demo](https://github.com/anycable/anycable_rails_demo/pull/2) of migrating from Action Cable to AnyCable. @@ -95,7 +95,7 @@ Or you can use the environment variables (or anything else supported by [anyway_ AnyCable supports publishing [broadcast messages in batches](../ruby/broadcast_adapters.md#batching) (to reduce the number of round-trips and ensure delivery order). You can enable automatic batching of broadcasts by setting `ANYCABLE_BROADCAST_BATCHING=true` (or `broadcast_batching: true` in the config file). -Auto-batching uses [Rails executor](https://guides.rubyonrails.org/threading_and_code_execution.html#executor) under the hood, so broadcasts are aggregated within Rails _units of work_, such as HTTP requests, background jobs, etc. +Auto-batching uses [Rails executor]() under the hood, so broadcasts are aggregated within Rails _units of work_, such as HTTP requests, background jobs, etc. ### Server installation @@ -183,6 +183,14 @@ AnyCable automatically integrates with Rails 7+ error reporting interface (`Rail For earlier Rails versions, see [docs](../ruby/exceptions.md). +## Action Cable extensions + +### Broadcast to others + +AnyCable provides a functionality to deliver broadcasts to all clients except from the one initiated the action (e.g., when you need to broadcast a message to all users in a chat room except the one who sent the message). + +TBD + ## Development and test AnyCable is [compatible](compatibility.md) with the original Action Cable implementation; thus you can continue using Action Cable for development and tests. diff --git a/lib/action_cable/subscription_adapter/any_cable.rb b/lib/action_cable/subscription_adapter/any_cable.rb index 880fce5..7014fda 100644 --- a/lib/action_cable/subscription_adapter/any_cable.rb +++ b/lib/action_cable/subscription_adapter/any_cable.rb @@ -18,8 +18,11 @@ class AnyCable < Base def initialize(*) end - def broadcast(channel, payload) - ::AnyCable.broadcast(channel, payload) + def broadcast(channel, payload, **options) + options.merge!(::AnyCable::Rails.current_broadcast_options || {}) + to_others = options.delete(:to_others) + options[:exclude_socket] ||= ::AnyCable::Rails.current_socket_id if to_others + ::AnyCable.broadcast(channel, payload, **options.compact) end def subscribe(*) diff --git a/lib/anycable/rails.rb b/lib/anycable/rails.rb index c66585a..f518318 100644 --- a/lib/anycable/rails.rb +++ b/lib/anycable/rails.rb @@ -6,6 +6,7 @@ require "anycable/rails/rack" require "globalid" +require "active_support/core_ext/module/attribute_accessors_per_thread" module AnyCable # Rails handler for AnyCable @@ -14,6 +15,9 @@ module Rails ADAPTER_ALIASES = %w[any_cable anycable].freeze + thread_mattr_accessor :current_socket_id + thread_mattr_accessor :current_broadcast_options + class << self def enabled? adapter = ::ActionCable.server.config.cable&.fetch("adapter", nil) @@ -24,6 +28,25 @@ def compatible_adapter?(adapter) ADAPTER_ALIASES.include?(adapter) end + def with_socket_id(socket_id) + old_socket_id, self.current_socket_id = current_socket_id, socket_id + yield + ensure + self.current_socket_id = old_socket_id + end + + def with_broadcast_options(**options) + old_options = current_broadcast_options + self.current_broadcast_options = options.reverse_merge(old_options || {}) + yield + ensure + self.current_broadcast_options = old_options + end + + def broadcasting_to_others(&block) + with_broadcast_options(to_others: true, &block) + end + # Serialize connection/channel state variable to string # using GlobalID where possible or JSON (if json: true) def serialize(obj, json: false) @@ -50,9 +73,9 @@ def deserialize(str, json: false) end module Extension - def broadcast(channel, payload) + def broadcast(...) super - ::AnyCable.broadcast(channel, payload) + ::AnyCable.broadcast(...) end end diff --git a/lib/anycable/rails/action_cable_ext/broadcast_options.rb b/lib/anycable/rails/action_cable_ext/broadcast_options.rb new file mode 100644 index 0000000..11bb2ab --- /dev/null +++ b/lib/anycable/rails/action_cable_ext/broadcast_options.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +require "action_cable" + +ActionCable::Server::Base.prepend(Module.new do + def broadcast(channel, payload, **options) + return super if options.empty? + + AnyCable::Rails.with_broadcast_options(**options) do + super(channel, payload) + end + end +end) + +ActionCable::Channel::Base.singleton_class.prepend(Module.new do + def broadcast_to(target, payload, **options) + return super if options.empty? + + AnyCable::Rails.with_broadcast_options(**options) do + super(target, payload) + end + end +end) diff --git a/lib/anycable/rails/middlewares/executor.rb b/lib/anycable/rails/middlewares/executor.rb index d647082..b2391af 100644 --- a/lib/anycable/rails/middlewares/executor.rb +++ b/lib/anycable/rails/middlewares/executor.rb @@ -13,16 +13,16 @@ def initialize(executor) end def call(method, message, metadata) + sid = metadata["sid"] + if ::Rails.respond_to?(:error) executor.wrap do - sid = metadata["sid"] - ::Rails.error.record(context: {method: method, payload: message.to_h, sid: sid}) do - yield + Rails.with_socket_id(sid) { yield } end end else - executor.wrap { yield } + executor.wrap { Rails.with_socket_id(sid) { yield } } end end end diff --git a/lib/anycable/rails/railtie.rb b/lib/anycable/rails/railtie.rb index ff782d7..8acf12a 100644 --- a/lib/anycable/rails/railtie.rb +++ b/lib/anycable/rails/railtie.rb @@ -3,6 +3,7 @@ require "anycable/rails/action_cable_ext/connection" require "anycable/rails/action_cable_ext/channel" require "anycable/rails/action_cable_ext/remote_connections" +require "anycable/rails/action_cable_ext/broadcast_options" require "anycable/rails/channel_state" require "anycable/rails/connection_factory" @@ -70,6 +71,13 @@ class Railtie < ::Rails::Railtie # :nodoc: end end + initializer "anycable.socket_id_tracking" do + ActiveSupport.on_load(:action_controller) do + require "anycable/rails/socket_id_tracking" + include AnyCable::Rails::SocketIdTracking + end + end + initializer "anycable.connection_factory", after: "action_cable.set_configs" do |app| ActiveSupport.on_load(:action_cable) do app.config.to_prepare do diff --git a/lib/anycable/rails/socket_id_tracking.rb b/lib/anycable/rails/socket_id_tracking.rb new file mode 100644 index 0000000..fca2bad --- /dev/null +++ b/lib/anycable/rails/socket_id_tracking.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module AnyCable + module Rails + module SocketIdTracking + extend ActiveSupport::Concern + + included do + around_action :anycable_tracking_socket_id + end + + private + + def anycable_tracking_socket_id(&block) + Rails.with_socket_id(request.headers["X-Socket-Id"], &block) + end + end + end +end diff --git a/spec/dummy/app/controllers/broadcasts_controller.rb b/spec/dummy/app/controllers/broadcasts_controller.rb index 1021607..71fd61f 100644 --- a/spec/dummy/app/controllers/broadcasts_controller.rb +++ b/spec/dummy/app/controllers/broadcasts_controller.rb @@ -4,8 +4,9 @@ class BroadcastsController < ApplicationController around_action :maybe_disable_auto_batching def create + options = params[:to_others] ? {to_others: true} : {} params[:count].to_i.times do |num| - ActionCable.server.broadcast "test", {count: num + 1} + ActionCable.server.broadcast "test", {count: num + 1}, **options end head :created diff --git a/spec/integrations/broadcast_to_others_spec.rb b/spec/integrations/broadcast_to_others_spec.rb new file mode 100644 index 0000000..e1633b4 --- /dev/null +++ b/spec/integrations/broadcast_to_others_spec.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +require "spec_helper" +require "action_controller/test_case" + +supported = AnyCable.method(:broadcast).arity != 2 + +describe "broadcast to others", skip: !supported do + include ActionDispatch::Integration::Runner + include ActionDispatch::IntegrationTest::Behavior + + # Delegates to `Rails.application`. + def app + ::Rails.application + end + + before { allow(AnyCable.broadcast_adapter).to receive(:raw_broadcast) } + + it "adds exclude_socket meta if X-Socket-ID header is provided" do + post "/broadcasts", params: {count: 1, to_others: true, disable_auto_batching: true}, headers: {"X-Socket-ID" => "134"} + + expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).once + expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).with( + { + stream: "test", + data: {count: 1}.to_json, + meta: {exclude_socket: "134"} + }.to_json + ) + end + + it "doesn't add meta if header is missing" do + post "/broadcasts", params: {count: 1, to_others: true, disable_auto_batching: true} + + expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).once + expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).with( + { + stream: "test", + data: {count: 1}.to_json + }.to_json + ) + end +end diff --git a/spec/lib/anycable/rails/channel_spec.rb b/spec/lib/anycable/rails/channel_spec.rb new file mode 100644 index 0000000..a180866 --- /dev/null +++ b/spec/lib/anycable/rails/channel_spec.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +require "spec_helper" + +supported = AnyCable.method(:broadcast).arity != 2 + +describe ActionCable::Channel::Base, skip: !supported do + before { allow(AnyCable.broadcast_adapter).to receive(:raw_broadcast) } + + describe ".broadcast_to" do + context "with to_others: true" do + it "adds exclude_socket meta if current_socket_id is defined" do + AnyCable::Rails.with_socket_id("456") do + TestChannel.broadcast_to("test", {foo: "bar"}, to_others: true) + end + + expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).once + expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).with( + { + stream: "test:test", + data: {foo: "bar"}.to_json, + meta: {exclude_socket: "456"} + }.to_json + ) + end + end + + context "when broadcasting_to_others is set" do + specify do + AnyCable::Rails.with_socket_id("456") do + AnyCable::Rails.broadcasting_to_others do + TestChannel.broadcast_to("test", {foo: "baz"}) + end + end + + expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).once + expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).with( + { + stream: "test:test", + data: {foo: "baz"}.to_json, + meta: {exclude_socket: "456"} + }.to_json + ) + end + end + end +end