Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
properly process Kafka produce ACKs to propagate errors
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Nov 21, 2023
1 parent f8a07fb commit 343e661
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 23 deletions.
1 change: 1 addition & 0 deletions capture-server/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
kafka: KafkaConfig {
kafka_producer_linger_ms: 0, // Send messages as soon as possible
kafka_producer_queue_mib: 10,
kafka_message_timeout_ms: 10000, // 10s, ACKs can be slow on low volumes
kafka_compression_codec: "none".to_string(),
kafka_hosts: "kafka:9092".to_string(),
kafka_topic: "events_plugin_ingestion".to_string(),
Expand Down
5 changes: 2 additions & 3 deletions capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,10 @@ pub async fn process_events<'a>(
tracing::debug!(events=?events, "processed {} events", events.len());

if events.len() == 1 {
sink.send(events[0].clone()).await?;
sink.send(events[0].clone()).await
} else {
sink.send_batch(events).await?;
sink.send_batch(events).await
}
Ok(())
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions capture/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct KafkaConfig {
pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic
#[envconfig(default = "400")]
pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
#[envconfig(default = "20000")]
pub kafka_message_timeout_ms: u32, // Time before we stop retrying producing a message: 20 seconds
#[envconfig(default = "none")]
pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
pub kafka_hosts: String,
Expand Down
3 changes: 2 additions & 1 deletion capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ where
.await;

let partition = PartitionLimiter::new(config.per_second_limit, config.burst_limit);
let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition).unwrap();
let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

router::router(
crate::time::SystemTime {},
Expand Down
73 changes: 54 additions & 19 deletions capture/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use metrics::{absolute_counter, counter, gauge, histogram};
use rdkafka::config::ClientConfig;
use rdkafka::error::RDKafkaErrorCode;
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
use rdkafka::producer::Producer;
use rdkafka::producer::{DeliveryFuture, Producer};
use rdkafka::util::Timeout;
use tokio::task::JoinSet;
use tracing::{debug, info, instrument};
use tracing::{debug, error, info, instrument};

use crate::api::CaptureError;
use crate::config::KafkaConfig;
Expand Down Expand Up @@ -128,6 +128,10 @@ impl KafkaSink {
.set("bootstrap.servers", &config.kafka_hosts)
.set("statistics.interval.ms", "10000")
.set("linger.ms", config.kafka_producer_linger_ms.to_string())
.set(
"message.timeout.ms",
config.kafka_message_timeout_ms.to_string(),
)
.set("compression.codec", config.kafka_compression_codec)
.set(
"queue.buffering.max.kbytes",
Expand Down Expand Up @@ -157,17 +161,15 @@ impl KafkaSink {
topic: config.kafka_topic,
})
}
}

impl KafkaSink {
async fn kafka_send(
producer: FutureProducer<KafkaContext>,
topic: String,
event: ProcessedEvent,
limited: bool,
) -> Result<(), CaptureError> {
) -> Result<DeliveryFuture, CaptureError> {
let payload = serde_json::to_string(&event).map_err(|e| {
tracing::error!("failed to serialize event: {}", e);
error!("failed to serialize event: {}", e);
CaptureError::NonRetryableSinkError
})?;

Expand All @@ -182,7 +184,7 @@ impl KafkaSink {
timestamp: None,
headers: None,
}) {
Ok(_) => Ok(()),
Ok(ack) => Ok(ack),
Err((e, _)) => match e.rdkafka_error_code() {
Some(RDKafkaErrorCode::InvalidMessageSize) => {
report_dropped_events("kafka_message_size", 1);
Expand All @@ -191,25 +193,43 @@ impl KafkaSink {
_ => {
// TODO(maybe someday): Don't drop them but write them somewhere and try again
report_dropped_events("kafka_write_error", 1);
tracing::error!("failed to produce event: {}", e);
error!("failed to produce event: {}", e);
Err(CaptureError::RetryableSinkError)
}
},
}
}

async fn process_ack(delivery: DeliveryFuture) -> Result<(), CaptureError> {
match delivery.await {
Err(_) => {
// Cancelled due to timeout while retrying
counter!("capture_kafka_produce_errors_total", 1);
error!("failed to produce to Kafka before write timeout");
Err(CaptureError::RetryableSinkError)
}
Ok(Err((err, _))) => {
// Unretriable produce error
counter!("capture_kafka_produce_errors_total", 1);
error!("failed to produce to Kafka: {}", err);
Err(CaptureError::RetryableSinkError)
}
Ok(Ok(_)) => {
counter!("capture_events_ingested_total", 1);
Ok(())
}
}
}
}

#[async_trait]
impl EventSink for KafkaSink {
#[instrument(skip_all)]
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
let limited = self.partition.is_limited(&event.key());
Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;

histogram!("capture_event_batch_size", 1.0);
counter!("capture_events_ingested_total", 1);

Ok(())
let ack =
Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
Self::process_ack(ack).await
}

#[instrument(skip_all)]
Expand All @@ -219,16 +239,31 @@ impl EventSink for KafkaSink {
for event in events {
let producer = self.producer.clone();
let topic = self.topic.clone();

let limited = self.partition.is_limited(&event.key());
set.spawn(Self::kafka_send(producer, topic, event, limited));

// We await kafka_send once to get events in the producer queue sequentially
let ack = Self::kafka_send(producer, topic, event, limited).await?;

// Then stash the returned DeliveryFuture, waiting for the write ACKs from brokers.
set.spawn(Self::process_ack(ack));
}

// Await on all the produce promises
while (set.join_next().await).is_some() {}
// Await on all the produce promises, fail batch on first failure
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(_)) => {}
Ok(Err(err)) => {
set.abort_all();
return Err(err);
}
Err(err) => {
error!("failed to produce to Kafka: {:?}", err);
return Err(CaptureError::RetryableSinkError);
}
}
}

histogram!("capture_event_batch_size", batch_size as f64);
counter!("capture_events_ingested_total", batch_size as u64);
Ok(())
}
}

0 comments on commit 343e661

Please sign in to comment.