Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes forking for workers. Should not corrupt pg connection anymore. #218

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/queue_classic/conn_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ class ConnAdapter

attr_accessor :connection
def initialize(c=nil)
@pid = Process.pid
@connection = c.nil? ? establish_new : validate!(c)
@mutex = Mutex.new
end

def execute(stmt, *params)
raise "Forked workers should create a new DB connection" unless @pid == Process.pid
@mutex.synchronize do
QC.log(:at => "exec_sql", :sql => stmt.inspect)
begin
Expand Down
50 changes: 40 additions & 10 deletions lib/queue_classic/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def initialize(args={})
def start
unlock_jobs_of_dead_workers()
while @running
@fork_worker ? fork_and_work : work
work
end
end

Expand All @@ -56,14 +56,6 @@ def stop
@running = false
end

# Calls Worker#work but after the current process is forked.
# The parent process will wait on the child process to exit.
def fork_and_work
cpid = fork {setup_child; work}
log(:at => :fork, :pid => cpid)
Process.wait(cpid)
end

# Blocks on locking a job, and once a job is locked,
# it will process the job.
def work
Expand Down Expand Up @@ -112,7 +104,8 @@ def process(queue, job)
start = Time.now
finished = false
begin
call(job).tap do
result = @fork_worker ? call_forked(job) : call(job)
result.tap do
queue.delete(job[:id])
finished = true
end
Expand All @@ -138,6 +131,36 @@ def call(job)
receiver.send(message, *args)
end

# Invoke worker inside a forked process. Worker should NOT
# use shared pg connection, as it corrupts it. It pipes the
# result back via IO.pipe, passes exceptions through.
# To use pg inside worker, use connection pool or a fresh connection
def call_forked(job)
read, write = IO.pipe
prepare_child
cpid = fork do
read.close
setup_child
begin
result = call(job)
rescue => e
result = e
ensure
Marshal.dump(result, write)
# Exit forked process without running exit handlers
# so pg connection is not corrupted
exit!(0)
end
end
log(:at => :fork, :pid => cpid)
write.close
result = read.read
Process.wait(cpid)
loaded = Marshal.load(result)
raise loaded if Exception === loaded
loaded
end

# This method will be called when an exception
# is raised during the execution of the job.
def handle_failure(job,e)
Expand All @@ -148,6 +171,13 @@ def handle_failure(job,e)
# your worker is forking and you need to
# re-establish database connections
def setup_child

end

# This method is called before process is forked
# We avoid using pg inside a forked process,
# so logging has to happen her
def prepare_child
log(:at => "setup_child")
end

Expand Down
63 changes: 63 additions & 0 deletions test/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ def handle_failure(job,e)
end
end


class WorkerTest < QCTest


def test_work
QC.enqueue("TestObject.no_args")
worker = TestWorker.new
Expand All @@ -42,6 +44,7 @@ def test_failed_job
worker = TestWorker.new
worker.work
assert_equal(1, worker.failed_count)
assert_equal(1, QC.count)
end

def test_failed_job_is_logged
Expand Down Expand Up @@ -192,6 +195,66 @@ def test_unlock_jobs_of_dead_workers
# We should have an unlocked job now
res = adapter.connection.exec(query_locked_jobs)
assert_equal(1, res.count)
adapter.connection.close
end

def test_forked_work_success
QC.enqueue("Process.pid")
worker = TestWorker.new fork_worker: true
chpid = worker.work
assert(Process.pid != chpid && chpid > 0)
assert_equal(0, QC.count)
assert_equal(0, worker.failed_count)
end

def test_forked_work_failure
QC.enqueue("TestObject.not_a_method")
worker = TestWorker.new fork_worker: true
output = capture_stderr_output do
worker.work
end
assert(output.include?("#<NoMethodError: undefined method `not_a_method'"))
assert_equal(1, QC.count)
assert_equal(1, worker.failed_count)
end

def test_mixed_forked_and_unforked_work_success
QC.enqueue("Process.pid")
QC.enqueue("Process.pid")
forked = TestWorker.new fork_worker: true
current = TestWorker.new
forkedpid = forked.work
currentpid = current.work
assert(Process.pid != forkedpid && forkedpid > 0)
assert(Process.pid == currentpid)
assert_equal(0, QC.count)
assert_equal(0, current.failed_count)
assert_equal(0, forked.failed_count)
end


def test_forked_work_connection_reuse_failure
QC.enqueue("QC.default_conn_adapter.execute", "SELECT 123 as value")
worker = TestWorker.new fork_worker: true
output = capture_stderr_output do
worker.work
end
assert(output.include?("<RuntimeError: Forked workers should create a new DB connection"))
assert_equal(1, QC.count)
assert_equal(1, worker.failed_count)
end

def test_work_connection_reuse
QC.enqueue("QC.default_conn_adapter.execute", "SELECT 123 as value")
worker = TestWorker.new
result = nil
output = capture_stderr_output do
result = worker.work
end
assert(!output.include?("<RuntimeError: Forked workers should create a new DB connection"))
assert_equal("123", result["value"])
assert_equal(0, QC.count)
assert_equal(0, worker.failed_count)
end

end