From 98f478286bea81606b1bd59359abdd65b5d67bb1 Mon Sep 17 00:00:00 2001 From: Gio Gutierrez Date: Wed, 6 Mar 2024 10:13:58 -0500 Subject: [PATCH] fix: Update with_options to include mqtt_common.rs --- src/connector/src/source/mqtt/source/reader.rs | 11 ++++------- src/connector/src/with_options_test.rs | 5 +++++ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs index 74e315b6cd656..50f90c816390c 100644 --- a/src/connector/src/source/mqtt/source/reader.rs +++ b/src/connector/src/source/mqtt/source/reader.rs @@ -25,9 +25,7 @@ use crate::error::ConnectorResult as Result; use crate::parser::ParserConfig; use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::mqtt::MqttProperties; -use crate::source::{ - self, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitReader, -}; +use crate::source::{BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitReader}; pub struct MqttSplitReader { eventloop: rumqttc::v5::EventLoop, @@ -51,10 +49,9 @@ impl SplitReader for MqttSplitReader { source_ctx: SourceContextRef, _columns: Option>, ) -> Result { - let (client, eventloop) = properties.common.build_client( - source_ctx.source_info.actor_id, - source_ctx.source_info.fragment_id as u64, - )?; + let (client, eventloop) = properties + .common + .build_client(source_ctx.actor_id, source_ctx.fragment_id as u64)?; let qos = properties.common.qos(); diff --git a/src/connector/src/with_options_test.rs b/src/connector/src/with_options_test.rs index 4ead1685244d8..fd234e880e469 100644 --- a/src/connector/src/with_options_test.rs +++ b/src/connector/src/with_options_test.rs @@ -38,6 +38,10 @@ fn common_mod_path() -> PathBuf { connector_crate_path().join("src").join("common.rs") } +fn mqtt_common_mod_path() -> PathBuf { + connector_crate_path().join("src").join("mqtt_common.rs") +} + pub fn generate_with_options_yaml_source() -> String { generate_with_options_yaml_inner(&source_mod_path()) } @@ -63,6 +67,7 @@ fn generate_with_options_yaml_inner(path: &Path) -> String { for entry in walkdir::WalkDir::new(path) .into_iter() .chain(walkdir::WalkDir::new(common_mod_path())) + .chain(walkdir::WalkDir::new(mqtt_common_mod_path())) { let entry = entry.expect("Failed to read directory entry"); if entry.path().extension() == Some("rs".as_ref()) {