Skip to content

Commit

Permalink
Test consumer group reconnect scenario
Browse files Browse the repository at this point in the history
This simulates a procedure similar to what happens when a client application
is restarted (i.e. its consumer groups are shut down and then started anew).

Closes #15
  • Loading branch information
agis committed Jul 4, 2017
1 parent 1e966a0 commit 92a76b4
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 54 deletions.
1 change: 0 additions & 1 deletion test/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ source 'https://rubygems.org'

gem "rafka", git: "https://github.com/skroutz/rafka-rb"
gem "minitest"
gem "pry-byebug"
14 changes: 1 addition & 13 deletions test/Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,33 +1,21 @@
GIT
remote: https://github.com/skroutz/rafka-rb
revision: 548dee4e5dd908c5ef60f8caa91c21dff10b2ac6
revision: 00d076aff970914edbf4c6e438162f4d0b106c0c
specs:
rafka (0.0.1)
redis (~> 3.3)

GEM
remote: https://rubygems.org/
specs:
byebug (9.0.6)
coderay (1.1.1)
method_source (0.8.2)
minitest (5.10.2)
pry (0.10.4)
coderay (~> 1.1.0)
method_source (~> 0.8.1)
slop (~> 3.4)
pry-byebug (3.4.2)
byebug (~> 9.0)
pry (~> 0.10)
redis (3.3.3)
slop (3.6.0)

PLATFORMS
ruby

DEPENDENCIES
minitest
pry-byebug
rafka!

BUNDLED WITH
Expand Down
119 changes: 100 additions & 19 deletions test/end-to-end
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ host, port = host_port[0], Integer(host_port[1])

CLIENT_DEFAULTS = { host: host, port: port }
FLUSH_TIMEOUT = 5000
CONSUME_RETRIES = 4
CONSUME_TIMEOUT = 3
CONSUME_RETRIES = 3
CONSUME_TIMEOUT = 2

class TestRafka < Minitest::Test
def setup
Expand All @@ -26,7 +26,7 @@ class TestRafka < Minitest::Test
start_consumer!(cons)

3.times { |i| @prod.produce(topic, i) }
flush!(@prod)
assert_flushed @prod

replies = []
3.times do |i|
Expand All @@ -41,15 +41,15 @@ class TestRafka < Minitest::Test

def test_many_consumers_same_topic
with_new_topic do |topic|
group_id = rand_id(:cgroup)
cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons1"))
cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons2"))
gid = rand_id
cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons1"))
cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons2"))
start_consumer!(cons1)
start_consumer!(cons2)

msgs = ["a", "b"]
msgs.each { |msg| @prod.produce(topic, msg) }
flush!(@prod)
assert_flushed @prod

replies = []
tries = 0
Expand All @@ -70,9 +70,9 @@ class TestRafka < Minitest::Test

def test_consumer_group_rebalance
with_new_topic do |topic|
group_id = rand_id(:cgroup)
cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons1"))
cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons2"))
gid = rand_id
cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons1"))
cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons2"))
msg1 = "hi"
msg2 = "hello"

Expand All @@ -81,7 +81,7 @@ class TestRafka < Minitest::Test
assert_rafka_msg_equal msg1, consume_with_retry(cons1)

# commit offsets and shutdown so that cons2 gets all the partitions
cons1.quit
cons1.close
start_consumer!(cons2)

produce_and_flush!(@prod, topic, msg2)
Expand All @@ -108,7 +108,7 @@ class TestRafka < Minitest::Test

@prod.produce(topic1, "I'm Mr. Meeseeks")
@prod.produce(topic2, "Look at me")
flush!(@prod)
assert_flushed @prod

assert_rafka_msg_equal "I'm Mr. Meeseeks", consume_with_retry(cons1)
assert_rafka_msg_equal "Look at me", consume_with_retry(cons2)
Expand All @@ -117,19 +117,100 @@ class TestRafka < Minitest::Test
end

def test_produce_wrong_topic
# TODO(agis): first produce should also raise an error. This is a Rafka
# issue
@prod.produce("idontexist", "foo")
@prod.flush

assert_raises Rafka::ProduceError do
# TODO(agis): first produce won't do it. This is a Rafka issue.
@prod.produce("idontexist", "foo")
@prod.flush
@prod.produce("idontexist", "foo")
end
end

# Reconnect scenario on a single-partition topic:
#
# - a consumer of group "a_group" consumes "foo" and is then closed
# - "bar" is produced while no consumer of that group is attached
# - a new consumer in the same group is expected to get "bar" (not "foo")
# - a consumer in a different group is expected to get "foo" and then "bar"
def test_cgroup_reconnect_single_partition
  with_new_topic(partitions: 1) do |topic|
    produce_and_flush!(@prod, topic, "foo")
    group_a = "a_group"

    first = new_consumer(topic, group_a)
    assert_rafka_msg_equal "foo", consume_with_retry(first)
    first.close

    produce_and_flush!(@prod, topic, "bar")

    # same group, brand new consumer: must only see what it hasn't consumed
    resumed = new_consumer(topic, group_a)
    assert_rafka_msg_equal "bar", consume_with_retry(resumed)

    # a different group is expected to see both messages, in order
    other = new_consumer(topic, "another_group")
    ["foo", "bar"].each do |expected|
      assert_rafka_msg_equal expected, consume_with_retry(other)
    end
  end
end

# Simulates a real-world scenario in which a client app restarts (e.g. during
# a deployment), thereby stopping and restarting its consumers.
#
# The flow is the following:
#
#   1. Consumers of a group (we call it cgroup) are consuming from topic Y
#   2. Values are produced to topic Y
#   3. cgroup consumes the produced values
#   4. cgroup is restarted (i.e. the app is deployed)
#   5. More values are produced to topic Y
#   6. cgroup continues consuming from its last position, i.e. it doesn't
#      re-consume the values from step (2) but only those from step (5)
#
# A total number of re-consumed messages up to the partition count is
# tolerated.
def test_cgroup_reconnect_many_partitions
  partitions = 4
  input_size = 20
  reconsumes_tolerated = partitions
  flunk "input_size must be even, given: #{input_size}" if input_size.odd?

  with_new_topic(partitions: partitions) do |topic|
    group = "cgroupA"
    input = (1..input_size)
    input_a, input_b = input.each_slice(input_size/2).to_a

    # value => number of times it was consumed
    output = Hash.new(0)

    # Creates two consumers in the group and then starts each of them.
    spawn_cgroup = lambda do
      Array.new(2) { new_consumer(topic, group) }.each { |c| start_consumer!(c) }
    end

    # Produces all the given values and asserts that they were flushed.
    produce_all = lambda do |values|
      values.each { |i| @prod.produce(topic, i) }
      assert_flushed @prod
    end

    # Polls the consumers in turn until `output` holds at least `target`
    # distinct values.
    #
    # NOTE(review): there is no upper bound on the number of rounds; if the
    # expected messages never arrive this spins forever.
    poll_until = lambda do |consumers, target|
      until output.size >= target
        consumers.each do |c|
          msg = consume_with_retry(c, timeout: 1, retries: 1)
          next unless msg
          output[msg.value.to_i] += 1
        end
      end
    end

    # produce some input and consume it
    cgroup = spawn_cgroup.call
    produce_all.call(input_a)
    poll_until.call(cgroup, input_a.size)

    assert_equal input_a.to_a, output.keys.sort

    # restart cgroup to simulate client app deployment
    cgroup.each(&:close)

    # produce some more input and assert cgroup continues from where it
    # left off (i.e. does not re-consume input_a)
    cgroup = spawn_cgroup.call
    produce_all.call(input_b)
    poll_until.call(cgroup, input_size)

    assert_equal input.to_a, output.keys.sort

    # every consumption beyond the first one of a value is a re-consume
    actual_reconsumes = output.values.reduce(:+) - input_size
    assert actual_reconsumes <= reconsumes_tolerated,
      "Expected reconsumes to be <= #{reconsumes_tolerated}, " \
      "was #{actual_reconsumes}"
  end
end
end

puts "\nRunning on #{host_port} " \
"(Client version #{Rafka::VERSION}, CONSUME_RETRIES=#{CONSUME_RETRIES}, " \
puts "\nRunning on #{host_port.join(":")} " \
"(rafka-rb #{Rafka::VERSION}, CONSUME_RETRIES=#{CONSUME_RETRIES}, " \
"CONSUME_TIMEOUT=#{CONSUME_TIMEOUT})..."

$topics = []
Expand Down
4 changes: 3 additions & 1 deletion test/kafka.test.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
},
"producer": {
"go.delivery.reports": true,
"queue.buffering.max.ms": 0
"queue.buffering.max.ms": 0,
"message.send.max.retries": 2,
"retry.backoff.ms": 100
}
}
31 changes: 11 additions & 20 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
def rand_id(type=nil)
SecureRandom.hex(4).prepend(
case type
when :cgroup then "G"
when :cons then "C"
else ""
end
)
# Returns a short random identifier: the hex encoding of 3 random bytes
# (i.e. a 6-character lowercase hexadecimal String).
def rand_id
  SecureRandom.random_bytes(3).unpack("H*").first
end

def new_consumer(topic, group=rand_id(:cgroup), id=rand_id(:cons))
# Builds a Rafka::Consumer for the given topic using CLIENT_DEFAULTS as the
# base options.
#
# @param topic [String] topic to consume from
# @param group consumer group id; defaults to a fresh rand_id
# @param id consumer id; defaults to a fresh rand_id
# @return [Rafka::Consumer]
def new_consumer(topic, group=rand_id, id=rand_id)
  overrides = { topic: topic, group: group, id: id }
  Rafka::Consumer.new(CLIENT_DEFAULTS.merge(overrides))
end

Expand All @@ -24,23 +18,16 @@ def consume_with_retry(consumer, timeout: CONSUME_TIMEOUT, retries: CONSUME_RETR

def produce_and_flush!(prod, topic, msg)
prod.produce(topic, msg)
flush!(prod)
end

# @raise [] if there are still unflushed messages
def flush!(prod)
unflushed = prod.flush(FLUSH_TIMEOUT)
flunk("#{unflushed} unflushed messages remained") if unflushed > 0
assert_flushed prod
end

# Kicks off the given consumer by issuing a single `consume(1)` call on it
# and returns whatever that call returned.
#
# NOTE(review): the argument is presumably a timeout - confirm against the
# rafka-rb Consumer API.
def start_consumer!(cons)
  first_poll = cons.consume(1)
  first_poll
end

# Creates a new topic and optionally a consumer to consume from it.
def with_new_topic(topic: "rafka-test-#{Time.now.to_i}-#{rand_id}",
partitions: 4, replication_factor: 2,
consumer: false)
def with_new_topic(topic: "r-#{rand_id}", partitions: 4, replication_factor: 2,
consumer: false)
create_kafka_topic!(topic, partitions, replication_factor)
$topics << topic

Expand All @@ -64,7 +51,6 @@ def delete_kafka_topic!(topic)
raise "Error deleting topic #{topic}: #{out}" if !$?.success?
end


# ASSERTIONS
def assert_rafka_msg(msg)
assert_kind_of Rafka::Message, msg
Expand All @@ -74,3 +60,8 @@ def assert_rafka_msg_equal(exp, act, msg=nil)
assert_rafka_msg(act)
assert_equal exp, act.value
end

# Flushes the producer (passing FLUSH_TIMEOUT) and asserts that no unflushed
# messages remain.
#
# On failure the number of messages that were left unflushed is included in
# the assertion message.
#
# @param producer [#flush] producer whose `flush` returns the number of
#   messages that are still unflushed
def assert_flushed(producer)
  unflushed = producer.flush(FLUSH_TIMEOUT)
  assert_equal 0, unflushed, "#{unflushed} unflushed messages remained"
end

0 comments on commit 92a76b4

Please sign in to comment.