From 41851044e9b850098405e45fefc6bce3a0cae519 Mon Sep 17 00:00:00 2001 From: John Yang Date: Thu, 7 Nov 2024 15:17:48 -0800 Subject: [PATCH] Lint --- src/lib.rs | 2 +- src/main.rs | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7c6ad4b..aab5c0f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -273,7 +273,7 @@ macro_rules! processing_strategy { )); } - let (commit_sender, commit_receiver) = crate::processing_strategy!( + let (commit_sender, commit_receiver) = $crate::processing_strategy!( @reducers, ($reduce_first $(,$reduce_rest)*), reduce_receiver, diff --git a/src/main.rs b/src/main.rs index ee38dab..bf5028d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, Error}; use chrono::serde::ts_seconds; use chrono::{DateTime, Utc}; use kafka_map_reduce::reducers::clickhouse::ClickhouseBatchWriter; +use kafka_map_reduce::reducers::noop::NoopReducer; use kafka_map_reduce::reducers::os_stream::{OsStream, OsStreamWriter}; use kafka_map_reduce::{processing_strategy, start_consumer, ReduceShutdownBehaviour}; use rdkafka::{config::RDKafkaLogLevel, message::OwnedMessage, ClientConfig, Message}; @@ -102,6 +103,7 @@ async fn main() -> Result<(), Error> { .set_log_level(RDKafkaLogLevel::Debug), processing_strategy!({ map: parse, + reduce: ClickhouseBatchWriter::new( host, port, @@ -109,7 +111,10 @@ async fn main() -> Result<(), Error> { 128, Duration::from_secs(4), ReduceShutdownBehaviour::Flush, - ), + ) + => NoopReducer::new("hello") + => NoopReducer::new("world"), + err: OsStreamWriter::new( Duration::from_secs(1), OsStream::StdErr,