Skip to content

Commit

Permalink
Добавил костыль для быстрого чтения пачек с ожидаемым таймаутом
Browse files Browse the repository at this point in the history
  • Loading branch information
e.makrushin committed May 28, 2024
1 parent 3ec20ed commit 1a6c6ad
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions Vostok.Hercules.Consumers/Kafka/KafkaTopicReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal sealed class KafkaTopicReader
private readonly IConsumer<Ignore, byte[]> consumer;

private static readonly DataSize ReadBufferRentSize = 25.Megabytes(); // Approximate size
private static readonly Partition Partition = new Partition(0);
private static readonly Partition Partition = new(0);

public KafkaTopicReader(KafkaTopicReaderSettings settings, ILog log, BufferPool bufferPool)
{
Expand All @@ -48,8 +48,7 @@ public KafkaTopicReader(KafkaTopicReaderSettings settings, ILog log, BufferPool
public void Assign()
{
consumer.Assign(new TopicPartition(settings.Topic, Partition));
log.Info(
$"Kafka consumer assigned to topic. Fetch min bytes: {settings.FetchMinBytes}, Fetch wait max ms: {settings.FetchWaitMaxMs}, Consume timeout ms: {settings.ConsumeTimeout.TotalMilliseconds}".ToString());
log.Info($"Kafka consumer assigned to topic. Fetch min bytes: {settings.FetchMinBytes}, Fetch wait max ms: {settings.FetchWaitMaxMs}, Consume timeout ms: {settings.ConsumeTimeout.TotalMilliseconds}".ToString());
}

public async Task<RawReadStreamResult> ReadAsync(ReadStreamQuery query) =>
Expand All @@ -68,15 +67,24 @@ private RawReadStreamResult ReadInternal(ReadStreamQuery query)
var eventsCount = 0;
try
{
// Суть костыля: с нормальным таймаутом читается только первое сообщение из пачки.
// Остальные либо сразу достаются из буффера, либо сразу получаем null - индикатор конца пачки.
var message = consumer.Consume(settings.ConsumeTimeout);
if (message?.Message is null)
throw new Exception();

eventsWriter.WriteWithoutLength(message.Message.Value);
lastConsumedOffset = message.Offset;
eventsCount++;

while (eventsCount < query.Limit)
{
var result = consumer.Consume(settings.ConsumeTimeout);

if (result?.Message is null)
message = consumer.Consume(TimeSpan.Zero);
if (message?.Message is null)
break;

eventsWriter.WriteWithoutLength(result.Message.Value);
lastConsumedOffset = result.Offset;
eventsWriter.WriteWithoutLength(message.Message.Value);
lastConsumedOffset = message.Offset;
eventsCount++;
}
}
Expand Down

0 comments on commit 1a6c6ad

Please sign in to comment.