Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test consumer group reconnect scenario #16

Merged
merged 2 commits into from
Jul 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (p *Producer) run() {
case ev := <-p.rdProd.Events():
msg := ev.(*rdkafka.Message)
if err := msg.TopicPartition.Error; err != nil {
p.log.Printf("Failed to deliver `%s` to %s", msg.Value, msg)
p.log.Printf("Failed to deliver `%s` to %s: %s", msg.Value, msg, err)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion test/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ source 'https://rubygems.org'

gem "rafka", git: "https://github.com/skroutz/rafka-rb"
gem "minitest"
gem "pry-byebug"
14 changes: 1 addition & 13 deletions test/Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,33 +1,21 @@
GIT
remote: https://github.com/skroutz/rafka-rb
revision: 548dee4e5dd908c5ef60f8caa91c21dff10b2ac6
revision: 00d076aff970914edbf4c6e438162f4d0b106c0c
specs:
rafka (0.0.1)
redis (~> 3.3)

GEM
remote: https://rubygems.org/
specs:
byebug (9.0.6)
coderay (1.1.1)
method_source (0.8.2)
minitest (5.10.2)
pry (0.10.4)
coderay (~> 1.1.0)
method_source (~> 0.8.1)
slop (~> 3.4)
pry-byebug (3.4.2)
byebug (~> 9.0)
pry (~> 0.10)
redis (3.3.3)
slop (3.6.0)

PLATFORMS
ruby

DEPENDENCIES
minitest
pry-byebug
rafka!

BUNDLED WITH
Expand Down
119 changes: 100 additions & 19 deletions test/end-to-end
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ host, port = host_port[0], Integer(host_port[1])

CLIENT_DEFAULTS = { host: host, port: port }
FLUSH_TIMEOUT = 5000
CONSUME_RETRIES = 4
CONSUME_TIMEOUT = 3
CONSUME_RETRIES = 3
CONSUME_TIMEOUT = 2

class TestRafka < Minitest::Test
def setup
Expand All @@ -26,7 +26,7 @@ class TestRafka < Minitest::Test
start_consumer!(cons)

3.times { |i| @prod.produce(topic, i) }
flush!(@prod)
assert_flushed @prod

replies = []
3.times do |i|
Expand All @@ -41,15 +41,15 @@ class TestRafka < Minitest::Test

def test_many_consumers_same_topic
with_new_topic do |topic|
group_id = rand_id(:cgroup)
cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons1"))
cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons2"))
gid = rand_id
cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons1"))
cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons2"))
start_consumer!(cons1)
start_consumer!(cons2)

msgs = ["a", "b"]
msgs.each { |msg| @prod.produce(topic, msg) }
flush!(@prod)
assert_flushed @prod

replies = []
tries = 0
Expand All @@ -70,9 +70,9 @@ class TestRafka < Minitest::Test

def test_consumer_group_rebalance
with_new_topic do |topic|
group_id = rand_id(:cgroup)
cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons1"))
cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons2"))
gid = rand_id
cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons1"))
cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons2"))
msg1 = "hi"
msg2 = "hello"

Expand All @@ -81,7 +81,7 @@ class TestRafka < Minitest::Test
assert_rafka_msg_equal msg1, consume_with_retry(cons1)

# commit offsets and shutdown so that cons2 gets all the partitions
cons1.quit
cons1.close
start_consumer!(cons2)

produce_and_flush!(@prod, topic, msg2)
Expand All @@ -108,7 +108,7 @@ class TestRafka < Minitest::Test

@prod.produce(topic1, "I'm Mr. Meeseeks")
@prod.produce(topic2, "Look at me")
flush!(@prod)
assert_flushed @prod

assert_rafka_msg_equal "I'm Mr. Meeseeks", consume_with_retry(cons1)
assert_rafka_msg_equal "Look at me", consume_with_retry(cons2)
Expand All @@ -117,19 +117,100 @@ class TestRafka < Minitest::Test
end

def test_produce_wrong_topic
# TODO(agis): first produce should also raise an error. This is a Rafka
# issue
@prod.produce("idontexist", "foo")
@prod.flush

assert_raises Rafka::ProduceError do
# TODO(agis): first produce won't do it. This is a Rafka issue.
@prod.produce("idontexist", "foo")
@prod.flush
@prod.produce("idontexist", "foo")
end
end

# Single-partition reconnect scenario:
#
# 1. A consumer of group "a_group" consumes "foo" and is closed.
# 2. "bar" is produced while that group has no live consumer.
# 3. A new consumer of the same group is expected to receive "bar" next
#    (ie. it does not re-consume "foo").
# 4. A consumer of a different group is expected to receive both values,
#    "foo" first and then "bar".
def test_cgroup_reconnect_single_partition
  with_new_topic(partitions: 1) do |topic|
    original_group = "a_group"

    produce_and_flush!(@prod, topic, "foo")
    first_member = new_consumer(topic, original_group)
    assert_rafka_msg_equal "foo", consume_with_retry(first_member)
    first_member.close

    produce_and_flush!(@prod, topic, "bar")
    rejoined_member = new_consumer(topic, original_group)
    assert_rafka_msg_equal "bar", consume_with_retry(rejoined_member)

    # a group that never consumed from this topic sees every value
    fresh_member = new_consumer(topic, "another_group")
    assert_rafka_msg_equal "foo", consume_with_retry(fresh_member)
    assert_rafka_msg_equal "bar", consume_with_retry(fresh_member)
  end
end

# Exercises a real-world scenario in which a client app restarts (eg. during
# a deployment), stopping and then restarting all of its consumers.
#
# The flow is the following:
#
#   1. Consumers of a group (the "cgroup") are consuming from topic Y
#   2. Values are produced to topic Y
#   3. The cgroup consumes the produced values
#   4. The cgroup is restarted (ie. the app is deployed)
#   5. More values are produced to topic Y
#   6. The cgroup continues consuming from its last position, ie. it does
#      not re-consume the values from step (2), only those from step (5)
#
# Up to `reconsumes_tolerated` duplicate deliveries (one per partition) are
# accepted before the test fails.
def test_cgroup_reconnect_many_partitions
  partitions = 4
  input_size = 20
  reconsumes_tolerated = partitions
  flunk "input_size must be even, given: #{input_size}" if input_size.odd?

  with_new_topic(partitions: partitions) do |topic|
    group = "cgroupA"
    input = (1..input_size)
    before_restart, after_restart = input.each_slice(input_size / 2).to_a
    seen = Hash.new(0) # consumed value => number of times it was consumed

    # Creates both group members first, then starts each of them.
    boot_members = lambda do
      members = Array.new(2) { new_consumer(topic, group) }
      members.each { |member| start_consumer!(member) }
    end

    # Produces every value to the topic and asserts the producer flushed.
    publish = lambda do |values|
      values.each { |value| @prod.produce(topic, value) }
      assert_flushed @prod
    end

    # Polls the members round-robin until `distinct_target` distinct values
    # have been recorded in `seen`.
    drain = lambda do |members, distinct_target|
      until seen.size >= distinct_target
        members.each do |member|
          msg = consume_with_retry(member, timeout: 1, retries: 1)
          seen[msg.value.to_i] += 1 if msg
        end
      end
    end

    # produce the first half of the input and consume it
    cgroup = boot_members.call
    publish.call(before_restart)
    drain.call(cgroup, before_restart.size)
    assert_equal before_restart.to_a, seen.keys.sort

    # restart the cgroup to simulate a client app deployment
    cgroup.each(&:close)
    cgroup = boot_members.call

    # produce the second half and assert the cgroup resumes from where it
    # left off (ie. it does not re-consume the first half)
    publish.call(after_restart)
    drain.call(cgroup, input_size)
    assert_equal input.to_a, seen.keys.sort

    actual_reconsumes = seen.values.inject(:+) - input_size
    assert actual_reconsumes <= reconsumes_tolerated,
           "Expected reconsumes to be <= #{reconsumes_tolerated}, " \
           "was #{actual_reconsumes}"
  end
end
end

puts "\nRunning on #{host_port} " \
"(Client version #{Rafka::VERSION}, CONSUME_RETRIES=#{CONSUME_RETRIES}, " \
puts "\nRunning on #{host_port.join(":")} " \
"(rafka-rb #{Rafka::VERSION}, CONSUME_RETRIES=#{CONSUME_RETRIES}, " \
"CONSUME_TIMEOUT=#{CONSUME_TIMEOUT})..."

$topics = []
Expand Down
4 changes: 3 additions & 1 deletion test/kafka.test.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
},
"producer": {
"go.delivery.reports": true,
"queue.buffering.max.ms": 0
"queue.buffering.max.ms": 0,
"message.send.max.retries": 2,
"retry.backoff.ms": 100
}
}
31 changes: 11 additions & 20 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
def rand_id(type=nil)
SecureRandom.hex(4).prepend(
case type
when :cgroup then "G"
when :cons then "C"
else ""
end
)
# Returns a short random identifier: the hexadecimal encoding of 3 random
# bytes, ie. a 6-character String.
def rand_id
  random_byte_count = 3
  SecureRandom.hex(random_byte_count)
end

def new_consumer(topic, group=rand_id(:cgroup), id=rand_id(:cons))
# Builds a Rafka::Consumer for `topic` using the connection settings in
# CLIENT_DEFAULTS.
#
# @param topic the topic to consume from
# @param group the consumer group id; random when omitted
# @param id the consumer id; random when omitted
# @return [Rafka::Consumer]
def new_consumer(topic, group=rand_id, id=rand_id)
  settings = CLIENT_DEFAULTS.merge(topic: topic, group: group, id: id)
  Rafka::Consumer.new(settings)
end

Expand All @@ -24,23 +18,16 @@ def consume_with_retry(consumer, timeout: CONSUME_TIMEOUT, retries: CONSUME_RETR

def produce_and_flush!(prod, topic, msg)
prod.produce(topic, msg)
flush!(prod)
end

# @raise [] if there are still unflushed messages
def flush!(prod)
unflushed = prod.flush(FLUSH_TIMEOUT)
flunk("#{unflushed} unflushed messages remained") if unflushed > 0
assert_flushed prod
end

# Issues a single initial `consume` call on `cons` and returns its result.
#
# NOTE(review): presumably this makes the consumer join its group before any
# messages are produced, and the argument is presumably the consume timeout;
# confirm both against rafka-rb.
def start_consumer!(cons)
  initial_wait = 1
  cons.consume(initial_wait)
end

# Creates a new topic and optionally a consumer to consume from it.
def with_new_topic(topic: "rafka-test-#{Time.now.to_i}-#{rand_id}",
partitions: 4, replication_factor: 2,
consumer: false)
def with_new_topic(topic: "r-#{rand_id}", partitions: 4, replication_factor: 2,
consumer: false)
create_kafka_topic!(topic, partitions, replication_factor)
$topics << topic

Expand All @@ -64,7 +51,6 @@ def delete_kafka_topic!(topic)
raise "Error deleting topic #{topic}: #{out}" if !$?.success?
end


# ASSERTIONS
def assert_rafka_msg(msg)
assert_kind_of Rafka::Message, msg
Expand All @@ -74,3 +60,8 @@ def assert_rafka_msg_equal(exp, act, msg=nil)
assert_rafka_msg(act)
assert_equal exp, act.value
end

# Flushes `producer` with FLUSH_TIMEOUT and asserts that the flush reports
# no remaining messages.
#
# The failure message states how many messages remained, so a failing run
# does not show only a bare "Expected: 0 / Actual: N".
#
# @param producer an object responding to `flush(timeout)` whose return
#   value is compared against 0
def assert_flushed(producer)
  unflushed = producer.flush(FLUSH_TIMEOUT)
  assert_equal 0, unflushed, "#{unflushed} unflushed messages remained"
end