Skip to content

Commit

Permalink
Remove dead connections from the pool on connection errors. (#111)
Browse files Browse the repository at this point in the history
With the introduction of Yugabyte, there is now the possibility that
connections in the pool may become "dead" when new nodes are rotated
into a cluster, since they end up pointing at a node which no longer
exists. In this case, we can proactively remove these connections from
the connection pool when we see the corresponding exceptions.

As long as jobs are retryable the work will still eventually be
processed - this merely prevents us from being stuck in a pathological
case where the pool contains dead connections and nothing is
automatically clearing them out (e.g. a deployment killing the pod).
  • Loading branch information
benk-gc authored Oct 28, 2024
1 parent 5b3d374 commit b6bab65
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 2 deletions.
4 changes: 4 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ RSpec/IndexedLet:

RSpec/NestedGroups:
Max: 5

Sequel/IrreversibleMigration:
Exclude:
- "**/*_spec.rb"
22 changes: 20 additions & 2 deletions lib/que/adapters/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,26 @@ def add_to_transaction

private

def checkout_activerecord_adapter(&block)
::ActiveRecord::Base.connection_pool.with_connection(&block)
def checkout_activerecord_adapter
::ActiveRecord::Base.connection_pool.with_connection do |conn|
yield conn
rescue ::PG::Error, ::ActiveRecord::StatementInvalid => e
remove_dead_connections(e)
raise
end
end

def remove_dead_connections(exception)
# Cater for errors both from a raw connection or a connection adapter,
# since the calling code could use either.
cause = exception.is_a?(::PG::Error) ? exception : exception.cause

return unless cause.instance_of?(::PG::UnableToSend) ||
cause.instance_of?(::PG::ConnectionBad)

::ActiveRecord::Base.connection_pool.connections.
filter { |conn| conn.owner == ActiveSupport::IsolatedExecutionState.context }.
each { |failed| failed.pool.remove(failed) }
end
end
end
Expand Down
25 changes: 25 additions & 0 deletions spec/lib/que/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,31 @@
end
end

context "when postgres raises a bad connection error while processing a job" do
before do
allow(Que).to receive(:execute).
with(:lock_job, ["default", 0]).
and_raise(PG::ConnectionBad)

# Ensure we don't have any currently leased connections, since in a thread
# using with_connection this would never be the case (but in specs it
# sometimes is).
pool.disconnect!
end

let(:pool) { ActiveRecord::Base.connection_pool }

it "rescues it and returns an error" do
FakeJob.enqueue(1)

expect(work).to eq(:postgres_error)
end

it "removes the connection from the connection pool" do
expect { work }.to_not change { pool.connections.count }.from(0)
end
end

context "when we time out checking out a new connection" do
it "rescues it and returns an error" do
FakeJob.enqueue(1)
Expand Down

0 comments on commit b6bab65

Please sign in to comment.