From a26566545eceb7ddbfd864e0e6804d8d5f0330b3 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Wed, 3 Jul 2024 21:57:09 +0200 Subject: [PATCH] fix early return dangling state (#514) * fix early return dangling state * add more specs * add better rollback references * nicer error * fix readme --- CHANGELOG.md | 2 + lib/waterdrop/errors.rb | 4 ++ lib/waterdrop/producer/transactions.rb | 37 +++++++++++++------ spec/lib/waterdrop/errors_spec.rb | 7 ++++ .../waterdrop/producer/transactions_spec.rb | 26 +++++++++++++ 5 files changed, 64 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 672e2c8b..0e2833ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,9 @@ # WaterDrop changelog ## 2.7.4 (Unreleased) +- [Maintenance] Alias `WaterDrop::Errors::AbortTransaction` with `WaterDrop::AbortTransaction`. - [Maintenance] Lower the precision reporting to 100 microseconds in the logger listener. +- [Fix] Consumer consuming error: Local: Erroneous state (state) post break flow in transaction. - [Change] Require 'karafka-core' `>= 2.4.3` ## 2.7.3 (2024-06-09) diff --git a/lib/waterdrop/errors.rb b/lib/waterdrop/errors.rb index 1b9022a6..163d04b4 100644 --- a/lib/waterdrop/errors.rb +++ b/lib/waterdrop/errors.rb @@ -60,4 +60,8 @@ def initialize(dispatched, message) end end end + + # Alias so we can have a nicer API to abort transactions + # This makes referencing easier + AbortTransaction = Errors::AbortTransaction end diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index 9a9f65c2..957f714c 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -35,7 +35,8 @@ module Transactions # with a transaction. One transaction per single dispatch and for `produce_many` it will be # a single transaction wrapping all messages dispatches (not one per message). # - # @return Block result + # @param block [Proc] block of code that should run + # @return Block result or `nil` in case of early break/return # # @example Simple transaction # producer.transaction do @@ -54,7 +55,7 @@ module Transactions # end # # handler.wait - def transaction + def transaction(&block) # This will safely allow us to support one operation transactions so a transactional # producer can work without the transactional block if needed return yield if @transaction_mutex.owned? @@ -65,13 +66,7 @@ def transaction transactional_instrument(:started) { client.begin_transaction } end - result = nil - commit = false - - catch(:abort) do - result = yield - commit = true - end + result, commit = transactional_execute(&block) commit || raise(WaterDrop::Errors::AbortTransaction) @@ -82,15 +77,12 @@ def transaction result # We need to handle any interrupt including critical in order not to have the transaction # running. This will also handle things like `IRB::Abort` - # - # rubocop:disable Lint/RescueException rescue Exception => e # This code is a bit tricky. We have an error and when it happens we try to rollback # the transaction. However we may end up in a state where transaction aborting itself # produces error. In such case we also want to handle it as fatal and reload client. # This is why we catch this here begin - # rubocop:enable Lint/RescueException with_transactional_error_handling(:abort) do transactional_instrument(:aborted) do client.abort_transaction @@ -176,6 +168,27 @@ def with_transaction_if_transactional(&block) transactional? ? transaction(&block) : yield end + # Executes the requested code in a transaction with error handling and ensures, that upon + # early break we rollback the transaction instead of having it dangling and causing an issue + # where transactional producer would end up in an error state. + def transactional_execute + result = nil + commit = false + + catch(:abort) do + result = yield + commit = true + end + + [result, commit] + rescue Exception => e + errored = true + + raise e + ensure + return [result, commit] unless errored + end + # Instruments the transactional operation with producer id # # @param key [Symbol] transaction operation key diff --git a/spec/lib/waterdrop/errors_spec.rb b/spec/lib/waterdrop/errors_spec.rb index 4acfe0d6..c40a0c7f 100644 --- a/spec/lib/waterdrop/errors_spec.rb +++ b/spec/lib/waterdrop/errors_spec.rb @@ -66,4 +66,11 @@ specify { expect(error).to be < described_class::BaseError } end + + # Aliases for better DX + describe 'root AbortTransaction' do + subject(:error) { WaterDrop::AbortTransaction } + + specify { expect(error).to eq described_class::AbortTransaction } + end end diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index 226a631e..951f4629 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -597,6 +597,32 @@ end end + context 'when we are inside a transaction and early break' do + it 'expect not to corrupt the state of the producer' do + 10.times do + producer.transaction { break } + producer.transaction {} + end + end + + it 'expect to return nil' do + result = producer.transaction { break(10) } + expect(result).to eq(nil) + end + + it 'expect to cancel dispatches' do + handler = nil + + producer.transaction do + handler = producer.produce_async(topic: 'example_topic', payload: 'na') + + break + end + + expect { handler.wait }.to raise_error(Rdkafka::RdkafkaError, /Purged in queue/) + end + end + context 'when producer gets a critical broker errors with reload on' do let(:topic) { SecureRandom.uuid }