diff --git a/Cargo.lock b/Cargo.lock index 8e3b873..6bf02df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1531,9 +1531,9 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.34.0" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "053adfa02fab06e86c01d586cc68aa47ee0ff4489a59469081dc12cbcde578bf" +checksum = "d54f02a5a40220f8a2dfa47ddb38ba9064475a5807a69504b6f91711df2eea63" dependencies = [ "futures-channel", "futures-util", @@ -1549,9 +1549,9 @@ dependencies = [ [[package]] name = "rdkafka-sys" -version = "4.6.0+2.2.0" +version = "4.7.0+2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad63c279fca41a27c231c450a2d2ad18288032e9cbb159ad16c9d96eba35aaaf" +checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615" dependencies = [ "cmake", "libc", diff --git a/Cargo.toml b/Cargo.toml index 5a0d501..89ed49d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ uuid = { version = "1.3.3", features = ["serde"] } async-trait = "0.1.68" serde_urlencoded = "0.7.1" rand = "0.8.5" -rdkafka = { version = "0.34", features = ["cmake-build", "ssl"] } +rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl"] } metrics = "0.21.1" metrics-exporter-prometheus = "0.12.1" thiserror = "1.0.48" diff --git a/capture-server/tests/common.rs b/capture-server/tests/common.rs index 8527954..befc7b8 100644 --- a/capture-server/tests/common.rs +++ b/capture-server/tests/common.rs @@ -32,7 +32,7 @@ pub static DEFAULT_CONFIG: Lazy = Lazy::new(|| Config { kafka: KafkaConfig { kafka_producer_linger_ms: 0, // Send messages as soon as possible kafka_producer_queue_mib: 10, - kafka_message_timeout_ms: 10000, // 10s, ACKs can be slow on low volumes + kafka_message_timeout_ms: 10000, // 10s, ACKs can be slow on low volumes, should be tuned kafka_compression_codec: "none".to_string(), kafka_hosts: "kafka:9092".to_string(), kafka_topic: "events_plugin_ingestion".to_string(), diff --git a/capture/src/sink.rs b/capture/src/sink.rs index 8a23fbc..4eaca05 100644 --- a/capture/src/sink.rs +++ b/capture/src/sink.rs @@ -3,12 +3,13 @@ use std::time::Duration; use async_trait::async_trait; use metrics::{absolute_counter, counter, gauge, histogram}; use rdkafka::config::ClientConfig; -use rdkafka::error::RDKafkaErrorCode; +use rdkafka::error::{KafkaError, RDKafkaErrorCode}; use rdkafka::producer::future_producer::{FutureProducer, FutureRecord}; use rdkafka::producer::{DeliveryFuture, Producer}; use rdkafka::util::Timeout; use tokio::task::JoinSet; -use tracing::{debug, error, info, instrument}; +use tracing::instrument; +use tracing::log::{debug, error, info}; use crate::api::CaptureError; use crate::config::KafkaConfig; @@ -162,6 +163,11 @@ impl KafkaSink { }) } + pub fn flush(&self) -> Result<(), KafkaError> { + // TODO: hook it up on shutdown + self.producer.flush(Duration::new(30, 0)) + } + async fn kafka_send( producer: FutureProducer, topic: String, @@ -208,6 +214,11 @@ impl KafkaSink { error!("failed to produce to Kafka before write timeout"); Err(CaptureError::RetryableSinkError) } + Ok(Err((KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge), _))) => { + // Rejected by broker due to message size + report_dropped_events("kafka_message_size", 1); + Err(CaptureError::EventTooBig) + } Ok(Err((err, _))) => { // Unretriable produce error counter!("capture_kafka_produce_errors_total", 1); @@ -241,10 +252,10 @@ impl EventSink for KafkaSink { let topic = self.topic.clone(); let limited = self.partition.is_limited(&event.key()); - // We await kafka_send once to get events in the producer queue sequentially + // We await kafka_send to get events in the producer queue sequentially let ack = Self::kafka_send(producer, topic, event, limited).await?; - // Then stash the returned DeliveryFuture, waiting for the write ACKs from brokers. + // Then stash the returned DeliveryFuture, waiting concurrently for the write ACKs from brokers. set.spawn(Self::process_ack(ack)); } @@ -257,7 +268,8 @@ impl EventSink for KafkaSink { return Err(err); } Err(err) => { - error!("failed to produce to Kafka: {:?}", err); + set.abort_all(); + error!("join error while waiting on Kafka ACK: {:?}", err); return Err(CaptureError::RetryableSinkError); } } @@ -267,3 +279,124 @@ impl EventSink for KafkaSink { Ok(()) } } + +#[cfg(test)] +mod tests { + use crate::api::CaptureError; + use crate::config; + use crate::event::ProcessedEvent; + use crate::health::HealthRegistry; + use crate::partition_limits::PartitionLimiter; + use crate::sink::{EventSink, KafkaSink}; + use crate::utils::uuid_v7; + use rdkafka::mocking::MockCluster; + use rdkafka::producer::DefaultProducerContext; + use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr}; + use std::num::NonZeroU32; + use time::Duration; + + async fn start_on_mocked_sink() -> (MockCluster<'static, DefaultProducerContext>, KafkaSink) { + let registry = HealthRegistry::new("liveness"); + let handle = registry + .register("one".to_string(), Duration::seconds(30)) + .await; + let limiter = + PartitionLimiter::new(NonZeroU32::new(10).unwrap(), NonZeroU32::new(10).unwrap()); + let cluster = MockCluster::new(1).expect("failed to create mock brokers"); + let config = config::KafkaConfig { + kafka_producer_linger_ms: 0, + kafka_producer_queue_mib: 50, + kafka_message_timeout_ms: 250, + kafka_compression_codec: "none".to_string(), + kafka_hosts: cluster.bootstrap_servers(), + kafka_topic: "events_plugin_ingestion".to_string(), + kafka_tls: false, + }; + let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink"); + (cluster, sink) + } + + #[tokio::test] + async fn kafka_sink_error_handling() { + // Uses a mocked Kafka cluster of three brokers that allows injecting write errors. + // TODO: doc + + let event1: ProcessedEvent = ProcessedEvent { + uuid: uuid_v7(), + distinct_id: "id1".to_string(), + ip: "".to_string(), + data: "".to_string(), + now: "".to_string(), + sent_at: None, + token: "token1".to_string(), + }; + let event2: ProcessedEvent = ProcessedEvent { + uuid: uuid_v7(), + distinct_id: "id2".to_string(), + ip: "".to_string(), + data: "".to_string(), + now: "".to_string(), + sent_at: None, + token: "token2".to_string(), + }; + let (cluster, sink) = start_on_mocked_sink().await; + + // Wait for producer to be healthy, to keep kafka_message_timeout_ms short and tests faster + for _ in 0..20 { + if sink.send(event1.clone()).await.is_ok() { + break; + } + } + + // Send events to confirm happy path + sink.send(event1.clone()) + .await + .expect("failed to send one initial event"); + sink.send_batch(vec![event1.clone(), event2.clone()]) + .await + .expect("failed to send initial event batch"); + + // Simulate unretriable errors + cluster.clear_request_errors(RDKafkaApiKey::Produce); + let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE; 1]; + cluster.request_errors(RDKafkaApiKey::Produce, &err); + match sink.send(event1.clone()).await { + Err(CaptureError::EventTooBig) => {} // Expected + Err(err) => panic!("wrong error code {}", err), + Ok(()) => panic!("should have errored"), + }; + + cluster.clear_request_errors(RDKafkaApiKey::Produce); + let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_INVALID_PARTITIONS; 1]; + cluster.request_errors(RDKafkaApiKey::Produce, &err); + match sink.send(event1.clone()).await { + Err(CaptureError::RetryableSinkError) => {} // Expected + Err(err) => panic!("wrong error code {}", err), + Ok(()) => panic!("should have errored"), + }; + + // Simulate transient errors, messages should go through OK + cluster.clear_request_errors(RDKafkaApiKey::Produce); + let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2]; + cluster.request_errors(RDKafkaApiKey::Produce, &err); + sink.send(event1.clone()) + .await + .expect("failed to send one event after recovery"); + cluster.clear_request_errors(RDKafkaApiKey::Produce); + let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2]; + cluster.request_errors(RDKafkaApiKey::Produce, &err); + sink.send_batch(vec![event1.clone(), event2.clone()]) + .await + .expect("failed to send event batch after recovery"); + + // Timeout on transient errors + cluster.clear_request_errors(RDKafkaApiKey::Produce); + let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 10000]; + cluster.request_errors(RDKafkaApiKey::Produce, &err); + match sink.send(event1.clone()).await { + Err(CaptureError::RetryableSinkError) => {} // Expected + Err(err) => panic!("wrong error code {}", err), + Ok(()) => panic!("should have errored"), + }; + } +}