This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit b3dca12
wip
xvello committed Nov 23, 2023
1 parent 343e661 commit b3dca12
Showing 4 changed files with 134 additions and 11 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -25,7 +25,7 @@ uuid = { version = "1.3.3", features = ["serde"] }
 async-trait = "0.1.68"
 serde_urlencoded = "0.7.1"
 rand = "0.8.5"
-rdkafka = { version = "0.34", features = ["cmake-build", "ssl"] }
+rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl"] }
 metrics = "0.21.1"
 metrics-exporter-prometheus = "0.12.1"
 thiserror = "1.0.48"
2 changes: 1 addition & 1 deletion capture-server/tests/common.rs
@@ -32,7 +32,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     kafka: KafkaConfig {
         kafka_producer_linger_ms: 0, // Send messages as soon as possible
         kafka_producer_queue_mib: 10,
-        kafka_message_timeout_ms: 10000, // 10s, ACKs can be slow on low volumes
+        kafka_message_timeout_ms: 10000, // 10s, ACKs can be slow on low volumes, should be tuned
         kafka_compression_codec: "none".to_string(),
         kafka_hosts: "kafka:9092".to_string(),
         kafka_topic: "events_plugin_ingestion".to_string(),
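
For reference, kafka_message_timeout_ms presumably feeds librdkafka's message.timeout.ms, which caps the total time (queueing plus retries) a message may spend before its delivery future fails. A minimal sketch of that mapping, assuming the real wiring lives in KafkaSink::new:

use rdkafka::config::ClientConfig;

// Hypothetical helper, not part of this commit: shows how such settings
// typically reach librdkafka. message.timeout.ms bounds enqueue + retry time
// before a delivery is failed with a timeout error.
fn client_config(hosts: &str, message_timeout_ms: u32) -> ClientConfig {
    let mut config = ClientConfig::new();
    config
        .set("bootstrap.servers", hosts)
        .set("message.timeout.ms", message_timeout_ms.to_string());
    config
}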
133 changes: 128 additions & 5 deletions capture/src/sink.rs
@@ -3,12 +3,13 @@ use std::time::Duration;
 use async_trait::async_trait;
 use metrics::{absolute_counter, counter, gauge, histogram};
 use rdkafka::config::ClientConfig;
-use rdkafka::error::RDKafkaErrorCode;
+use rdkafka::error::{KafkaError, RDKafkaErrorCode};
 use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
 use rdkafka::producer::{DeliveryFuture, Producer};
 use rdkafka::util::Timeout;
 use tokio::task::JoinSet;
-use tracing::{debug, error, info, instrument};
+use tracing::instrument;
+use tracing::log::{debug, error, info};
 
 use crate::api::CaptureError;
 use crate::config::KafkaConfig;
@@ -162,6 +163,11 @@ impl KafkaSink {
         })
     }
 
+    pub fn flush(&self) -> Result<(), KafkaError> {
+        // TODO: hook it up on shutdown
+        self.producer.flush(Duration::new(30, 0))
+    }
+
     async fn kafka_send(
         producer: FutureProducer<KafkaContext>,
         topic: String,
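
The TODO above asks for flush() to be called on shutdown. A minimal sketch of such wiring, assuming a tokio runtime and an Arc-shared sink (names hypothetical); Producer::flush blocks, so it runs on a blocking task:

use std::sync::Arc;
use tokio::task;
use tracing::log::error;

// Hypothetical shutdown hook, not part of this commit: wait for SIGINT,
// then drain the librdkafka queue so buffered events are not lost.
async fn flush_on_shutdown(sink: Arc<KafkaSink>) {
    tokio::signal::ctrl_c()
        .await
        .expect("failed to listen for shutdown signal");
    let _ = task::spawn_blocking(move || {
        if let Err(err) = sink.flush() {
            error!("failed to flush Kafka producer on shutdown: {:?}", err);
        }
    })
    .await;
}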
@@ -208,6 +214,11 @@ impl KafkaSink {
                 error!("failed to produce to Kafka before write timeout");
                 Err(CaptureError::RetryableSinkError)
             }
+            Ok(Err((KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge), _))) => {
+                // Rejected by broker due to message size
+                report_dropped_events("kafka_message_size", 1);
+                Err(CaptureError::EventTooBig)
+            }
             Ok(Err((err, _))) => {
                 // Unretriable produce error
                 counter!("capture_kafka_produce_errors_total", 1);
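
Broker-injected MSG_SIZE_TOO_LARGE is what the new test below exercises, but librdkafka can also reject oversized payloads locally against the producer's message.max.bytes, surfacing the same error code. A standalone sketch of that client-side variant (topic and broker address are placeholders):

use rdkafka::config::ClientConfig;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

// Illustrative only: a 10 KB payload against a 1 KB client-side limit
// should fail fast with MessageSizeTooLarge, without reaching a broker.
async fn oversized_send_demo(brokers: &str) {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.max.bytes", "1000")
        .create()
        .expect("failed to create producer");
    let payload = vec![0u8; 10_000];
    let record = FutureRecord::<str, Vec<u8>>::to("placeholder-topic").payload(&payload);
    match producer.send(record, Timeout::Never).await {
        Err((KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge), _)) => {
            println!("rejected client-side: payload exceeds message.max.bytes")
        }
        other => println!("unexpected result: {:?}", other),
    }
}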
@@ -241,10 +252,10 @@ impl EventSink for KafkaSink {
             let topic = self.topic.clone();
             let limited = self.partition.is_limited(&event.key());
 
-            // We await kafka_send once to get events in the producer queue sequentially
+            // We await kafka_send to get events in the producer queue sequentially
             let ack = Self::kafka_send(producer, topic, event, limited).await?;
 
-            // Then stash the returned DeliveryFuture, waiting for the write ACKs from brokers.
+            // Then stash the returned DeliveryFuture, waiting concurrently for the write ACKs from brokers.
             set.spawn(Self::process_ack(ack));
         }
 
@@ -257,7 +268,8 @@ impl EventSink for KafkaSink {
                     return Err(err);
                 }
                 Err(err) => {
-                    error!("failed to produce to Kafka: {:?}", err);
+                    set.abort_all();
+                    error!("join error while waiting on Kafka ACK: {:?}", err);
                     return Err(CaptureError::RetryableSinkError);
                 }
             }
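
The batching pattern in these two hunks — await each kafka_send so enqueueing stays sequential, then drain the DeliveryFutures concurrently through a JoinSet, aborting the set on a join error — can be sketched in isolation (toy delivery future, assumed names):

use tokio::task::JoinSet;

// Toy model of send_batch's structure: stage 1 enqueues in order, stage 2
// awaits acknowledgements concurrently, failing the batch on the first error.
async fn send_all(items: Vec<u64>) -> Result<(), String> {
    let mut set = JoinSet::new();
    for item in items {
        // Stage 1 (sequential): enqueue and obtain a "delivery future".
        let delivery = async move { Ok::<u64, String>(item) };
        // Stage 2 (concurrent): park the future in the JoinSet.
        set.spawn(delivery);
    }
    while let Some(joined) = set.join_next().await {
        match joined {
            Ok(Ok(_offset)) => {} // Delivery acknowledged
            Ok(Err(err)) => return Err(err),
            Err(join_err) => {
                // A panicked or cancelled task: stop waiting on the rest.
                set.abort_all();
                return Err(format!("join error: {}", join_err));
            }
        }
    }
    Ok(())
}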
@@ -267,3 +279,114 @@ impl EventSink for KafkaSink {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::api::CaptureError;
+    use crate::config;
+    use crate::event::ProcessedEvent;
+    use crate::health::HealthRegistry;
+    use crate::partition_limits::PartitionLimiter;
+    use crate::sink::{EventSink, KafkaSink};
+    use crate::utils::uuid_v7;
+    use rdkafka::mocking::MockCluster;
+    use rdkafka::producer::DefaultProducerContext;
+    use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr};
+    use std::num::NonZeroU32;
+    use time::Duration;
+
+    async fn start_on_mocked_sink() -> (MockCluster<'static, DefaultProducerContext>, KafkaSink) {
+        let registry = HealthRegistry::new("liveness");
+        let handle = registry
+            .register("one".to_string(), Duration::seconds(30))
+            .await;
+        let limiter =
+            PartitionLimiter::new(NonZeroU32::new(10).unwrap(), NonZeroU32::new(10).unwrap());
+        let cluster = MockCluster::new(1).expect("failed to create mock brokers");
+        let config = config::KafkaConfig {
+            kafka_producer_linger_ms: 0,
+            kafka_producer_queue_mib: 50,
+            kafka_message_timeout_ms: 500,
+            kafka_compression_codec: "none".to_string(),
+            kafka_hosts: cluster.bootstrap_servers(),
+            kafka_topic: "events_plugin_ingestion".to_string(),
+            kafka_tls: false,
+        };
+        let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
+        (cluster, sink)
+    }
+
+    #[tokio::test]
+    async fn kafka_sink_error_handling() {
+        // Uses a mocked Kafka broker that allows injecting write errors, to check error handling.
+        // We test different cases in a single test to amortize the startup cost of the producer.
+
+        let (cluster, sink) = start_on_mocked_sink().await;
+        let event: ProcessedEvent = ProcessedEvent {
+            uuid: uuid_v7(),
+            distinct_id: "id1".to_string(),
+            ip: "".to_string(),
+            data: "".to_string(),
+            now: "".to_string(),
+            sent_at: None,
+            token: "token1".to_string(),
+        };
+
+        // Wait for the producer to be healthy, to keep kafka_message_timeout_ms short and the tests fast
+        for _ in 0..20 {
+            if sink.send(event.clone()).await.is_ok() {
+                break;
+            }
+        }
+
+        // Send events to confirm the happy path
+        sink.send(event.clone())
+            .await
+            .expect("failed to send one initial event");
+        sink.send_batch(vec![event.clone(), event.clone()])
+            .await
+            .expect("failed to send initial event batch");
+
+        // Simulate unretriable errors
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE; 1];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        match sink.send(event.clone()).await {
+            Err(CaptureError::EventTooBig) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_INVALID_PARTITIONS; 1];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        match sink.send(event.clone()).await {
+            Err(CaptureError::RetryableSinkError) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+
+        // Simulate transient errors, messages should go through OK
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        sink.send(event.clone())
+            .await
+            .expect("failed to send one event after recovery");
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        sink.send_batch(vec![event.clone(), event.clone()])
+            .await
+            .expect("failed to send event batch after recovery");
+
+        // Time out on a sustained transient error
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 50];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        match sink.send(event.clone()).await {
+            Err(CaptureError::RetryableSinkError) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+    }
+}
