Skip to content

Commit

Permalink
feat: broadcast to others
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Oct 13, 2023
1 parent d2e4b14 commit 9220ad4
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master

- Add support for broadcast options (e.g., `exclude_socket`). ([@palkan][])

- Add `batch_broadcasts` option to automatically batch broadcasts for code wrapped in Rails executor. ([@palkan][])

## 1.4.1 (2023-09-27)
Expand Down
27 changes: 25 additions & 2 deletions docs/getting_started.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Getting Started with AnyCable on Rails

AnyCable initially was designed for Rails applications only.
AnyCable can be used as a drop-in replacement for the Action Cable server in Rails applications. It supports most Action Cable features (see [Compatibility](./compatibility.md) for more) and can be used with any Action Cable client. Moreover, AnyCable brings additional power-ups for your real-time features, such as [streams history support](../guides/reliable_streams.md) and API extensions (see [below](#action-cable-extensions)).

> See also the [demo](https://github.com/anycable/anycable_rails_demo/pull/2) of migrating from Action Cable to AnyCable.
Expand Down Expand Up @@ -95,7 +95,7 @@ Or you can use the environment variables (or anything else supported by [anyway_
AnyCable supports publishing [broadcast messages in batches](../ruby/broadcast_adapters.md#batching) (to reduce the number of round-trips and ensure delivery order). You can enable automatic batching of broadcasts by setting `ANYCABLE_BROADCAST_BATCHING=true` (or `broadcast_batching: true` in the config file).

Auto-batching uses [Rails executor](https://guides.rubyonrails.org/threading_and_code_execution.html#executor) under the hood, so broadcasts are aggregated within Rails _units of work_, such as HTTP requests, background jobs, etc.
Auto-batching uses [Rails executor]() under the hood, so broadcasts are aggregated within Rails _units of work_, such as HTTP requests, background jobs, etc.

### Server installation

Expand Down Expand Up @@ -183,6 +183,29 @@ AnyCable automatically integrates with Rails 7+ error reporting interface (`Rail

For earlier Rails versions, see [docs](../ruby/exceptions.md).

## Action Cable extensions

### Broadcast to others

AnyCable provides a functionality to deliver broadcasts to all clients except from the one initiated the action (e.g., when you need to broadcast a message to all users in a chat room except the one who sent the message).

> **NOTE:** This feature is not available in Action Cable. It relies on [Action Cable protocol extensions](../misc/action_cable_protocol.md) currently only supported by AnyCable.

To do so, you need to obtain a unique socket identifier. For example, using [AnyCable JS client](https://github.com/anycable/anycable-client), you can access it via the `cable.sessionId` property.

Then, you must attach this identifier to HTTP request as a `X-Socket-ID` header value. AnyCable Rails uses this value to populate the `AnyCable::Rails.current_socket_id` value. If this value is set, you can implement broadcastint to other using one of the following methods:

- Calling `ActionCable.server.broadcast stream, data, to_others: true`
- Calling `MyChannel.broadcast_to stream, data, to_others: true`

Finally, if you perform broadcasts inderectly, you can wrap the code with `AnyCable::Rails.broadcasting_to_others` to enable this feature. For example, when using Turbo Streams:

```ruby
AnyCable::Rails.broadcasting_to_others do
Turbo::StreamsChannel.broadcast_remove_to workspace, target: item
end
```

## Development and test

AnyCable is [compatible](compatibility.md) with the original Action Cable implementation; thus you can continue using Action Cable for development and tests.
Expand Down
7 changes: 5 additions & 2 deletions lib/action_cable/subscription_adapter/any_cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ class AnyCable < Base
def initialize(*)
end

def broadcast(channel, payload)
::AnyCable.broadcast(channel, payload)
def broadcast(channel, payload, **options)
options.merge!(::AnyCable::Rails.current_broadcast_options || {})
to_others = options.delete(:to_others)
options[:exclude_socket] ||= ::AnyCable::Rails.current_socket_id if to_others
::AnyCable.broadcast(channel, payload, **options.compact)
end

def subscribe(*)
Expand Down
27 changes: 25 additions & 2 deletions lib/anycable/rails.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "anycable/rails/rack"

require "globalid"
require "active_support/core_ext/module/attribute_accessors_per_thread"

module AnyCable
# Rails handler for AnyCable
Expand All @@ -14,6 +15,9 @@ module Rails

ADAPTER_ALIASES = %w[any_cable anycable].freeze

thread_mattr_accessor :current_socket_id
thread_mattr_accessor :current_broadcast_options

class << self
def enabled?
adapter = ::ActionCable.server.config.cable&.fetch("adapter", nil)
Expand All @@ -24,6 +28,25 @@ def compatible_adapter?(adapter)
ADAPTER_ALIASES.include?(adapter)
end

def with_socket_id(socket_id)
old_socket_id, self.current_socket_id = current_socket_id, socket_id
yield
ensure
self.current_socket_id = old_socket_id
end

def with_broadcast_options(**options)
old_options = current_broadcast_options
self.current_broadcast_options = options.reverse_merge(old_options || {})
yield
ensure
self.current_broadcast_options = old_options
end

def broadcasting_to_others(&block)
with_broadcast_options(to_others: true, &block)
end

# Serialize connection/channel state variable to string
# using GlobalID where possible or JSON (if json: true)
def serialize(obj, json: false)
Expand All @@ -50,9 +73,9 @@ def deserialize(str, json: false)
end

module Extension
def broadcast(channel, payload)
def broadcast(...)
super
::AnyCable.broadcast(channel, payload)
::AnyCable.broadcast(...)
end
end

Expand Down
23 changes: 23 additions & 0 deletions lib/anycable/rails/action_cable_ext/broadcast_options.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

require "action_cable"

ActionCable::Server::Base.prepend(Module.new do
def broadcast(channel, payload, **options)
return super if options.empty?

AnyCable::Rails.with_broadcast_options(**options) do
super(channel, payload)
end
end
end)

ActionCable::Channel::Base.singleton_class.prepend(Module.new do
def broadcast_to(target, payload, **options)
return super if options.empty?

AnyCable::Rails.with_broadcast_options(**options) do
super(target, payload)
end
end
end)
3 changes: 2 additions & 1 deletion lib/anycable/rails/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
persistent_session_enabled: false,
embedded: false,
http_rpc_mount_path: nil,
batch_broadcasts: false
batch_broadcasts: false,
socket_id_header: "X-Socket-ID"
)
AnyCable::Config.ignore_options :access_logs_disabled, :persistent_session_enabled
8 changes: 4 additions & 4 deletions lib/anycable/rails/middlewares/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ def initialize(executor)
end

def call(method, message, metadata)
sid = metadata["sid"]

if ::Rails.respond_to?(:error)
executor.wrap do
sid = metadata["sid"]

::Rails.error.record(context: {method: method, payload: message.to_h, sid: sid}) do
yield
Rails.with_socket_id(sid) { yield }
end
end
else
executor.wrap { yield }
executor.wrap { Rails.with_socket_id(sid) { yield } }
end
end
end
Expand Down
8 changes: 8 additions & 0 deletions lib/anycable/rails/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "anycable/rails/action_cable_ext/connection"
require "anycable/rails/action_cable_ext/channel"
require "anycable/rails/action_cable_ext/remote_connections"
require "anycable/rails/action_cable_ext/broadcast_options"

require "anycable/rails/channel_state"
require "anycable/rails/connection_factory"
Expand Down Expand Up @@ -70,6 +71,13 @@ class Railtie < ::Rails::Railtie # :nodoc:
end
end

initializer "anycable.socket_id_tracking" do
ActiveSupport.on_load(:action_controller) do
require "anycable/rails/socket_id_tracking"
include AnyCable::Rails::SocketIdTracking
end
end

initializer "anycable.connection_factory", after: "action_cable.set_configs" do |app|
ActiveSupport.on_load(:action_cable) do
app.config.to_prepare do
Expand Down
19 changes: 19 additions & 0 deletions lib/anycable/rails/socket_id_tracking.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module AnyCable
module Rails
module SocketIdTracking
extend ActiveSupport::Concern

included do
around_action :anycable_tracking_socket_id
end

private

def anycable_tracking_socket_id(&block)
Rails.with_socket_id(request.headers[AnyCable.config.socket_id_header], &block)
end
end
end
end
3 changes: 2 additions & 1 deletion spec/dummy/app/controllers/broadcasts_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ class BroadcastsController < ApplicationController
around_action :maybe_disable_auto_batching

def create
options = params[:to_others] ? {to_others: true} : {}
params[:count].to_i.times do |num|
ActionCable.server.broadcast "test", {count: num + 1}
ActionCable.server.broadcast "test", {count: num + 1}, **options
end

head :created
Expand Down
66 changes: 66 additions & 0 deletions spec/integrations/broadcast_to_others_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# frozen_string_literal: true

require "spec_helper"
require "action_controller/test_case"

supported = AnyCable.method(:broadcast).arity != 2

describe "broadcast to others", skip: !supported do
include ActionDispatch::Integration::Runner
include ActionDispatch::IntegrationTest::Behavior

# Delegates to `Rails.application`.
def app
::Rails.application
end

before { allow(AnyCable.broadcast_adapter).to receive(:raw_broadcast) }

it "adds exclude_socket meta if X-Socket-ID header is provided" do
post "/broadcasts", params: {count: 1, to_others: true, disable_auto_batching: true}, headers: {"X-Socket-ID" => "134"}

expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).once
expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).with(
{
stream: "test",
data: {count: 1}.to_json,
meta: {exclude_socket: "134"}
}.to_json
)
end

it "doesn't add meta if header is missing" do
post "/broadcasts", params: {count: 1, to_others: true, disable_auto_batching: true}

expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).once
expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).with(
{
stream: "test",
data: {count: 1}.to_json
}.to_json
)
end

context "with custom header" do
around do |example|
was_value = AnyCable.config.socket_id_header
AnyCable.config.socket_id_header = "X-My-Socket-ID"
example.run
ensure
AnyCable.config.socket_id_header = was_value
end

specify do
post "/broadcasts", params: {count: 1, to_others: true, disable_auto_batching: true}, headers: {"X-My-Socket-ID" => "134"}

expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).once
expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).with(
{
stream: "test",
data: {count: 1}.to_json,
meta: {exclude_socket: "134"}
}.to_json
)
end
end
end
47 changes: 47 additions & 0 deletions spec/lib/anycable/rails/channel_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# frozen_string_literal: true

require "spec_helper"

supported = AnyCable.method(:broadcast).arity != 2

describe ActionCable::Channel::Base, skip: !supported do
before { allow(AnyCable.broadcast_adapter).to receive(:raw_broadcast) }

describe ".broadcast_to" do
context "with to_others: true" do
it "adds exclude_socket meta if current_socket_id is defined" do
AnyCable::Rails.with_socket_id("456") do
TestChannel.broadcast_to("test", {foo: "bar"}, to_others: true)
end

expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).once
expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).with(
{
stream: "test:test",
data: {foo: "bar"}.to_json,
meta: {exclude_socket: "456"}
}.to_json
)
end
end

context "when broadcasting_to_others is set" do
specify do
AnyCable::Rails.with_socket_id("456") do
AnyCable::Rails.broadcasting_to_others do
TestChannel.broadcast_to("test", {foo: "baz"})
end
end

expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).once
expect(AnyCable.broadcast_adapter).to have_received(:raw_broadcast).with(
{
stream: "test:test",
data: {foo: "baz"}.to_json,
meta: {exclude_socket: "456"}
}.to_json
)
end
end
end
end

0 comments on commit 9220ad4

Please sign in to comment.