diff --git a/CHANGELOG.md b/CHANGELOG.md index 34b4602..8abe0bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master +- Refactored Action Cable patching to preserve original functionality and avoid monkey-patching collisions. ([@palkan][]) + ## 1.2.1 (2021-01-31) - Add a temporary fix to be compatible with `sentry-rails`. ([@palkan][]) diff --git a/Gemfile b/Gemfile index 9818356..c01b02d 100644 --- a/Gemfile +++ b/Gemfile @@ -2,7 +2,7 @@ source 'https://rubygems.org' gemspec -gem "pry-byebug", platform: :mri +gem "debug", platform: :mri local_gemfile = "#{File.dirname(__FILE__)}/Gemfile.local" @@ -11,7 +11,8 @@ eval_gemfile "gemfiles/rubocop.gemfile" if File.exist?(local_gemfile) eval(File.read(local_gemfile)) # rubocop:disable Lint/Eval else - gem 'sqlite3', '~> 1.3' - gem 'actioncable', '~> 6.0' - gem 'activerecord' + gem 'actioncable', '~> 7.0' end + +gem 'sqlite3', '~> 1.3' +gem 'activerecord' diff --git a/lib/anycable/rails/action_cable_ext/channel.rb b/lib/anycable/rails/action_cable_ext/channel.rb new file mode 100644 index 0000000..19c9814 --- /dev/null +++ b/lib/anycable/rails/action_cable_ext/channel.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +require "action_cable/channel" + +ActionCable::Channel::Base.prepend(Module.new do + def subscribe_to_channel(force: false) + return if anycabled? && !force + super() + end + + def handle_subscribe + subscribe_to_channel(force: true) + end + + def start_periodic_timers + super unless anycabled? + end + + def stop_periodic_timers + super unless anycabled? + end + + def stream_from(broadcasting, _callback = nil, _options = {}) + return super unless anycabled? + + connection.anycable_socket.subscribe identifier, broadcasting + end + + def stop_stream_from(broadcasting) + return super unless anycabled? + + connection.anycable_socket.unsubscribe identifier, broadcasting + end + + def stop_all_streams + return super unless anycabled? + + connection.anycable_socket.unsubscribe_from_all identifier + end + + private + + def anycabled? + # Use instance variable check here for testing compatibility + connection.instance_variable_defined?(:@anycable_socket) + end +end) diff --git a/lib/anycable/rails/action_cable_ext/connection.rb b/lib/anycable/rails/action_cable_ext/connection.rb new file mode 100644 index 0000000..769c870 --- /dev/null +++ b/lib/anycable/rails/action_cable_ext/connection.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require "action_cable/connection" +require "anycable/rails/connections/serializable_identification" + +ActionCable::Connection::Base.include(AnyCable::Rails::Connections::SerializableIdentification) +ActionCable::Connection::Base.prepend(Module.new do + attr_reader :anycable_socket + attr_accessor :anycable_request_builder + + # In AnyCable, we lazily populate env by passing it through the middleware chain, + # so we access it via #request + def env + return super unless anycabled? + + request.env + end + + def anycabled? + @anycable_socket + end + + private + + def request + return super unless anycabled? + + @request ||= anycable_request_builder.build_rack_request(@env) + end +end) diff --git a/lib/anycable/rails/actioncable/remote_connections.rb b/lib/anycable/rails/action_cable_ext/remote_connections.rb similarity index 72% rename from lib/anycable/rails/actioncable/remote_connections.rb rename to lib/anycable/rails/action_cable_ext/remote_connections.rb index 08a86d6..c58ebf5 100644 --- a/lib/anycable/rails/actioncable/remote_connections.rb +++ b/lib/anycable/rails/action_cable_ext/remote_connections.rb @@ -2,7 +2,7 @@ require "action_cable/remote_connections" -ActionCable::RemoteConnections::RemoteConnection.include(ActionCable::Connection::SerializableIdentification) +ActionCable::RemoteConnections::RemoteConnection.include(AnyCable::Rails::Connections::SerializableIdentification) ActionCable::RemoteConnections::RemoteConnection.prepend(Module.new do def disconnect(reconnect: true) diff --git a/lib/anycable/rails/actioncable/channel.rb b/lib/anycable/rails/actioncable/channel.rb deleted file mode 100644 index f5c8ed1..0000000 --- a/lib/anycable/rails/actioncable/channel.rb +++ /dev/null @@ -1,40 +0,0 @@ -# frozen_string_literal: true - -require "action_cable/channel" - -module ActionCable - module Channel - class Base # :nodoc: - alias_method :handle_subscribe, :subscribe_to_channel - - public :handle_subscribe, :subscription_rejected? - - # Action Cable calls this method from inside the Subscriptions#add - # method, which we use to initialize the channel instance. - # We don't want to invoke `subscribed` callbacks every time we do that. - def subscribe_to_channel - # noop - end - - def start_periodic_timers - # noop - end - - def stop_periodic_timers - # noop - end - - def stream_from(broadcasting, _callback = nil, _options = {}) - connection.socket.subscribe identifier, broadcasting - end - - def stop_stream_from(broadcasting) - connection.socket.unsubscribe identifier, broadcasting - end - - def stop_all_streams - connection.socket.unsubscribe_from_all identifier - end - end - end -end diff --git a/lib/anycable/rails/actioncable/connection.rb b/lib/anycable/rails/actioncable/connection.rb deleted file mode 100644 index 357a7f5..0000000 --- a/lib/anycable/rails/actioncable/connection.rb +++ /dev/null @@ -1,220 +0,0 @@ -# frozen_string_literal: true - -require "action_cable/connection" -require "anycable/rails/actioncable/connection/serializable_identification" -require "anycable/rails/refinements/subscriptions" -require "anycable/rails/actioncable/channel" -require "anycable/rails/actioncable/remote_connections" -require "anycable/rails/session_proxy" - -module ActionCable - module Connection - class Base # :nodoc: - # We store logger tags in the connection state to be able - # to re-use them in the subsequent calls - LOG_TAGS_IDENTIFIER = "__ltags__" - - using AnyCable::Refinements::Subscriptions - - include SerializableIdentification - - attr_reader :socket - - alias_method :anycable_socket, :socket - - delegate :env, :session, to: :request - - class << self - def call(socket, **options) - new(socket, nil, **options) - end - end - - def initialize(socket, env, identifiers: "{}", subscriptions: nil) - if env - # If env is set, then somehow we're in the context of Action Cable - # Return and print a warning in #process - @request = ActionDispatch::Request.new(env) - return - end - - @ids = ActiveSupport::JSON.decode(identifiers) - - @ltags = socket.cstate.read(LOG_TAGS_IDENTIFIER).yield_self do |raw_tags| - next unless raw_tags - ActiveSupport::JSON.decode(raw_tags) - end - - @cached_ids = {} - @coder = ActiveSupport::JSON - @socket = socket - @subscriptions = ActionCable::Connection::Subscriptions.new(self) - - return unless subscriptions - - # Initialize channels (for disconnect) - subscriptions.each do |id| - channel = @subscriptions.fetch(id) - next unless socket.istate[id] - - channel.__istate__ = ActiveSupport::JSON.decode(socket.istate[id]) - end - end - - def process - # Use Rails logger here to print to stdout in development - logger.error invalid_request_message - logger.info finished_request_message - [404, {"Content-Type" => "text/plain"}, ["Page not found"]] - end - - def invalid_request_message - "You're trying to connect to Action Cable server while using AnyCable. " \ - "See https://docs.anycable.io/troubleshooting?id=server-raises-an-argumenterror-exception-when-client-tries-to-connect" - end - - def handle_open - logger.info started_request_message if access_logs? - - verify_origin! || return - - connect if respond_to?(:connect) - - socket.cstate.write(LOG_TAGS_IDENTIFIER, fetch_ltags.to_json) - - send_welcome_message - rescue ActionCable::Connection::Authorization::UnauthorizedError - reject_request( - ActionCable::INTERNAL[:disconnect_reasons]&.[](:unauthorized) || "unauthorized" - ) - end - - def handle_close - logger.info finished_request_message if access_logs? - - subscriptions.unsubscribe_from_all - disconnect if respond_to?(:disconnect) - true - end - - def handle_channel_command(identifier, command, data) - channel = subscriptions.fetch(identifier) - case command - when "subscribe" - channel.handle_subscribe - !channel.subscription_rejected? - when "unsubscribe" - subscriptions.remove_subscription(channel) - true - when "message" - channel.perform_action ActiveSupport::JSON.decode(data) - true - else - false - end - end - # rubocop:enable Metrics/MethodLength - - def close(reason: nil, reconnect: nil) - transmit( - type: ActionCable::INTERNAL[:message_types].fetch(:disconnect, "disconnect"), - reason: reason, - reconnect: reconnect - ) - socket.close - end - - def transmit(cable_message) - socket.transmit encode(cable_message) - end - - def logger - @logger ||= TaggedLoggerProxy.new(AnyCable.logger, tags: ltags || []) - end - - def request - @request ||= build_rack_request - end - - private - - attr_reader :ids, :ltags - - def started_request_message - format( - 'Started "%s"%s for %s at %s', - request.filtered_path, " [AnyCable]", request.ip, Time.now.to_s - ) - end - - def finished_request_message(reason = "Closed") - format( - 'Finished "%s"%s for %s at %s (%s)', - request.filtered_path, " [AnyCable]", request.ip, Time.now.to_s, reason - ) - end - - def access_logs? - AnyCable.config.access_logs_disabled == false - end - - def fetch_ltags - if instance_variable_defined?(:@logger) - logger.tags - else - ltags - end - end - - def server - ActionCable.server - end - - def verify_origin! - return true unless socket.env.key?("HTTP_ORIGIN") - - return true if allow_request_origin? - - reject_request( - ActionCable::INTERNAL[:disconnect_reasons]&.[](:invalid_request) || "invalid_request" - ) - false - end - - def reject_request(reason, reconnect = false) - logger.info finished_request_message("Rejected") if access_logs? - close( - reason: reason, - reconnect: reconnect - ) - end - - def build_rack_request - environment = Rails.application.env_config.merge(socket.env) - AnyCable::Rails::Rack.app.call(environment) - - ActionDispatch::Request.new(environment) - end - - def request_loaded? - instance_variable_defined?(:@request) - end - end - # rubocop:enable Metrics/ClassLength - end -end - -# Support rescue_from -# https://github.com/rails/rails/commit/d2571e560c62116f60429c933d0c41a0e249b58b -if ActionCable::Connection::Base.respond_to?(:rescue_from) - ActionCable::Connection::Base.prepend(Module.new do - def handle_channel_command(*) - super - rescue Exception => e # rubocop:disable Lint/RescueException - rescue_with_handler(e) || raise - false - end - end) -end - -require "anycable/rails/actioncable/testing" if ::Rails.env.test? diff --git a/lib/anycable/rails/actioncable/connection/persistent_session.rb b/lib/anycable/rails/actioncable/connection/persistent_session.rb deleted file mode 100644 index c141e37..0000000 --- a/lib/anycable/rails/actioncable/connection/persistent_session.rb +++ /dev/null @@ -1,34 +0,0 @@ -# frozen_string_literal: true - -module ActionCable - module Connection - module PersistentSession - def handle_open - super.tap { commit_session! } - end - - def handle_channel_command(*) - super.tap { commit_session! } - end - - def build_rack_request - return super unless socket.session - - super.tap do |req| - req.env[::Rack::RACK_SESSION] = - AnyCable::Rails::SessionProxy.new(req.env[::Rack::RACK_SESSION], socket.session) - end - end - - def commit_session! - return unless request_loaded? && request.session.respond_to?(:loaded?) && request.session.loaded? - - socket.session = request.session.to_json - end - end - end -end - -::ActionCable::Connection::Base.prepend( - ::ActionCable::Connection::PersistentSession -) diff --git a/lib/anycable/rails/actioncable/connection/serializable_identification.rb b/lib/anycable/rails/actioncable/connection/serializable_identification.rb deleted file mode 100644 index 7524554..0000000 --- a/lib/anycable/rails/actioncable/connection/serializable_identification.rb +++ /dev/null @@ -1,42 +0,0 @@ -# frozen_string_literal: true - -module ActionCable - module Connection - module SerializableIdentification - extend ActiveSupport::Concern - - class_methods do - def identified_by(*identifiers) - super - Array(identifiers).each do |identifier| - define_method(identifier) do - instance_variable_get(:"@#{identifier}") || fetch_identifier(identifier) - end - end - end - end - - # Generate identifiers info. - # Converts GlobalID compatible vars to corresponding global IDs params. - def identifiers_hash - identifiers.each_with_object({}) do |id, acc| - obj = instance_variable_get("@#{id}") - next unless obj - - acc[id] = AnyCable::Rails.serialize(obj) - end.compact - end - - def identifiers_json - identifiers_hash.to_json - end - - # Fetch identifier and deserialize if neccessary - def fetch_identifier(name) - @cached_ids[name] ||= @cached_ids.fetch(name) do - AnyCable::Rails.deserialize(ids[name.to_s]) - end - end - end - end -end diff --git a/lib/anycable/rails/actioncable/testing.rb b/lib/anycable/rails/actioncable/testing.rb deleted file mode 100644 index 87f7292..0000000 --- a/lib/anycable/rails/actioncable/testing.rb +++ /dev/null @@ -1,33 +0,0 @@ -# frozen_string_literal: true - -# This file contains patches to Action Cable testing modules - -# Trigger autoload (if constant is defined) -begin - ActionCable::Channel::TestCase # rubocop:disable Lint/Void - ActionCable::Connection::TestCase -rescue NameError - return -end - -ActionCable::Channel::ChannelStub.prepend(Module.new do - def subscribe_to_channel - handle_subscribe - end -end) - -ActionCable::Channel::ConnectionStub.prepend(Module.new do - def socket - @socket ||= AnyCable::Socket.new(env: AnyCable::Env.new(url: "http://test.host", headers: {})) - end - - alias_method :anycable_socket, :socket -end) - -ActionCable::Connection::TestConnection.prepend(Module.new do - def initialize(request) - @request = request - @cached_ids = {} - super - end -end) diff --git a/lib/anycable/rails/channel_state.rb b/lib/anycable/rails/channel_state.rb index 5ab12b7..daa5603 100644 --- a/lib/anycable/rails/channel_state.rb +++ b/lib/anycable/rails/channel_state.rb @@ -10,11 +10,11 @@ def state_attr_accessor(*names) class_eval <<~RUBY, __FILE__, __LINE__ + 1 def #{name} return @#{name} if instance_variable_defined?(:@#{name}) - @#{name} = AnyCable::Rails.deserialize(__istate__["#{name}"], json: true) if connection.anycable_socket + @#{name} = AnyCable::Rails.deserialize(__istate__["#{name}"], json: true) if anycabled? end def #{name}=(val) - __istate__["#{name}"] = AnyCable::Rails.serialize(val, json: true) if connection.anycable_socket + __istate__["#{name}"] = AnyCable::Rails.serialize(val, json: true) if anycabled? instance_variable_set(:@#{name}, val) end RUBY @@ -41,7 +41,7 @@ def self.included(base) attr_writer :__istate__ def __istate__ - @__istate__ ||= connection.socket.istate + @__istate__ ||= connection.anycable_socket.istate end end @@ -53,11 +53,11 @@ def state_attr_accessor(*names) class_eval <<~RUBY, __FILE__, __LINE__ + 1 def #{name} return @#{name} if instance_variable_defined?(:@#{name}) - @#{name} = AnyCable::Rails.deserialize(__cstate__["#{name}"], json: true) if anycable_socket + @#{name} = AnyCable::Rails.deserialize(__cstate__["#{name}"], json: true) if anycabled? end def #{name}=(val) - __cstate__["#{name}"] = AnyCable::Rails.serialize(val, json: true) if anycable_socket + __cstate__["#{name}"] = AnyCable::Rails.serialize(val, json: true) if anycabled? instance_variable_set(:@#{name}, val) end RUBY @@ -84,7 +84,7 @@ def self.included(base) attr_writer :__cstate__ def __cstate__ - @__cstate__ ||= socket.cstate + @__cstate__ ||= anycable_socket.cstate end end end diff --git a/lib/anycable/rails/connection.rb b/lib/anycable/rails/connection.rb new file mode 100644 index 0000000..2c938fc --- /dev/null +++ b/lib/anycable/rails/connection.rb @@ -0,0 +1,203 @@ +# frozen_string_literal: true + +require "action_cable/connection" +require "action_cable/channel" + +module AnyCable + module Rails + # Enhance Action Cable connection + using(Module.new do + refine ActionCable::Connection::Base do + attr_writer :env, :websocket, :logger, :coder, + :subscriptions, :serialized_ids, :cached_ids, :server, + :anycable_socket + + # Using public :send_welcome_message causes stack level too deep 🤷🏻‍♂️ + def public_send_welcome_message + send_welcome_message + end + + def public_request + request + end + end + + refine ActionCable::Channel::Base do + def rejected? + subscription_rejected? + end + end + + refine ActionCable::Connection::Subscriptions do + # Find or add a subscription to the list + def fetch(identifier) + add("identifier" => identifier) unless subscriptions[identifier] + + unless subscriptions[identifier] + raise "Channel not found: #{ActiveSupport::JSON.decode(identifier).fetch("channel")}" + end + + subscriptions[identifier] + end + end + end) + + class Connection + # We store logger tags in the connection state to be able + # to re-use them in the subsequent calls + LOG_TAGS_IDENTIFIER = "__ltags__" + + delegate :identifiers_json, to: :conn + + attr_reader :socket, :logger + + def initialize(connection_class, socket, identifiers: nil, subscriptions: nil) + @socket = socket + + logger_tags = fetch_logger_tags_from_state + @logger = ActionCable::Connection::TaggedLoggerProxy.new(AnyCable.logger, tags: logger_tags) + + # Instead of calling #initialize, + # we allocate an instance and setup all the required components manually + @conn = connection_class.allocate + # Required to access config (for access origin checks) + conn.server = ActionCable.server + conn.logger = logger + conn.anycable_socket = conn.websocket = socket + conn.env = socket.env + conn.coder = ActiveSupport::JSON + conn.subscriptions = ActionCable::Connection::Subscriptions.new(conn) + conn.serialized_ids = {} + conn.serialized_ids = ActiveSupport::JSON.decode(identifiers) if identifiers + conn.cached_ids = {} + conn.anycable_request_builder = self + + return unless subscriptions + + # Pre-initialize channels (for disconnect) + subscriptions.each do |id| + channel = conn.subscriptions.fetch(id) + next unless socket.istate[id] + + channel.__istate__ = ActiveSupport::JSON.decode(socket.istate[id]) + end + end + + def handle_open + logger.info started_request_message if access_logs? + + verify_origin! || return + + conn.connect if conn.respond_to?(:connect) + + socket.cstate.write(LOG_TAGS_IDENTIFIER, logger.tags.to_json) unless logger.tags.empty? + + conn.public_send_welcome_message + rescue ::ActionCable::Connection::Authorization::UnauthorizedError + reject_request( + ActionCable::INTERNAL[:disconnect_reasons]&.[](:unauthorized) || "unauthorized" + ) + end + + def handle_close + logger.info finished_request_message if access_logs? + + conn.subscriptions.unsubscribe_from_all + conn.disconnect if conn.respond_to?(:disconnect) + true + end + + def handle_channel_command(identifier, command, data) + # We cannot use subscriptions#execute_command here, + # since we MUST return true of false, depending on the status + # of execution + channel = conn.subscriptions.fetch(identifier) + case command + when "subscribe" + channel.handle_subscribe + !channel.rejected? + when "unsubscribe" + conn.subscriptions.remove_subscription(channel) + true + when "message" + channel.perform_action ActiveSupport::JSON.decode(data) + true + else + false + end + # Support rescue_from + # https://github.com/rails/rails/commit/d2571e560c62116f60429c933d0c41a0e249b58b + rescue Exception => e # rubocop:disable Lint/RescueException + rescue_with_handler(e) || raise + false + end + + def build_rack_request(env) + environment = ::Rails.application.env_config.merge(env) if defined?(::Rails.application) && ::Rails.application + AnyCable::Rails::Rack.app.call(environment) if environment + + ActionDispatch::Request.new(environment || env) + end + + private + + attr_reader :conn + + def reject_request(reason, reconnect = false) + logger.info finished_request_message("Rejected") if access_logs? + conn.close( + reason: reason, + reconnect: reconnect + ) + end + + def fetch_logger_tags_from_state + socket.cstate.read(LOG_TAGS_IDENTIFIER).yield_self do |raw_tags| + next [] unless raw_tags + ActiveSupport::JSON.decode(raw_tags) + end + end + + def started_request_message + format( + 'Started "%s"%s for %s at %s', + request.filtered_path, " [AnyCable]", request.ip, Time.now.to_s + ) + end + + def finished_request_message(reason = "Closed") + format( + 'Finished "%s"%s for %s at %s (%s)', + request.filtered_path, " [AnyCable]", request.ip, Time.now.to_s, reason + ) + end + + def verify_origin! + return true unless socket.env.key?("HTTP_ORIGIN") + + return true if conn.send(:allow_request_origin?) + + reject_request( + ActionCable::INTERNAL[:disconnect_reasons]&.[](:invalid_request) || "invalid_request" + ) + false + end + + def access_logs? + AnyCable.config.access_logs_disabled == false + end + + def request + conn.public_request + end + + def request_loaded? + conn.instance_variable_defined?(:@request) + end + + def rescue_with_handler(e) + conn.rescue_with_handler(e) if conn.respond_to?(:rescue_with_handler) + end + end + end +end diff --git a/lib/anycable/rails/connection_factory.rb b/lib/anycable/rails/connection_factory.rb new file mode 100644 index 0000000..9f2d0f3 --- /dev/null +++ b/lib/anycable/rails/connection_factory.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +require "anycable/rails/connection" + +module AnyCable + module Rails + module ConnectionFactory + def self.call(socket, **options) + # TODO: Introduce a router to support multiple backends + connection_class = ActionCable.server.config.connection_class.call + AnyCable::Rails::Connection.new(connection_class, socket, **options) + end + end + end +end diff --git a/lib/anycable/rails/connections/persistent_session.rb b/lib/anycable/rails/connections/persistent_session.rb new file mode 100644 index 0000000..b7171a9 --- /dev/null +++ b/lib/anycable/rails/connections/persistent_session.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +require "anycable/rails/connections/session_proxy" + +module AnyCable + module Rails + module Connections + module PersistentSession + def handle_open + super.tap { commit_session! } + end + + def handle_channel_command(*) + super.tap { commit_session! } + end + + def build_rack_request(env) + return super unless socket.session + + super.tap do |req| + req.env[::Rack::RACK_SESSION] = + SessionProxy.new(req.env[::Rack::RACK_SESSION], socket.session) + end + end + + private + + def commit_session! + return unless request_loaded? && request.session.respond_to?(:loaded?) && request.session.loaded? + + socket.session = request.session.to_json + end + end + end + end +end + +AnyCable::Rails::Connection.prepend( + AnyCable::Rails::Connections::PersistentSession +) diff --git a/lib/anycable/rails/connections/serializable_identification.rb b/lib/anycable/rails/connections/serializable_identification.rb new file mode 100644 index 0000000..c4a1012 --- /dev/null +++ b/lib/anycable/rails/connections/serializable_identification.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module AnyCable + module Rails + module Connections + module SerializableIdentification + extend ActiveSupport::Concern + + class_methods do + def identified_by(*identifiers) + super + Array(identifiers).each do |identifier| + define_method(identifier) do + instance_variable_get(:"@#{identifier}") || fetch_identifier(identifier) + end + end + end + end + + # Generate identifiers info. + # Converts GlobalID compatible vars to corresponding global IDs params. + def identifiers_hash + identifiers.each_with_object({}) do |id, acc| + obj = instance_variable_get("@#{id}") + next unless obj + + acc[id] = AnyCable::Rails.serialize(obj) + end.compact + end + + def identifiers_json + identifiers_hash.to_json + end + + # Fetch identifier and deserialize if neccessary + def fetch_identifier(name) + return unless @cached_ids + + @cached_ids[name] ||= @cached_ids.fetch(name) do + AnyCable::Rails.deserialize(@serialized_ids[name.to_s]) + end + end + end + end + end +end diff --git a/lib/anycable/rails/connections/session_proxy.rb b/lib/anycable/rails/connections/session_proxy.rb new file mode 100644 index 0000000..90e4d6a --- /dev/null +++ b/lib/anycable/rails/connections/session_proxy.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +module AnyCable + module Rails + module Connections + # Wrap `request.session` to lazily load values provided + # in the RPC call (set by the previous calls) + class SessionProxy + attr_reader :rack_session, :socket_session + + def initialize(rack_session, socket_session) + @rack_session = rack_session + @socket_session = JSON.parse(socket_session).with_indifferent_access + end + + %i[has_key? [] []= fetch delete dig].each do |mid| + class_eval <<~CODE, __FILE__, __LINE__ + 1 + def #{mid}(*args, **kwargs, &block) + restore_key! args.first + rack_session.#{mid}(*args, **kwargs, &block) + end + CODE + end + + alias_method :include?, :has_key? + alias_method :key?, :has_key? + + %i[update merge! to_hash].each do |mid| + class_eval <<~CODE, __FILE__, __LINE__ + 1 + def #{mid}(*args, **kwargs, &block) + restore! + rack_session.#{mid}(*args, **kwargs, &block) + end + CODE + end + + alias_method :to_h, :to_hash + + def keys + rack_session.keys + socket_session.keys + end + + # Delegate both publuc and private methods to rack_session + def respond_to_missing?(name, include_private = false) + return false if name == :marshal_dump || name == :_dump + rack_session.respond_to?(name, include_private) || super + end + + def method_missing(method, *args, &block) + if rack_session.respond_to?(method, true) + rack_session.send(method, *args, &block) + else + super + end + end + + # This method is used by StimulusReflex to obtain `@by` + def instance_variable_get(name) + super || rack_session.instance_variable_get(name) + end + + private + + def restore! + socket_session.keys.each(&method(:restore_key!)) + end + + def restore_key!(key) + return unless socket_session.key?(key) + val = socket_session.delete(key) + rack_session[key] = + if val.is_a?(String) + GlobalID::Locator.locate(val) || val + else + val + end + end + end + end + end +end diff --git a/lib/anycable/rails/railtie.rb b/lib/anycable/rails/railtie.rb index 08c0dde..3a7b705 100644 --- a/lib/anycable/rails/railtie.rb +++ b/lib/anycable/rails/railtie.rb @@ -1,6 +1,11 @@ # frozen_string_literal: true +require "anycable/rails/action_cable_ext/connection" +require "anycable/rails/action_cable_ext/channel" +require "anycable/rails/action_cable_ext/remote_connections" + require "anycable/rails/channel_state" +require "anycable/rails/connection_factory" module AnyCable module Rails @@ -43,30 +48,16 @@ class Railtie < ::Rails::Railtie # :nodoc: initializer "anycable.connection_factory", after: "action_cable.set_configs" do |app| ActiveSupport.on_load(:action_cable) do - # Add AnyCable patch method stub (we use it in ChannelState to distinguish between Action Cable and AnyCable) - # NOTE: Method could be already defined if patch was loaded manually - ActionCable::Connection::Base.attr_reader(:anycable_socket) unless ActionCable::Connection::Base.method_defined?(:anycable_socket) - app.config.to_prepare do - AnyCable.connection_factory = ActionCable.server.config.connection_class.call + AnyCable.connection_factory = AnyCable::Rails::ConnectionFactory end - if AnyCable::Rails.enabled? - require "anycable/rails/actioncable/connection" - if AnyCable.config.persistent_session_enabled - require "anycable/rails/actioncable/connection/persistent_session" - end + if AnyCable::Rails.enabled? && AnyCable.config.persistent_session_enabled + require "anycable/rails/connections/persistent_session" end end end - # Temp hack to fix Sentry vs AnyCable incompatibility - # See https://github.com/anycable/anycable-rails/issues/165 - initializer "anycable.sentry_hack", after: :"sentry.extend_action_cable" do - next unless defined?(::Sentry::Rails::ActionCableExtensions::Connection) - Sentry::Rails::ActionCableExtensions::Connection.send :public, :handle_open, :handle_close - end - # Since Rails 6.1 if respond_to?(:server) server do diff --git a/lib/anycable/rails/refinements/subscriptions.rb b/lib/anycable/rails/refinements/subscriptions.rb deleted file mode 100644 index c0b39f5..0000000 --- a/lib/anycable/rails/refinements/subscriptions.rb +++ /dev/null @@ -1,20 +0,0 @@ -# frozen_string_literal: true - -module AnyCable - module Refinements - module Subscriptions # :nodoc: - refine ActionCable::Connection::Subscriptions do - # Find or add a subscription to the list - def fetch(identifier) - add("identifier" => identifier) unless subscriptions[identifier] - - unless subscriptions[identifier] - raise "Channel not found: #{ActiveSupport::JSON.decode(identifier).fetch("channel")}" - end - - subscriptions[identifier] - end - end - end - end -end diff --git a/lib/anycable/rails/session_proxy.rb b/lib/anycable/rails/session_proxy.rb deleted file mode 100644 index 3c39718..0000000 --- a/lib/anycable/rails/session_proxy.rb +++ /dev/null @@ -1,79 +0,0 @@ -# frozen_string_literal: true - -module AnyCable - module Rails - # Wrap `request.session` to lazily load values provided - # in the RPC call (set by the previous calls) - class SessionProxy - attr_reader :rack_session, :socket_session - - def initialize(rack_session, socket_session) - @rack_session = rack_session - @socket_session = JSON.parse(socket_session).with_indifferent_access - end - - %i[has_key? [] []= fetch delete dig].each do |mid| - class_eval <<~CODE, __FILE__, __LINE__ + 1 - def #{mid}(*args, **kwargs, &block) - restore_key! args.first - rack_session.#{mid}(*args, **kwargs, &block) - end - CODE - end - - alias_method :include?, :has_key? - alias_method :key?, :has_key? - - %i[update merge! to_hash].each do |mid| - class_eval <<~CODE, __FILE__, __LINE__ + 1 - def #{mid}(*args, **kwargs, &block) - restore! - rack_session.#{mid}(*args, **kwargs, &block) - end - CODE - end - - alias_method :to_h, :to_hash - - def keys - rack_session.keys + socket_session.keys - end - - # Delegate both publuc and private methods to rack_session - def respond_to_missing?(name, include_private = false) - return false if name == :marshal_dump || name == :_dump - rack_session.respond_to?(name, include_private) || super - end - - def method_missing(method, *args, &block) - if rack_session.respond_to?(method, true) - rack_session.send(method, *args, &block) - else - super - end - end - - # This method is used by StimulusReflex to obtain `@by` - def instance_variable_get(name) - super || rack_session.instance_variable_get(name) - end - - private - - def restore! - socket_session.keys.each(&method(:restore_key!)) - end - - def restore_key!(key) - return unless socket_session.key?(key) - val = socket_session.delete(key) - rack_session[key] = - if val.is_a?(String) - GlobalID::Locator.locate(val) || val - else - val - end - end - end - end -end diff --git a/spec/base_spec_helper.rb b/spec/base_spec_helper.rb index ab7e334..efe74ce 100644 --- a/spec/base_spec_helper.rb +++ b/spec/base_spec_helper.rb @@ -3,8 +3,8 @@ ENV["RAILS_ENV"] = "test" begin - require "pry-byebug" -rescue LoadError + require "debug" +rescue LoadError, NoMethodError end PROJECT_ROOT = File.expand_path("../", __dir__) diff --git a/spec/dummy/app/channels/application_cable/channel.rb b/spec/dummy/app/channels/application_cable/channel.rb index e7c0666..4474e7b 100644 --- a/spec/dummy/app/channels/application_cable/channel.rb +++ b/spec/dummy/app/channels/application_cable/channel.rb @@ -2,13 +2,16 @@ module ApplicationCable class Channel < ActionCable::Channel::Base - delegate :request, to: :connection delegate :session, to: :request after_subscribe -> { log_event("subscribed") } after_unsubscribe -> { log_event("unsubscribed") } + def request + connection.send(:request) + end + private def log_event(type) diff --git a/spec/dummy/app/channels/application_cable/connection.rb b/spec/dummy/app/channels/application_cable/connection.rb index 96f1b7b..3c32027 100644 --- a/spec/dummy/app/channels/application_cable/connection.rb +++ b/spec/dummy/app/channels/application_cable/connection.rb @@ -16,6 +16,8 @@ def log_event(source, data) rescue_from ActiveRecord::RecordNotFound, with: :track_error end + delegate :session, to: :request + identified_by :current_user, :url state_attr_accessor :token diff --git a/spec/integrations/rails_integration_spec.rb b/spec/integrations/rails_integration_spec.rb index c6e2a8d..6fb3404 100644 --- a/spec/integrations/rails_integration_spec.rb +++ b/spec/integrations/rails_integration_spec.rb @@ -22,6 +22,6 @@ def app end it "assigns connection factory" do - expect(AnyCable.connection_factory).to eq(ApplicationCable::Connection) + expect(AnyCable.connection_factory).to eq(AnyCable::Rails::ConnectionFactory) end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 482c4d4..9935cc8 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -11,6 +11,13 @@ # This code is called from the server callback in Railtie AnyCable.logger = ActiveSupport::TaggedLogging.new(::ActionCable.server.config.logger) +if ENV["DEBUG_RPC_EXCEPTIONS"] + AnyCable.capture_exception do |ex, method, _| + $stdout.puts "Debugging RPC exception for ##{method}: #{ex.message}" + debugger # rubocop:disable Lint/Debugger + end +end + require "active_support/testing/stream" require "ammeter/init" require "anycable/rspec"