diff --git a/src/connector/src/sink/mqtt.rs b/src/connector/src/sink/mqtt.rs index 040e62bfe9634..ab162b4a1a621 100644 --- a/src/connector/src/sink/mqtt.rs +++ b/src/connector/src/sink/mqtt.rs @@ -178,7 +178,6 @@ impl MqttSinkWriter { let stopped = Arc::new(AtomicBool::new(false)); let stopped_clone = stopped.clone(); - tokio::spawn(async move { while !stopped_clone.load(std::sync::atomic::Ordering::Relaxed) { match eventloop.poll().await { @@ -195,7 +194,6 @@ impl MqttSinkWriter { continue; } err => { - println!("Err: {:?}", err); tracing::error!("Failed to poll mqtt eventloop: {}", err.as_report()); std::thread::sleep(std::time::Duration::from_secs(1)); } diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs index cdc88f7702a86..aef4311dddcfe 100644 --- a/src/connector/src/source/mqtt/source/reader.rs +++ b/src/connector/src/source/mqtt/source/reader.rs @@ -49,9 +49,10 @@ impl SplitReader for MqttSplitReader { source_ctx: SourceContextRef, _columns: Option>, ) -> Result { - let (client, eventloop) = properties - .common - .build_client(source_ctx.source_info.actor_id, source_ctx.source_info.fragment_id)?; + let (client, eventloop) = properties.common.build_client( + source_ctx.source_info.actor_id, + source_ctx.source_info.fragment_id, + )?; let qos = properties.common.qos();