diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index be83368..0a4ce15 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,7 +3,7 @@ name: Build on: push: branches: - - master + - master pull_request: workflow_dispatch: @@ -28,37 +28,35 @@ jobs: fail-fast: false matrix: ruby: ["3.3"] - gemfile: ["railsmaster"] - # include: - # - ruby: "3.3" - # gemfile: "railsmaster" - # - ruby: "3.2" - # gemfile: "anycablemaster" - # - ruby: "3.2" - # gemfile: "rails8" + gemfile: ["rails7", "rails8"] + next: ["0", "1"] + include: + - ruby: "3.2" + gemfile: "anycablemaster" + next: "0" steps: - - uses: actions/checkout@v4 - - name: Install system deps - run: | - sudo apt-get update - sudo apt-get install libsqlite3-dev - - uses: ruby/setup-ruby@v1 - with: - ruby-version: ${{ matrix.ruby }} - bundler-cache: true - - name: Install Coveralls reporter - run: | - curl -L https://github.com/coverallsapp/coverage-reporter/releases/latest/download/coveralls-linux.tar.gz | tar zxv - - name: Run RSpec - continue-on-error: ${{ matrix.allow_failure }} - run: | - bundle exec rake spec - ./coveralls -p --job-flag=ruby-${{ matrix.ruby }}-${{ matrix.gemfile }} - - name: Run compatibility specs - continue-on-error: ${{ matrix.allow_failure }} - run: | - bundle exec rake spec:compatibility - ./coveralls -p --job-flag=compatibility-${{ matrix.ruby }}-${{ matrix.gemfile }} + - uses: actions/checkout@v4 + - name: Install system deps + run: | + sudo apt-get update + sudo apt-get install libsqlite3-dev + - uses: ruby/setup-ruby@v1 + with: + ruby-version: ${{ matrix.ruby }} + bundler-cache: true + - name: Install Coveralls reporter + run: | + curl -L https://github.com/coverallsapp/coverage-reporter/releases/latest/download/coveralls-linux.tar.gz | tar zxv + - name: Run RSpec + continue-on-error: ${{ matrix.allow_failure }} + run: | + bundle exec rake spec + ./coveralls -p --job-flag=ruby-${{ matrix.ruby }}-${{ matrix.gemfile }} + - name: Run compatibility specs + continue-on-error: ${{ matrix.allow_failure }} + run: | + bundle exec rake spec:compatibility + ./coveralls -p --job-flag=compatibility-${{ matrix.ruby }}-${{ matrix.gemfile }} anyt: if: ${{ !contains(github.event.head_commit.message, '[ci skip tests]') }} @@ -75,18 +73,18 @@ jobs: ports: ["6379:6379"] options: --health-cmd="redis-cli ping" --health-interval 1s --health-timeout 3s --health-retries 30 steps: - - uses: actions/checkout@v4 - - name: Install system deps - run: | - sudo apt-get update - sudo apt-get install libsqlite3-dev - - uses: ruby/setup-ruby@v1 - with: - ruby-version: 3.3 - bundler-cache: true - - name: Run conformance tests with Anyt - run: | - bundle exec anyt --self-check --require=etc/anyt/*.rb + - uses: actions/checkout@v4 + - name: Install system deps + run: | + sudo apt-get update + sudo apt-get install libsqlite3-dev + - uses: ruby/setup-ruby@v1 + with: + ruby-version: 3.3 + bundler-cache: true + - name: Run conformance tests with Anyt + run: | + bundle exec anyt --self-check --require=etc/anyt/*.rb coverage: needs: rspec @@ -94,9 +92,9 @@ jobs: env: COVERALLS_REPO_TOKEN: ${{ secrets.github_token }} steps: - - name: Install Coveralls reporter - run: | - curl -L https://github.com/coverallsapp/coverage-reporter/releases/latest/download/coveralls-linux.tar.gz | tar zxv - - name: Finilize Coveralls build - run: | - ./coveralls -d + - name: Install Coveralls reporter + run: | + curl -L https://github.com/coverallsapp/coverage-reporter/releases/latest/download/coveralls-linux.tar.gz | tar zxv + - name: Finilize Coveralls build + run: | + ./coveralls -d diff --git a/Gemfile b/Gemfile index 73a2c77..613bd86 100644 --- a/Gemfile +++ b/Gemfile @@ -4,6 +4,14 @@ gemspec name: "anycable-rails" gem "debug", platform: :mri +if ENV["NEXT_ACTION_CABLE"] == "1" + if File.directory?(File.join(__dir__, "..", "actioncable-next")) + gem "actioncable-next", path: "../actioncable-next", require: false + else + gem "actioncable-next", require: false + end +end + local_gemfile = "#{File.dirname(__FILE__)}/Gemfile.local" eval_gemfile "gemfiles/rubocop.gemfile" diff --git a/Gemfile._local b/Gemfile._local new file mode 100755 index 0000000..c753f63 --- /dev/null +++ b/Gemfile._local @@ -0,0 +1,17 @@ +path "../../rails" do + gem 'activerecord' + gem 'actioncable' + gem 'activejob' + gem 'activesupport' + gem 'railties' +end +gem 'anycable', path: '../anycable' +path '../../rspec' do + gem 'rspec-core' + gem 'rspec-support' + gem 'rspec-expectations' + gem 'rspec-rails' +end + +gem "anyt", path: "../anyt" +gem "puma" diff --git a/anycable-rails-core.gemspec b/anycable-rails-core.gemspec index 3bd6435..2db86b5 100644 --- a/anycable-rails-core.gemspec +++ b/anycable-rails-core.gemspec @@ -27,6 +27,6 @@ Gem::Specification.new do |spec| spec.required_ruby_version = ">= 2.7" spec.add_dependency "anycable-core", "~> 1.5.0" - spec.add_dependency "actioncable", "> 7.2" + spec.add_dependency "actioncable", ">= 7.0", "< 9.0" spec.add_dependency "globalid" end diff --git a/gemfiles/anycablemaster.gemfile b/gemfiles/anycablemaster.gemfile index 8451a7f..1921419 100644 --- a/gemfiles/anycablemaster.gemfile +++ b/gemfiles/anycablemaster.gemfile @@ -1,6 +1,6 @@ source "https://rubygems.org" -gem "rails", ">= 8.0" +gem "rails", "~> 7.0" gem "rspec-rails" gem "anycable", git: "https://github.com/anycable/anycable.git", branch: "master" gem "sqlite3" diff --git a/gemfiles/rails7.gemfile b/gemfiles/rails7.gemfile new file mode 100644 index 0000000..60c2b13 --- /dev/null +++ b/gemfiles/rails7.gemfile @@ -0,0 +1,9 @@ +source "https://rubygems.org" + +gem "actioncable", "~> 7.0" +gem "activerecord" +gem "activejob" +gem "rspec-rails" +gem "sqlite3" + +gemspec path: "..", name: "anycable-rails" diff --git a/gemfiles/rails8.gemfile b/gemfiles/rails8.gemfile index 56d5f54..79afe0f 100644 --- a/gemfiles/rails8.gemfile +++ b/gemfiles/rails8.gemfile @@ -1,8 +1,6 @@ source "https://rubygems.org" -gem "actioncable", "~> 8.0" -gem "activerecord" -gem "activejob" +gem "rails", "~> 8.0.0.beta1" gem "rspec-rails" gem "sqlite3" diff --git a/gemfiles/railsmaster.gemfile b/gemfiles/railsmaster.gemfile index 2e0f897..27da8f0 100644 --- a/gemfiles/railsmaster.gemfile +++ b/gemfiles/railsmaster.gemfile @@ -1,13 +1,7 @@ source "https://rubygems.org" -gem "rails", git: "https://github.com/palkan/rails.git", branch: "refactor/action-cable-server-adapterization" -gem "rspec-rails", git: "https://github.com/palkan/rspec-rails.git", branch: "feat/actioncable-v8" - -gem "rspec-core", git: "https://github.com/rspec/rspec-core.git" -gem "rspec-support", git: "https://github.com/rspec/rspec-support.git" -gem "rspec-expectations", git: "https://github.com/rspec/rspec-expectations.git" -gem "rspec-mocks", git: "https://github.com/rspec/rspec-mocks.git" - +gem "rails", git: "https://github.com/rails/rails.git", branch: "main" +gem "rspec-rails" gem "sqlite3" gemspec path: "..", name: "anycable-rails" diff --git a/lib/anycable/rails/action_cable_ext/channel.rb b/lib/anycable/rails/action_cable_ext/channel.rb index 1d59937..b104723 100644 --- a/lib/anycable/rails/action_cable_ext/channel.rb +++ b/lib/anycable/rails/action_cable_ext/channel.rb @@ -3,34 +3,77 @@ require "action_cable" ActionCable::Channel::Base.prepend(Module.new do - # Whispering support - def whispers_to(broadcasting) + def subscribe_to_channel + super unless anycabled? && !@__anycable_subscribing__ + end + + def handle_subscribe + @__anycable_subscribing__ = true + subscribe_to_channel + ensure + @__anycable_subscribing__ = false + end + + def start_periodic_timers + super unless anycabled? + end + + def stop_periodic_timers + super unless anycabled? + end + + def stream_from(broadcasting, _callback = nil, **opts) + whispering = opts.delete(:whisper) return super unless anycabled? - connection.anycable_socket.whisper identifier, broadcasting + broadcasting = String(broadcasting) + + connection.anycable_socket.subscribe identifier, broadcasting + if whispering + connection.anycable_socket.whisper identifier, broadcasting + end + end + + def stream_for(model, callback = nil, **opts, &block) + stream_from(broadcasting_for(model), callback || block, **opts) end - # Unsubscribing relies on the channel state (which is not persistent in AnyCable). - # Thus, we pretend that the stream is registered to make Action Cable do its unsubscribing job. def stop_stream_from(broadcasting) - streams[broadcasting] = true if anycabled? - super + return super unless anycabled? + + connection.anycable_socket.unsubscribe identifier, broadcasting end - # For AnyCable, unsubscribing from all streams is a separate operation, - # so we use a special constant to indicate it. def stop_all_streams - if anycabled? - streams.clear - streams[AnyCable::Rails::Server::PubSub::ALL_STREAMS] = true - end - super - end + return super unless anycabled? - # Make rejected status accessible from outside - def rejected? = subscription_rejected? + connection.anycable_socket.unsubscribe_from_all identifier + end private - def anycabled? = connection.anycabled? + def anycabled? + # Use instance variable check here for testing compatibility + connection.instance_variable_defined?(:@anycable_socket) + end +end) + +# Handle $pubsub channel in Subscriptions +ActionCable::Connection::Subscriptions.prepend(Module.new do + # The contents are mostly copied from the original, + # there is no good way to configure channels mapping due to #safe_constantize + # and the layers of JSON + # https://github.com/rails/rails/blob/main/actioncable/lib/action_cable/connection/subscriptions.rb + def add(data) + id_key = data["identifier"] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + return if subscriptions.key?(id_key) + + return super unless id_options[:channel] == "$pubsub" + + subscription = AnyCable::Rails::PubSubChannel.new(connection, id_key, id_options) + subscriptions[id_key] = subscription + subscription.subscribe_to_channel + end end) diff --git a/lib/anycable/rails/action_cable_ext/connection.rb b/lib/anycable/rails/action_cable_ext/connection.rb index 1d70c20..9322e0a 100644 --- a/lib/anycable/rails/action_cable_ext/connection.rb +++ b/lib/anycable/rails/action_cable_ext/connection.rb @@ -5,25 +5,86 @@ ActionCable::Connection::Base.include(AnyCable::Rails::Connections::SerializableIdentification) ActionCable::Connection::Base.prepend(Module.new do - def anycabled? - anycable_socket - end + attr_reader :anycable_socket + attr_accessor :anycable_request_builder - # Allow overriding #subscriptions to use a custom implementation - attr_writer :subscriptions + # In AnyCable, we lazily populate env by passing it through the middleware chain, + # so we access it via #request + def env + return super unless anycabled? - # Alias for the #socket which is only set within AnyCable RPC context - attr_accessor :anycable_socket + request.env + end - # Enhance #send_welcome_message to include sid if present - def send_welcome_message - transmit({ - type: ActionCable::INTERNAL[:message_types][:welcome], - sid: env["anycable.sid"] - }.compact) + def anycabled? + @anycable_socket end - def subscribe_to_internal_channel - super unless anycabled? + private + + def request + return super unless anycabled? + + @request ||= anycable_request_builder.build_rack_request(@env) end end) + +# Backport command callbacks: https://github.com/rails/rails/pull/44696 +unless ActionCable::Connection::Base.respond_to?(:before_command) + ActionCable::Connection::Base.include ActiveSupport::Callbacks + ActionCable::Connection::Base.define_callbacks :command + ActionCable::Connection::Base.extend(Module.new do + def before_command(*methods, &block) + set_callback(:command, :before, *methods, &block) + end + + def after_command(*methods, &block) + set_callback(:command, :after, *methods, &block) + end + + def around_command(*methods, &block) + set_callback(:command, :around, *methods, &block) + end + end) + + ActionCable::Connection::Base.prepend(Module.new do + def dispatch_websocket_message(websocket_message) + return super unless websocket.alive? + + handle_channel_command(decode(websocket_message)) + end + + def handle_channel_command(payload) + run_callbacks :command do + subscriptions.execute_command payload + end + end + end) +end + +# Trigger autoload +test_case_defined = false + +begin + ActionCable::Connection::TestCase # rubocop:disable Lint/Void + test_case_defined = true +rescue NameError +end + +# Backport: https://github.com/rails/rails/pull/45445 +if test_case_defined && !ActionCable::Connection::TestConnection.method_defined?(:transmissions) + ActionCable::Connection::TestConnection.prepend(Module.new do + attr_reader :transmissions + + def initialize(*) + super + + @transmissions = [] + @subscriptions = ActionCable::Connection::Subscriptions.new(self) + end + + def transmit(cable_message) + transmissions << cable_message.with_indifferent_access + end + end) +end diff --git a/lib/anycable/rails/connection.rb b/lib/anycable/rails/connection.rb index 9f19679..4d708f7 100644 --- a/lib/anycable/rails/connection.rb +++ b/lib/anycable/rails/connection.rb @@ -4,193 +4,161 @@ module AnyCable module Rails - class Current < ActiveSupport::CurrentAttributes - attribute :identifier - end - - # Wrap ActionCable.server to provide a custom executor - # and a pubsub adapter - class Server < SimpleDelegator - # Implements an executor inteface - class Executor - class NoopTimer - def shutdown = nil - end - - NOOP_TIMER = NoopTimer.new.freeze - - def post(...) - raise NonImplementedError, "Executor#post is not implemented in AnyCable context" + # Enhance Action Cable connection + using(Module.new do + refine ActionCable::Connection::Base do + attr_writer :env, :websocket, :logger, :coder, + :subscriptions, :serialized_ids, :cached_ids, :server, + :anycable_socket + + # Using public :send_welcome_message causes stack level too deep 🤷🏻‍♂️ + def send_welcome_message + transmit({ + type: ActionCable::INTERNAL[:message_types][:welcome], + sid: env["anycable.sid"] + }.compact) end - def timer(...) = NOOP_TIMER - end - - # A signleton executor for all connections - EXECUTOR = Executor.new.freeze - - # PubSub adapter to manage streams configuration - # for the underlying socket - class PubSub - private attr_reader :socket - - ALL_STREAMS = Data.define(:to_str).new("all") - - def initialize(socket) = @socket = socket - - def subscribe(channel, _message_callback, success_callback = nil) - socket.subscribe identifier, channel - success_callback&.call - end - - def unsubscribe(channel, _message_callback) - if channel == ALL_STREAMS - socket.unsubscribe_from_all identifier - else - socket.unsubscribe identifier, channel - end + def public_request + request end - - private - - def identifier = Current.identifier end - attr_accessor :pubsub, :executor - - def self.for(server, socket) - new(server).tap do |srv| - srv.executor = EXECUTOR - srv.pubsub = PubSub.new(socket) + refine ActionCable::Channel::Base do + def rejected? + subscription_rejected? end end - end - - class Connection - class Subscriptions < ::ActionCable::Connection::Subscriptions - # Wrap the original #execute_command to pre-initialize the channel for unsubscribe/message and - # return true/false to indicate successful/unsuccessful subscription. - def execute_command(data) - cmd = data["command"] - - # We need the current channel identifier in pub/sub - Current.identifier = data["identifier"] - - load(data["identifier"]) unless cmd == "subscribe" - super + refine ActionCable::Connection::Subscriptions do + # Find or add a subscription to the list + def fetch(identifier) + add("identifier" => identifier) unless subscriptions[identifier] - return true unless cmd == "subscribe" - - subscription = subscriptions[data["identifier"]] - !(subscription.nil? || subscription.rejected?) - end - - # Restore channels from the list of identifiers and the state - def restore(subscriptions, istate) - subscriptions.each do |id| - channel = load(id) - channel.__istate__ = ActiveSupport::JSON.decode(istate[id]) if istate[id] + unless subscriptions[identifier] + raise "Channel not found: #{ActiveSupport::JSON.decode(identifier).fetch("channel")}" end - end - # Find or create a channel for a given identifier - def load(identifier) - return subscriptions[identifier] if subscriptions[identifier] - - subscription = subscription_from_identifier(identifier) - raise "Channel not found: #{ActiveSupport::JSON.decode(identifier).fetch("channel")}" unless subscription - - subscriptions[identifier] = subscription + subscriptions[identifier] end end + end) + class Connection # We store logger tags in the connection state to be able # to re-use them in the subsequent calls LOG_TAGS_IDENTIFIER = "__ltags__" - attr_reader :socket, :server - delegate :identifiers_json, to: :conn - delegate :cstate, :istate, to: :socket - def initialize(connection_class, socket, identifiers: nil, subscriptions: nil, server: ::ActionCable.server) - server = Server.for(server, socket) + attr_reader :socket, :logger + def initialize(connection_class, socket, identifiers: nil, subscriptions: nil) @socket = socket - @server = server - # TODO: Move protocol to socket.env as "anycable.protocol" - @protocol = "actioncable-v1-json" logger_tags = fetch_logger_tags_from_state - @logger = ActionCable::Server::TaggedLoggerProxy.new(AnyCable.logger, tags: logger_tags) - - @conn = connection_class.new(server, self) - conn.subscriptions = Subscriptions.new(conn) - conn.identifiers_json = identifiers - conn.anycable_socket = socket - conn.subscriptions.restore(subscriptions, socket.istate) if subscriptions + @logger = ActionCable::Connection::TaggedLoggerProxy.new(AnyCable.logger, tags: logger_tags) + + # Instead of calling #initialize, + # we allocate an instance and setup all the required components manually + @conn = connection_class.allocate + # Required to access config (for access origin checks) + conn.server = ActionCable.server + conn.logger = logger + conn.anycable_socket = conn.websocket = socket + conn.env = socket.env + conn.coder = ActiveSupport::JSON + conn.subscriptions = ActionCable::Connection::Subscriptions.new(conn) + conn.serialized_ids = {} + conn.serialized_ids = ActiveSupport::JSON.decode(identifiers) if identifiers + conn.cached_ids = {} + conn.anycable_request_builder = self + + return unless subscriptions + + # Pre-initialize channels (for disconnect) + subscriptions.each do |id| + channel = conn.subscriptions.fetch(id) + next unless socket.istate[id] + + channel.__istate__ = ActiveSupport::JSON.decode(socket.istate[id]) + end end - # == AnyCable RPC interface [BEGIN] == def handle_open logger.info started_request_message if access_logs? - return close unless allow_request_origin? + verify_origin! || return - conn.handle_open + conn.connect if conn.respond_to?(:connect) - # Commit log tags to the connection state socket.cstate.write(LOG_TAGS_IDENTIFIER, logger.tags.to_json) unless logger.tags.empty? - socket.closed? + conn.send_welcome_message + rescue ::ActionCable::Connection::Authorization::UnauthorizedError + reject_request( + ActionCable::INTERNAL[:disconnect_reasons]&.[](:unauthorized) || "unauthorized" + ) end def handle_close - conn.handle_close - close + logger.info finished_request_message if access_logs? + + conn.subscriptions.unsubscribe_from_all + conn.disconnect if conn.respond_to?(:disconnect) true end def handle_channel_command(identifier, command, data) - conn.handle_incoming({"command" => command, "identifier" => identifier, "data" => data}) - end - # == AnyCable RPC interface [END] == - - # == Action Cable socket interface [BEGIN] - attr_reader :protocol, :logger - - def request - @request ||= begin - env = socket.env - environment = ::Rails.application.env_config.merge(env) if defined?(::Rails.application) && ::Rails.application - AnyCable::Rails::Rack.app.call(environment) if environment - - ActionDispatch::Request.new(environment || env) + conn.run_callbacks :command do + # We cannot use subscriptions#execute_command here, + # since we MUST return true of false, depending on the status + # of execution + channel = conn.subscriptions.fetch(identifier) + case command + when "subscribe" + channel.handle_subscribe + !channel.rejected? + when "unsubscribe" + conn.subscriptions.remove_subscription(channel) + true + when "message" + channel.perform_action ActiveSupport::JSON.decode(data) + true + else + false + end end + # Support rescue_from + # https://github.com/rails/rails/commit/d2571e560c62116f60429c933d0c41a0e249b58b + rescue Exception => e # rubocop:disable Lint/RescueException + rescue_with_handler(e) || raise + false end - delegate :env, to: :request + def build_rack_request(env) + environment = ::Rails.application.env_config.merge(env) if defined?(::Rails.application) && ::Rails.application + AnyCable::Rails::Rack.app.call(environment) if environment - def transmit(data) - socket.transmit ActiveSupport::JSON.encode(data) + ActionDispatch::Request.new(environment || env) end - def close(...) - return if socket.closed? - logger.info finished_request_message if access_logs? - socket.close(...) + def action_cable_connection + conn end - def perform_work(receiver, method_name, *args) - raise ArgumentError, "Performing work is not supported within AnyCable" - end - # == Action Cable socket interface [END] - private attr_reader :conn + def reject_request(reason, reconnect = false) + logger.info finished_request_message("Rejected") if access_logs? + conn.close( + reason: reason, + reconnect: reconnect + ) + end + def fetch_logger_tags_from_state socket.cstate.read(LOG_TAGS_IDENTIFIER).yield_self do |raw_tags| next [] unless raw_tags @@ -205,22 +173,39 @@ def started_request_message ) end - def finished_request_message + def finished_request_message(reason = "Closed") format( - 'Finished "%s"%s for %s at %s', - request.filtered_path, " [AnyCable]", request.ip, Time.now.to_s + 'Finished "%s"%s for %s at %s (%s)', + request.filtered_path, " [AnyCable]", request.ip, Time.now.to_s, reason ) end - def allow_request_origin? + def verify_origin! return true unless socket.env.key?("HTTP_ORIGIN") - server.allow_request_origin?(socket.env) + return true if conn.send(:allow_request_origin?) + + reject_request( + ActionCable::INTERNAL[:disconnect_reasons]&.[](:invalid_request) || "invalid_request" + ) + false end def access_logs? AnyCable.config.access_logs_disabled == false end + + def request + conn.public_request + end + + def request_loaded? + conn.instance_variable_defined?(:@request) + end + + def rescue_with_handler(e) + conn.rescue_with_handler(e) if conn.respond_to?(:rescue_with_handler) + end end end end diff --git a/lib/anycable/rails/connection_factory.rb b/lib/anycable/rails/connection_factory.rb index a8d1268..473c7a2 100644 --- a/lib/anycable/rails/connection_factory.rb +++ b/lib/anycable/rails/connection_factory.rb @@ -1,6 +1,25 @@ # frozen_string_literal: true -require "anycable/rails/connection" +require "action_cable" + +begin + ActionCable::Server::Socket +rescue +end + +if defined?(ActionCable::Server::Socket) + require "anycable/rails/next/connection" + require "anycable/rails/next/action_cable_ext/connection" + require "anycable/rails/next/action_cable_ext/channel" +else + require "anycable/rails/connection" + + require "anycable/rails/action_cable_ext/connection" + require "anycable/rails/action_cable_ext/channel" +end + +require "anycable/rails/action_cable_ext/remote_connections" +require "anycable/rails/action_cable_ext/broadcast_options" module AnyCable module Rails diff --git a/lib/anycable/rails/connections/persistent_session.rb b/lib/anycable/rails/connections/persistent_session.rb index 7212669..4d7daf6 100644 --- a/lib/anycable/rails/connections/persistent_session.rb +++ b/lib/anycable/rails/connections/persistent_session.rb @@ -2,6 +2,11 @@ require "anycable/rails/connections/session_proxy" +if defined?(ActionCable::Server::Socket) + require "anycable/rails/next/connection/persistent_session" + return +end + module AnyCable module Rails module Connections @@ -14,9 +19,10 @@ def handle_channel_command(*) super.tap { commit_session! } end - def request - @request ||= super.tap do |req| - next unless socket.session + def build_rack_request(env) + return super unless socket.session + + super.tap do |req| req.env[::Rack::RACK_SESSION] = SessionProxy.new(req.env[::Rack::RACK_SESSION], socket.session) end @@ -25,7 +31,7 @@ def request private def commit_session! - return unless defined?(@request) && request.session.respond_to?(:loaded?) && request.session.loaded? + return unless request_loaded? && request.session.respond_to?(:loaded?) && request.session.loaded? socket.session = request.session.to_json end diff --git a/lib/anycable/rails/next/action_cable_ext/channel.rb b/lib/anycable/rails/next/action_cable_ext/channel.rb new file mode 100644 index 0000000..1d59937 --- /dev/null +++ b/lib/anycable/rails/next/action_cable_ext/channel.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +require "action_cable" + +ActionCable::Channel::Base.prepend(Module.new do + # Whispering support + def whispers_to(broadcasting) + return super unless anycabled? + + connection.anycable_socket.whisper identifier, broadcasting + end + + # Unsubscribing relies on the channel state (which is not persistent in AnyCable). + # Thus, we pretend that the stream is registered to make Action Cable do its unsubscribing job. + def stop_stream_from(broadcasting) + streams[broadcasting] = true if anycabled? + super + end + + # For AnyCable, unsubscribing from all streams is a separate operation, + # so we use a special constant to indicate it. + def stop_all_streams + if anycabled? + streams.clear + streams[AnyCable::Rails::Server::PubSub::ALL_STREAMS] = true + end + super + end + + # Make rejected status accessible from outside + def rejected? = subscription_rejected? + + private + + def anycabled? = connection.anycabled? +end) diff --git a/lib/anycable/rails/next/action_cable_ext/connection.rb b/lib/anycable/rails/next/action_cable_ext/connection.rb new file mode 100644 index 0000000..1d70c20 --- /dev/null +++ b/lib/anycable/rails/next/action_cable_ext/connection.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require "action_cable" +require "anycable/rails/connections/serializable_identification" + +ActionCable::Connection::Base.include(AnyCable::Rails::Connections::SerializableIdentification) +ActionCable::Connection::Base.prepend(Module.new do + def anycabled? + anycable_socket + end + + # Allow overriding #subscriptions to use a custom implementation + attr_writer :subscriptions + + # Alias for the #socket which is only set within AnyCable RPC context + attr_accessor :anycable_socket + + # Enhance #send_welcome_message to include sid if present + def send_welcome_message + transmit({ + type: ActionCable::INTERNAL[:message_types][:welcome], + sid: env["anycable.sid"] + }.compact) + end + + def subscribe_to_internal_channel + super unless anycabled? + end +end) diff --git a/lib/anycable/rails/next/connection.rb b/lib/anycable/rails/next/connection.rb new file mode 100644 index 0000000..9f19679 --- /dev/null +++ b/lib/anycable/rails/next/connection.rb @@ -0,0 +1,226 @@ +# frozen_string_literal: true + +require "action_cable" + +module AnyCable + module Rails + class Current < ActiveSupport::CurrentAttributes + attribute :identifier + end + + # Wrap ActionCable.server to provide a custom executor + # and a pubsub adapter + class Server < SimpleDelegator + # Implements an executor inteface + class Executor + class NoopTimer + def shutdown = nil + end + + NOOP_TIMER = NoopTimer.new.freeze + + def post(...) + raise NonImplementedError, "Executor#post is not implemented in AnyCable context" + end + + def timer(...) = NOOP_TIMER + end + + # A signleton executor for all connections + EXECUTOR = Executor.new.freeze + + # PubSub adapter to manage streams configuration + # for the underlying socket + class PubSub + private attr_reader :socket + + ALL_STREAMS = Data.define(:to_str).new("all") + + def initialize(socket) = @socket = socket + + def subscribe(channel, _message_callback, success_callback = nil) + socket.subscribe identifier, channel + success_callback&.call + end + + def unsubscribe(channel, _message_callback) + if channel == ALL_STREAMS + socket.unsubscribe_from_all identifier + else + socket.unsubscribe identifier, channel + end + end + + private + + def identifier = Current.identifier + end + + attr_accessor :pubsub, :executor + + def self.for(server, socket) + new(server).tap do |srv| + srv.executor = EXECUTOR + srv.pubsub = PubSub.new(socket) + end + end + end + + class Connection + class Subscriptions < ::ActionCable::Connection::Subscriptions + # Wrap the original #execute_command to pre-initialize the channel for unsubscribe/message and + # return true/false to indicate successful/unsuccessful subscription. + def execute_command(data) + cmd = data["command"] + + # We need the current channel identifier in pub/sub + Current.identifier = data["identifier"] + + load(data["identifier"]) unless cmd == "subscribe" + + super + + return true unless cmd == "subscribe" + + subscription = subscriptions[data["identifier"]] + !(subscription.nil? || subscription.rejected?) + end + + # Restore channels from the list of identifiers and the state + def restore(subscriptions, istate) + subscriptions.each do |id| + channel = load(id) + channel.__istate__ = ActiveSupport::JSON.decode(istate[id]) if istate[id] + end + end + + # Find or create a channel for a given identifier + def load(identifier) + return subscriptions[identifier] if subscriptions[identifier] + + subscription = subscription_from_identifier(identifier) + raise "Channel not found: #{ActiveSupport::JSON.decode(identifier).fetch("channel")}" unless subscription + + subscriptions[identifier] = subscription + end + end + + # We store logger tags in the connection state to be able + # to re-use them in the subsequent calls + LOG_TAGS_IDENTIFIER = "__ltags__" + + attr_reader :socket, :server + + delegate :identifiers_json, to: :conn + delegate :cstate, :istate, to: :socket + + def initialize(connection_class, socket, identifiers: nil, subscriptions: nil, server: ::ActionCable.server) + server = Server.for(server, socket) + + @socket = socket + @server = server + # TODO: Move protocol to socket.env as "anycable.protocol" + @protocol = "actioncable-v1-json" + + logger_tags = fetch_logger_tags_from_state + @logger = ActionCable::Server::TaggedLoggerProxy.new(AnyCable.logger, tags: logger_tags) + + @conn = connection_class.new(server, self) + conn.subscriptions = Subscriptions.new(conn) + conn.identifiers_json = identifiers + conn.anycable_socket = socket + conn.subscriptions.restore(subscriptions, socket.istate) if subscriptions + end + + # == AnyCable RPC interface [BEGIN] == + def handle_open + logger.info started_request_message if access_logs? + + return close unless allow_request_origin? + + conn.handle_open + + # Commit log tags to the connection state + socket.cstate.write(LOG_TAGS_IDENTIFIER, logger.tags.to_json) unless logger.tags.empty? + + socket.closed? + end + + def handle_close + conn.handle_close + close + true + end + + def handle_channel_command(identifier, command, data) + conn.handle_incoming({"command" => command, "identifier" => identifier, "data" => data}) + end + # == AnyCable RPC interface [END] == + + # == Action Cable socket interface [BEGIN] + attr_reader :protocol, :logger + + def request + @request ||= begin + env = socket.env + environment = ::Rails.application.env_config.merge(env) if defined?(::Rails.application) && ::Rails.application + AnyCable::Rails::Rack.app.call(environment) if environment + + ActionDispatch::Request.new(environment || env) + end + end + + delegate :env, to: :request + + def transmit(data) + socket.transmit ActiveSupport::JSON.encode(data) + end + + def close(...) + return if socket.closed? + logger.info finished_request_message if access_logs? + socket.close(...) + end + + def perform_work(receiver, method_name, *args) + raise ArgumentError, "Performing work is not supported within AnyCable" + end + # == Action Cable socket interface [END] + + private + + attr_reader :conn + + def fetch_logger_tags_from_state + socket.cstate.read(LOG_TAGS_IDENTIFIER).yield_self do |raw_tags| + next [] unless raw_tags + ActiveSupport::JSON.decode(raw_tags) + end + end + + def started_request_message + format( + 'Started "%s"%s for %s at %s', + request.filtered_path, " [AnyCable]", request.ip, Time.now.to_s + ) + end + + def finished_request_message + format( + 'Finished "%s"%s for %s at %s', + request.filtered_path, " [AnyCable]", request.ip, Time.now.to_s + ) + end + + def allow_request_origin? + return true unless socket.env.key?("HTTP_ORIGIN") + + server.allow_request_origin?(socket.env) + end + + def access_logs? + AnyCable.config.access_logs_disabled == false + end + end + end +end diff --git a/lib/anycable/rails/next/connection/persistent_session.rb b/lib/anycable/rails/next/connection/persistent_session.rb new file mode 100644 index 0000000..7212669 --- /dev/null +++ b/lib/anycable/rails/next/connection/persistent_session.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +require "anycable/rails/connections/session_proxy" + +module AnyCable + module Rails + module Connections + module PersistentSession + def handle_open + super.tap { commit_session! } + end + + def handle_channel_command(*) + super.tap { commit_session! } + end + + def request + @request ||= super.tap do |req| + next unless socket.session + req.env[::Rack::RACK_SESSION] = + SessionProxy.new(req.env[::Rack::RACK_SESSION], socket.session) + end + end + + private + + def commit_session! + return unless defined?(@request) && request.session.respond_to?(:loaded?) && request.session.loaded? + + socket.session = request.session.to_json + end + end + end + end +end + +AnyCable::Rails::Connection.prepend( + AnyCable::Rails::Connections::PersistentSession +) diff --git a/lib/anycable/rails/railtie.rb b/lib/anycable/rails/railtie.rb index af64696..c0ebdd9 100644 --- a/lib/anycable/rails/railtie.rb +++ b/lib/anycable/rails/railtie.rb @@ -1,10 +1,5 @@ # frozen_string_literal: true -require "anycable/rails/action_cable_ext/connection" -require "anycable/rails/action_cable_ext/channel" -require "anycable/rails/action_cable_ext/remote_connections" -require "anycable/rails/action_cable_ext/broadcast_options" - require "anycable/rails/channel_state" require "anycable/rails/connection_factory" diff --git a/spec/integrations/action_cable_testing_spec.rb b/spec/integrations/action_cable_testing_spec.rb index 9832895..1324f93 100644 --- a/spec/integrations/action_cable_testing_spec.rb +++ b/spec/integrations/action_cable_testing_spec.rb @@ -44,6 +44,10 @@ def connection_class end context "connection tests" do + let(:transmissions) do + defined?(socket) ? socket.transmissions : connection.transmissions + end + specify "connect" do connect "/cable?token=123", session: {username: user.name} @@ -72,7 +76,7 @@ def connection_class } ) - expect(socket.transmissions.last["type"]).to eq("confirm_subscription") + expect(transmissions.last["type"]).to eq("confirm_subscription") expect(ApplicationCable::Connection.events_log.last[:source]).to eq "command" end end diff --git a/spec/lib/anycable/rails/ext/jwt_spec.rb b/spec/lib/anycable/rails/ext/jwt_spec.rb index 7f57520..601fdfb 100644 --- a/spec/lib/anycable/rails/ext/jwt_spec.rb +++ b/spec/lib/anycable/rails/ext/jwt_spec.rb @@ -74,12 +74,31 @@ def connect it "rejects with token_expired reason when expired" do token = AnyCable::JWT.encode({user: user}, expires_at: 1.minute.ago) - expect { connect params: {joken: token} }.to raise_error(AnyCable::JWT::ExpiredSignature) + # Action Cable Next + if defined?(testserver) + expect { connect params: {joken: token} }.to raise_error(AnyCable::JWT::ExpiredSignature) - # now we can re-use the same connection info and call #handle_open directly - conn = self.class.connection_class.new(testserver, socket) - conn.handle_open + # now we can re-use the same connection info and call #handle_open directly + conn = self.class.connection_class.new(testserver, socket) + conn.handle_open - expect(socket.transmissions.last["reason"]).to eq "token_expired" + expect(socket.transmissions.last["reason"]).to eq "token_expired" + else + req = ActionDispatch::TestRequest.create({"QUERY_STRING" => "joken=#{token}", "PATH_INFO" => "/cable"}) + conn = AnyCableTestConnection.allocate + + ws = double("websocket") + allow(ws).to receive(:alive?) { true } + expect(ws).to receive(:close) + + allow(conn).to receive(:websocket) { ws } + + conn.singleton_class.include(ActionCable::Connection::TestConnection) + conn.send(:initialize, req) + + conn.handle_open + + expect(conn.transmissions.last["reason"]).to eq "token_expired" + end end end diff --git a/spec/lib/anycable/rails/ext/signed_streams_spec.rb b/spec/lib/anycable/rails/ext/signed_streams_spec.rb index aa25749..f534e83 100644 --- a/spec/lib/anycable/rails/ext/signed_streams_spec.rb +++ b/spec/lib/anycable/rails/ext/signed_streams_spec.rb @@ -21,9 +21,33 @@ def connection_class AnyCable.config.streams_secret = was_secret end - let(:conn) { connect } + let(:legacy_conn) do + req = ActionDispatch::TestRequest.create({"PATH_INFO" => "/cable"}) + conn = AnyCableTestConnection.allocate + + ws = double("websocket") + allow(ws).to receive(:alive?) { true } + allow(ws).to receive(:close) + allow(conn).to receive(:websocket) { ws } + + conn.singleton_class.include(ActionCable::Connection::TestConnection) + conn.send(:initialize, req) + conn + end + + before do + next if defined?(socket) + + allow_any_instance_of(AnyCable::Rails::PubSubChannel).to receive(:stream_from) + end + + let(:conn) { defined?(socket) ? connect : legacy_conn } + + let(:transmissions) do + defined?(socket) ? socket.transmissions : conn.transmissions + end - let(:transmission) { socket.transmissions.last } + let(:transmission) { transmissions.last } let(:user) { User.create!(name: "jack") } diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 1bbc4a0..19f4253 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -10,6 +10,7 @@ end require "rspec/rails" +require "action_cable/next/rspec" if ENV["NEXT_ACTION_CABLE"] == "1" Rails.application.eager_load!