Skip to content

Commit

Permalink
Lint
Browse files Browse the repository at this point in the history
  • Loading branch information
john-z-yang committed Nov 7, 2024
1 parent f11ce16 commit 4185104
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ macro_rules! processing_strategy {
));
}

let (commit_sender, commit_receiver) = crate::processing_strategy!(
let (commit_sender, commit_receiver) = $crate::processing_strategy!(
@reducers,
($reduce_first $(,$reduce_rest)*),
reduce_receiver,
Expand Down
7 changes: 6 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::{anyhow, Error};
use chrono::serde::ts_seconds;
use chrono::{DateTime, Utc};
use kafka_map_reduce::reducers::clickhouse::ClickhouseBatchWriter;
use kafka_map_reduce::reducers::noop::NoopReducer;
use kafka_map_reduce::reducers::os_stream::{OsStream, OsStreamWriter};
use kafka_map_reduce::{processing_strategy, start_consumer, ReduceShutdownBehaviour};
use rdkafka::{config::RDKafkaLogLevel, message::OwnedMessage, ClientConfig, Message};
Expand Down Expand Up @@ -102,14 +103,18 @@ async fn main() -> Result<(), Error> {
.set_log_level(RDKafkaLogLevel::Debug),
processing_strategy!({
map: parse,

reduce: ClickhouseBatchWriter::new(
host,
port,
table,
128,
Duration::from_secs(4),
ReduceShutdownBehaviour::Flush,
),
)
=> NoopReducer::new("hello")
=> NoopReducer::new("world"),

err: OsStreamWriter::new(
Duration::from_secs(1),
OsStream::StdErr,
Expand Down

0 comments on commit 4185104

Please sign in to comment.