diff --git a/.rubocop.yml b/.rubocop.yml index 1758b32..9b4d464 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -25,3 +25,7 @@ RSpec/IndexedLet: RSpec/NestedGroups: Max: 5 + +Sequel/IrreversibleMigration: + Exclude: + - "**/*_spec.rb" diff --git a/lib/que/adapters/active_record.rb b/lib/que/adapters/active_record.rb index 6a4b02f..c5f8c7f 100644 --- a/lib/que/adapters/active_record.rb +++ b/lib/que/adapters/active_record.rb @@ -85,8 +85,26 @@ def add_to_transaction private - def checkout_activerecord_adapter(&block) - ::ActiveRecord::Base.connection_pool.with_connection(&block) + def checkout_activerecord_adapter + ::ActiveRecord::Base.connection_pool.with_connection do |conn| + yield conn + rescue ::PG::Error, ::ActiveRecord::StatementInvalid => e + remove_dead_connections(e) + raise + end + end + + def remove_dead_connections(exception) + # Cater for errors both from a raw connection or a connection adapter, + # since the calling code could use either. + cause = exception.is_a?(::PG::Error) ? exception : exception.cause + + return unless cause.instance_of?(::PG::UnableToSend) || + cause.instance_of?(::PG::ConnectionBad) + + ::ActiveRecord::Base.connection_pool.connections. + filter { |conn| conn.owner == ActiveSupport::IsolatedExecutionState.context }. + each { |failed| failed.pool.remove(failed) } end end end diff --git a/spec/lib/que/worker_spec.rb b/spec/lib/que/worker_spec.rb index 400ec0f..8f45fc1 100644 --- a/spec/lib/que/worker_spec.rb +++ b/spec/lib/que/worker_spec.rb @@ -216,6 +216,31 @@ end end + context "when postgres raises a bad connection error while processing a job" do + before do + allow(Que).to receive(:execute). + with(:lock_job, ["default", 0]). + and_raise(PG::ConnectionBad) + + # Ensure we don't have any currently leased connections, since in a thread + # using with_connection this would never be the case (but in specs it + # sometimes is). + pool.disconnect! + end + + let(:pool) { ActiveRecord::Base.connection_pool } + + it "rescues it and returns an error" do + FakeJob.enqueue(1) + + expect(work).to eq(:postgres_error) + end + + it "removes the connection from the connection pool" do + expect { work }.to_not change { pool.connections.count }.from(0) + end + end + context "when we time out checking out a new connection" do it "rescues it and returns an error" do FakeJob.enqueue(1)