Skip to content

Commit

Permalink
fix early return dangling state (#514)
Browse files Browse the repository at this point in the history
* fix early return dangling state

* add more specs

* add better rollback references

* nicer error

* fix readme
  • Loading branch information
mensfeld authored Jul 3, 2024
1 parent 60f5e99 commit a265665
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# WaterDrop changelog

## 2.7.4 (Unreleased)
- [Maintenance] Alias `WaterDrop::Errors::AbortTransaction` with `WaterDrop::AbortTransaction`.
- [Maintenance] Lower the precision reporting to 100 microseconds in the logger listener.
- [Fix] Consumer consuming error: Local: Erroneous state (state) post break flow in transaction.
- [Change] Require 'karafka-core' `>= 2.4.3`

## 2.7.3 (2024-06-09)
Expand Down
4 changes: 4 additions & 0 deletions lib/waterdrop/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,8 @@ def initialize(dispatched, message)
end
end
end

# Alias so we can have a nicer API to abort transactions
# This makes referencing easier
AbortTransaction = Errors::AbortTransaction
end
37 changes: 25 additions & 12 deletions lib/waterdrop/producer/transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ module Transactions
# with a transaction. One transaction per single dispatch and for `produce_many` it will be
# a single transaction wrapping all messages dispatches (not one per message).
#
# @return Block result
# @param block [Proc] block of code that should run
# @return Block result or `nil` in case of early break/return
#
# @example Simple transaction
# producer.transaction do
Expand All @@ -54,7 +55,7 @@ module Transactions
# end
#
# handler.wait
def transaction
def transaction(&block)
# This will safely allow us to support one operation transactions so a transactional
# producer can work without the transactional block if needed
return yield if @transaction_mutex.owned?
Expand All @@ -65,13 +66,7 @@ def transaction
transactional_instrument(:started) { client.begin_transaction }
end

result = nil
commit = false

catch(:abort) do
result = yield
commit = true
end
result, commit = transactional_execute(&block)

commit || raise(WaterDrop::Errors::AbortTransaction)

Expand All @@ -82,15 +77,12 @@ def transaction
result
# We need to handle any interrupt including critical in order not to have the transaction
# running. This will also handle things like `IRB::Abort`
#
# rubocop:disable Lint/RescueException
rescue Exception => e
# This code is a bit tricky. We have an error and when it happens we try to rollback
# the transaction. However we may end up in a state where transaction aborting itself
# produces error. In such case we also want to handle it as fatal and reload client.
# This is why we catch this here
begin
# rubocop:enable Lint/RescueException
with_transactional_error_handling(:abort) do
transactional_instrument(:aborted) do
client.abort_transaction
Expand Down Expand Up @@ -176,6 +168,27 @@ def with_transaction_if_transactional(&block)
transactional? ? transaction(&block) : yield
end

# Executes the requested code in a transaction with error handling and ensures, that upon
# early break we rollback the transaction instead of having it dangling and causing an issue
# where transactional producer would end up in an error state.
def transactional_execute
result = nil
commit = false

catch(:abort) do
result = yield
commit = true
end

[result, commit]
rescue Exception => e
errored = true

raise e
ensure
return [result, commit] unless errored
end

# Instruments the transactional operation with producer id
#
# @param key [Symbol] transaction operation key
Expand Down
7 changes: 7 additions & 0 deletions spec/lib/waterdrop/errors_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,11 @@

specify { expect(error).to be < described_class::BaseError }
end

# Aliases for better DX
describe 'root AbortTransaction' do
subject(:error) { WaterDrop::AbortTransaction }

specify { expect(error).to eq described_class::AbortTransaction }
end
end
26 changes: 26 additions & 0 deletions spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,32 @@
end
end

context 'when we are inside a transaction and early break' do
it 'expect not to corrupt the state of the producer' do
10.times do
producer.transaction { break }
producer.transaction {}
end
end

it 'expect to return nil' do
result = producer.transaction { break(10) }
expect(result).to eq(nil)
end

it 'expect to cancel dispatches' do
handler = nil

producer.transaction do
handler = producer.produce_async(topic: 'example_topic', payload: 'na')

break
end

expect { handler.wait }.to raise_error(Rdkafka::RdkafkaError, /Purged in queue/)
end
end

context 'when producer gets a critical broker errors with reload on' do
let(:topic) { SecureRandom.uuid }

Expand Down

0 comments on commit a265665

Please sign in to comment.