Skip to content

Commit

Permalink
fix: Update with_options to include mqtt_common.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
bakjos committed Mar 11, 2024
1 parent 45f2c20 commit 98f4782
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
11 changes: 4 additions & 7 deletions src/connector/src/source/mqtt/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::source::common::{into_chunk_stream, CommonSplitReader};
use crate::source::mqtt::MqttProperties;
use crate::source::{
self, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitReader,
};
use crate::source::{BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitReader};

pub struct MqttSplitReader {
eventloop: rumqttc::v5::EventLoop,
Expand All @@ -51,10 +49,9 @@ impl SplitReader for MqttSplitReader {
source_ctx: SourceContextRef,
_columns: Option<Vec<Column>>,
) -> Result<Self> {
let (client, eventloop) = properties.common.build_client(
source_ctx.source_info.actor_id,
source_ctx.source_info.fragment_id as u64,
)?;
let (client, eventloop) = properties
.common
.build_client(source_ctx.actor_id, source_ctx.fragment_id as u64)?;

let qos = properties.common.qos();

Expand Down
5 changes: 5 additions & 0 deletions src/connector/src/with_options_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ fn common_mod_path() -> PathBuf {
connector_crate_path().join("src").join("common.rs")
}

fn mqtt_common_mod_path() -> PathBuf {
connector_crate_path().join("src").join("mqtt_common.rs")
}

pub fn generate_with_options_yaml_source() -> String {
generate_with_options_yaml_inner(&source_mod_path())
}
Expand All @@ -63,6 +67,7 @@ fn generate_with_options_yaml_inner(path: &Path) -> String {
for entry in walkdir::WalkDir::new(path)
.into_iter()
.chain(walkdir::WalkDir::new(common_mod_path()))
.chain(walkdir::WalkDir::new(mqtt_common_mod_path()))
{
let entry = entry.expect("Failed to read directory entry");
if entry.path().extension() == Some("rs".as_ref()) {
Expand Down

0 comments on commit 98f4782

Please sign in to comment.