diff --git a/test/Gemfile b/test/Gemfile index 1f3b22e..5a2a2ba 100644 --- a/test/Gemfile +++ b/test/Gemfile @@ -2,4 +2,3 @@ source 'https://rubygems.org' gem "rafka", git: "https://github.com/skroutz/rafka-rb" gem "minitest" -gem "pry-byebug" diff --git a/test/Gemfile.lock b/test/Gemfile.lock index 8c6827e..dd1c06f 100644 --- a/test/Gemfile.lock +++ b/test/Gemfile.lock @@ -1,6 +1,6 @@ GIT remote: https://github.com/skroutz/rafka-rb - revision: 548dee4e5dd908c5ef60f8caa91c21dff10b2ac6 + revision: 00d076aff970914edbf4c6e438162f4d0b106c0c specs: rafka (0.0.1) redis (~> 3.3) @@ -8,26 +8,14 @@ GIT GEM remote: https://rubygems.org/ specs: - byebug (9.0.6) - coderay (1.1.1) - method_source (0.8.2) minitest (5.10.2) - pry (0.10.4) - coderay (~> 1.1.0) - method_source (~> 0.8.1) - slop (~> 3.4) - pry-byebug (3.4.2) - byebug (~> 9.0) - pry (~> 0.10) redis (3.3.3) - slop (3.6.0) PLATFORMS ruby DEPENDENCIES minitest - pry-byebug rafka! BUNDLED WITH diff --git a/test/end-to-end b/test/end-to-end index 075f7e8..9beaafd 100755 --- a/test/end-to-end +++ b/test/end-to-end @@ -13,8 +13,8 @@ host, port = host_port[0], Integer(host_port[1]) CLIENT_DEFAULTS = { host: host, port: port } FLUSH_TIMEOUT = 5000 -CONSUME_RETRIES = 4 -CONSUME_TIMEOUT = 3 +CONSUME_RETRIES = 3 +CONSUME_TIMEOUT = 2 class TestRafka < Minitest::Test def setup @@ -26,7 +26,7 @@ class TestRafka < Minitest::Test start_consumer!(cons) 3.times { |i| @prod.produce(topic, i) } - flush!(@prod) + assert_flushed @prod replies = [] 3.times do |i| @@ -41,15 +41,15 @@ class TestRafka < Minitest::Test def test_many_consumers_same_topic with_new_topic do |topic| - group_id = rand_id(:cgroup) - cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons1")) - cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons2")) + gid = rand_id + cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons1")) + cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons2")) start_consumer!(cons1) start_consumer!(cons2) msgs = ["a", "b"] msgs.each { |msg| @prod.produce(topic, msg) } - flush!(@prod) + assert_flushed @prod replies = [] tries = 0 @@ -70,9 +70,9 @@ class TestRafka < Minitest::Test def test_consumer_group_rebalance with_new_topic do |topic| - group_id = rand_id(:cgroup) - cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons1")) - cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group_id, id: "cons2")) + gid = rand_id + cons1 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons1")) + cons2 = Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: gid, id: "cons2")) msg1 = "hi" msg2 = "hello" @@ -81,7 +81,7 @@ class TestRafka < Minitest::Test assert_rafka_msg_equal msg1, consume_with_retry(cons1) # commit offsets and shutdown so that cons2 gets all the partitions - cons1.quit + cons1.close start_consumer!(cons2) produce_and_flush!(@prod, topic, msg2) @@ -108,7 +108,7 @@ class TestRafka < Minitest::Test @prod.produce(topic1, "I'm Mr. Meeseeks") @prod.produce(topic2, "Look at me") - flush!(@prod) + assert_flushed @prod assert_rafka_msg_equal "I'm Mr. Meeseeks", consume_with_retry(cons1) assert_rafka_msg_equal "Look at me", consume_with_retry(cons2) @@ -117,19 +117,100 @@ class TestRafka < Minitest::Test end def test_produce_wrong_topic - # TODO(agis): first produce should also raise an error. This is a Rafka - # issue - @prod.produce("idontexist", "foo") - @prod.flush - assert_raises Rafka::ProduceError do + # TODO(agis): first produce won't do it. This is a Rafka issue. + @prod.produce("idontexist", "foo") + @prod.flush @prod.produce("idontexist", "foo") end end + + def test_cgroup_reconnect_single_partition + with_new_topic(partitions: 1) do |topic| + produce_and_flush!(@prod, topic, "foo") + group_a = "a_group" + cons = new_consumer(topic, group_a) + assert_rafka_msg_equal "foo", consume_with_retry(cons) + cons.close + + produce_and_flush!(@prod, topic, "bar") + + assert_rafka_msg_equal "bar", consume_with_retry(new_consumer(topic, group_a)) + + cons = new_consumer(topic, "another_group") + assert_rafka_msg_equal "foo", consume_with_retry(cons) + assert_rafka_msg_equal "bar", consume_with_retry(cons) + end + end + + # This tests a real-world scenario where a client app restarts (eg. during + # deployment), thus stopping and restarting its consumers. + # + # The flow is the following: + # + # 1. Consumers of a group (we call it cgroup) are consuming from topic Y + # 2. Values are produced to topic Y + # 3. cgroup consumes the produced values + # 4. cgroup is restarted (ie. app is deployed) + # 5. More values are produced to topic Y + # 6. cgroup continues consuming from the last position, ie. it doesn't + # reconsume values from step (2) but only from (5) + def test_cgroup_reconnect_many_partitions + partitions = 4 + input_size = 20 + reconsumes_tolerated = partitions + flunk "input_size must be even, given: #{input_size}" if input_size.odd? + + with_new_topic(partitions: partitions) do |topic| + group = "cgroupA" + input = (1..input_size) + input_a, input_b = input.each_slice(input_size/2).to_a + output = Hash.new(0) + + # produce some input and consume it + cgroup = Array.new(2) { new_consumer(topic, group) } + cgroup.each { |c| start_consumer!(c) } + input_a.each { |i| @prod.produce(topic, i) } + assert_flushed @prod + + while output.size < input_a.size + cgroup.each do |c| + msg = consume_with_retry(c, timeout: 1, retries: 1) + output[msg.value.to_i] += 1 if msg + end + end + + assert_equal input_a.to_a, output.keys.sort + + # restart cgroup to simulate client app deployment + cgroup.each { |c| c.close } + + # produce some more input and assert cgroup continues where it left + # position (ie. does not re-consume input_a) + cgroup = Array.new(2) { new_consumer(topic, group) } + cgroup.each { |c| start_consumer!(c) } + input_b.each { |i| @prod.produce(topic, i) } + assert_flushed @prod + + while output.size < input_size + cgroup.each do |c| + msg = consume_with_retry(c, timeout: 1, retries: 1) + output[msg.value.to_i] += 1 if msg + end + end + + assert_equal input.to_a, output.keys.sort + + actual_reconsumes = output.values.inject(:+) - input_size + assert actual_reconsumes <= reconsumes_tolerated, + "Expected reconsumes to be <= #{reconsumes_tolerated}, " \ + "was #{actual_reconsumes}" + end + end end -puts "\nRunning on #{host_port} " \ - "(Client version #{Rafka::VERSION}, CONSUME_RETRIES=#{CONSUME_RETRIES}, " \ +puts "\nRunning on #{host_port.join(":")} " \ + "(rafka-rb #{Rafka::VERSION}, CONSUME_RETRIES=#{CONSUME_RETRIES}, " \ "CONSUME_TIMEOUT=#{CONSUME_TIMEOUT})..." $topics = [] diff --git a/test/kafka.test.json b/test/kafka.test.json index bcb8fa4..97a74e1 100644 --- a/test/kafka.test.json +++ b/test/kafka.test.json @@ -14,6 +14,8 @@ }, "producer": { "go.delivery.reports": true, - "queue.buffering.max.ms": 0 + "queue.buffering.max.ms": 0, + "message.send.max.retries": 2, + "retry.backoff.ms": 100 } } diff --git a/test/test_helper.rb b/test/test_helper.rb index 974cc1a..922c68f 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,14 +1,8 @@ -def rand_id(type=nil) - SecureRandom.hex(4).prepend( - case type - when :cgroup then "G" - when :cons then "C" - else "" - end - ) +def rand_id + SecureRandom.hex(3) end -def new_consumer(topic, group=rand_id(:cgroup), id=rand_id(:cons)) +def new_consumer(topic, group=rand_id, id=rand_id) Rafka::Consumer.new(CLIENT_DEFAULTS.merge(topic: topic, group: group, id: id)) end @@ -24,13 +18,7 @@ def consume_with_retry(consumer, timeout: CONSUME_TIMEOUT, retries: CONSUME_RETR def produce_and_flush!(prod, topic, msg) prod.produce(topic, msg) - flush!(prod) -end - -# @raise [] if there are still unflushed messages -def flush!(prod) - unflushed = prod.flush(FLUSH_TIMEOUT) - flunk("#{unflushed} unflushed messages remained") if unflushed > 0 + assert_flushed prod end def start_consumer!(cons) @@ -38,9 +26,8 @@ def start_consumer!(cons) end # Creates a new topic and optionally a consumer to consume from it. -def with_new_topic(topic: "rafka-test-#{Time.now.to_i}-#{rand_id}", - partitions: 4, replication_factor: 2, - consumer: false) +def with_new_topic(topic: "r-#{rand_id}", partitions: 4, replication_factor: 2, + consumer: false) create_kafka_topic!(topic, partitions, replication_factor) $topics << topic @@ -64,7 +51,6 @@ def delete_kafka_topic!(topic) raise "Error deleting topic #{topic}: #{out}" if !$?.success? end - # ASSERTIONS def assert_rafka_msg(msg) assert_kind_of Rafka::Message, msg @@ -74,3 +60,8 @@ def assert_rafka_msg_equal(exp, act, msg=nil) assert_rafka_msg(act) assert_equal exp, act.value end + +def assert_flushed(producer) + assert_equal 0, producer.flush(FLUSH_TIMEOUT) +end +