From b6bab65fbbf77c54fd5f1f2fde92638392be8b82 Mon Sep 17 00:00:00 2001 From: Ben Kyriakou <74675306+benk-gc@users.noreply.github.com> Date: Mon, 28 Oct 2024 14:27:07 +0000 Subject: [PATCH] Remove dead connections from the pool on connection errors. (#111) With the introduction of Yugabyte, there is now the possibility that connections in the pool may become "dead" when new nodes are rotated into a cluster, since they end up pointing at a node which no longer exists. In this case, we can proactively remove these connections from the connection pool when we see the corresponding exceptions. As long as jobs are retryable the work will still eventually be processed - this merely prevents us from being stuck in a pathological case where the pool contains dead connections and nothing is automatically clearing them out (e.g. a deployment killing the pod). --- .rubocop.yml | 4 ++++ lib/que/adapters/active_record.rb | 22 ++++++++++++++++++++-- spec/lib/que/worker_spec.rb | 25 +++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index 1758b32..9b4d464 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -25,3 +25,7 @@ RSpec/IndexedLet: RSpec/NestedGroups: Max: 5 + +Sequel/IrreversibleMigration: + Exclude: + - "**/*_spec.rb" diff --git a/lib/que/adapters/active_record.rb b/lib/que/adapters/active_record.rb index 6a4b02f..c5f8c7f 100644 --- a/lib/que/adapters/active_record.rb +++ b/lib/que/adapters/active_record.rb @@ -85,8 +85,26 @@ def add_to_transaction private - def checkout_activerecord_adapter(&block) - ::ActiveRecord::Base.connection_pool.with_connection(&block) + def checkout_activerecord_adapter + ::ActiveRecord::Base.connection_pool.with_connection do |conn| + yield conn + rescue ::PG::Error, ::ActiveRecord::StatementInvalid => e + remove_dead_connections(e) + raise + end + end + + def remove_dead_connections(exception) + # Cater for errors both from a raw connection or a connection adapter, + # since the calling code could use either. + cause = exception.is_a?(::PG::Error) ? exception : exception.cause + + return unless cause.instance_of?(::PG::UnableToSend) || + cause.instance_of?(::PG::ConnectionBad) + + ::ActiveRecord::Base.connection_pool.connections. + filter { |conn| conn.owner == ActiveSupport::IsolatedExecutionState.context }. + each { |failed| failed.pool.remove(failed) } end end end diff --git a/spec/lib/que/worker_spec.rb b/spec/lib/que/worker_spec.rb index 400ec0f..8f45fc1 100644 --- a/spec/lib/que/worker_spec.rb +++ b/spec/lib/que/worker_spec.rb @@ -216,6 +216,31 @@ end end + context "when postgres raises a bad connection error while processing a job" do + before do + allow(Que).to receive(:execute). + with(:lock_job, ["default", 0]). + and_raise(PG::ConnectionBad) + + # Ensure we don't have any currently leased connections, since in a thread + # using with_connection this would never be the case (but in specs it + # sometimes is). + pool.disconnect! + end + + let(:pool) { ActiveRecord::Base.connection_pool } + + it "rescues it and returns an error" do + FakeJob.enqueue(1) + + expect(work).to eq(:postgres_error) + end + + it "removes the connection from the connection pool" do + expect { work }.to_not change { pool.connections.count }.from(0) + end + end + context "when we time out checking out a new connection" do it "rescues it and returns an error" do FakeJob.enqueue(1)