From 1a6c6adfde749f2db3f878d19d73a7d3c235fe80 Mon Sep 17 00:00:00 2001 From: "e.makrushin" Date: Tue, 28 May 2024 16:31:27 +0700 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB=20?= =?UTF-8?q?=D0=BA=D0=BE=D1=81=D1=82=D1=8B=D0=BB=D1=8C=20=D0=B4=D0=BB=D1=8F?= =?UTF-8?q?=20=D0=B1=D1=8B=D1=81=D1=82=D1=80=D0=BE=D0=B3=D0=BE=20=D1=87?= =?UTF-8?q?=D1=82=D0=B5=D0=BD=D0=B8=D1=8F=20=D0=BF=D0=B0=D1=87=D0=B5=D0=BA?= =?UTF-8?q?=20=D1=81=20=D0=BE=D0=B6=D0=B8=D0=B4=D0=B0=D0=B5=D0=BC=D1=8B?= =?UTF-8?q?=D0=BC=20=D1=82=D0=B0=D0=B9=D0=BC=D0=B0=D1=83=D1=82=D0=BE=D0=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Kafka/KafkaTopicReader.cs | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/Vostok.Hercules.Consumers/Kafka/KafkaTopicReader.cs b/Vostok.Hercules.Consumers/Kafka/KafkaTopicReader.cs index 93ed6e7..1099ff9 100644 --- a/Vostok.Hercules.Consumers/Kafka/KafkaTopicReader.cs +++ b/Vostok.Hercules.Consumers/Kafka/KafkaTopicReader.cs @@ -23,7 +23,7 @@ internal sealed class KafkaTopicReader private readonly IConsumer consumer; private static readonly DataSize ReadBufferRentSize = 25.Megabytes(); // Approximate size - private static readonly Partition Partition = new Partition(0); + private static readonly Partition Partition = new(0); public KafkaTopicReader(KafkaTopicReaderSettings settings, ILog log, BufferPool bufferPool) { @@ -48,8 +48,7 @@ public KafkaTopicReader(KafkaTopicReaderSettings settings, ILog log, BufferPool public void Assign() { consumer.Assign(new TopicPartition(settings.Topic, Partition)); - log.Info( - $"Kafka consumer assigned to topic. Fetch min bytes: {settings.FetchMinBytes}, Fetch wait max ms: {settings.FetchWaitMaxMs}, Consume timeout ms: {settings.ConsumeTimeout.TotalMilliseconds}".ToString()); + log.Info($"Kafka consumer assigned to topic. Fetch min bytes: {settings.FetchMinBytes}, Fetch wait max ms: {settings.FetchWaitMaxMs}, Consume timeout ms: {settings.ConsumeTimeout.TotalMilliseconds}".ToString()); } public async Task ReadAsync(ReadStreamQuery query) => @@ -68,15 +67,24 @@ private RawReadStreamResult ReadInternal(ReadStreamQuery query) var eventsCount = 0; try { + // Суть костыля: с нормальным таймаутом читается только первое сообщение из пачки. + // Остальные либо сразу достаются из буффера, либо сразу получаем null - индикатор конца пачки. + var message = consumer.Consume(settings.ConsumeTimeout); + if (message?.Message is null) + throw new Exception(); + + eventsWriter.WriteWithoutLength(message.Message.Value); + lastConsumedOffset = message.Offset; + eventsCount++; + while (eventsCount < query.Limit) { - var result = consumer.Consume(settings.ConsumeTimeout); - - if (result?.Message is null) + message = consumer.Consume(TimeSpan.Zero); + if (message?.Message is null) break; - eventsWriter.WriteWithoutLength(result.Message.Value); - lastConsumedOffset = result.Offset; + eventsWriter.WriteWithoutLength(message.Message.Value); + lastConsumedOffset = message.Offset; eventsCount++; } }