From df0aa3a3851168c45124d3a09aaefde0952d38a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giovanny=20Guti=C3=A9rrez?= Date: Sun, 7 Apr 2024 02:41:28 -0500 Subject: [PATCH] feat(mqtt): Allow using field as topic name (#15673) --- ci/docker-compose.yml | 14 + ci/scripts/e2e-mqtt-sink-test.sh | 35 ++ ci/workflows/main-cron.yml | 19 + ci/workflows/pull-request.yml | 15 + e2e_test/sink/mqtt_sink.slt | 115 +++++ .../src/connector_common/mqtt_common.rs | 3 - src/connector/src/sink/mqtt.rs | 413 ++++++++++++++++-- .../src/source/mqtt/enumerator/mod.rs | 4 +- src/connector/src/source/mqtt/mod.rs | 3 + src/connector/with_options_sink.yaml | 11 +- src/connector/with_options_source.yaml | 8 +- src/frontend/src/handler/create_sink.rs | 4 +- 12 files changed, 582 insertions(+), 62 deletions(-) create mode 100755 ci/scripts/e2e-mqtt-sink-test.sh create mode 100644 e2e_test/sink/mqtt_sink.slt diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 498a0420763be..0beb2ff5ac8ac 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -93,6 +93,7 @@ services: - clickhouse-server - redis-server - pulsar-server + - mqtt-server - cassandra-server - doris-server - starrocks-fe-server @@ -329,3 +330,16 @@ services: MONGO_HOST: mongodb MONGO_PORT: 27017 MONGO_DB_NAME: random_data + mqtt-server: + image: eclipse-mosquitto + command: + - sh + - -c + - echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf + ports: + - 1883:1883 + healthcheck: + test: ["CMD-SHELL", "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0"] + interval: 10s + timeout: 10s + retries: 6 diff --git a/ci/scripts/e2e-mqtt-sink-test.sh b/ci/scripts/e2e-mqtt-sink-test.sh new file mode 100755 index 0000000000000..376de9e4084c8 --- /dev/null +++ b/ci/scripts/e2e-mqtt-sink-test.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +source ci/scripts/common.sh + +while getopts 'p:' opt; do + case ${opt} in + p ) + profile=$OPTARG + ;; + \? ) + echo "Invalid Option: -$OPTARG" 1>&2 + exit 1 + ;; + : ) + echo "Invalid option: $OPTARG requires an argument" 1>&2 + ;; + esac +done +shift $((OPTIND -1)) + +download_and_prepare_rw "$profile" source + +echo "--- starting risingwave cluster" +cargo make ci-start ci-sink-test +sleep 1 + +set -euo pipefail + +echo "--- testing mqtt sink" +sqllogictest -p 4566 -d dev './e2e_test/sink/mqtt_sink.slt' + +sleep 1 + +echo "--- Kill cluster" +cargo make ci-kill \ No newline at end of file diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 327a73ba0ebad..586d56e8fb648 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -991,6 +991,25 @@ steps: timeout_in_minutes: 10 retry: *auto-retry + - label: "end-to-end mqtt sink test" + key: "e2e-mqtt-sink-tests" + command: "ci/scripts/e2e-mqtt-sink-test.sh -p ci-release" + if: | + !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-e2e-mqtt-sink-tests" + || build.env("CI_STEPS") =~ /(^|,)e2e-mqtt-sink-tests?(,|$$)/ + depends_on: + - "build" + - "build-other" + plugins: + - docker-compose#v5.1.0: + run: sink-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry + - label: "connector node integration test Java {{matrix.java_version}}" key: "connector-node-integration-test" command: "ci/scripts/connector-node-integration-test.sh -p ci-release -v {{matrix.java_version}}" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index dcb36a9875a24..9902b555df638 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -263,6 +263,21 @@ steps: timeout_in_minutes: 10 retry: *auto-retry + - label: "end-to-end mqtt sink test" + if: build.pull_request.labels includes "ci/run-e2e-mqtt-sink-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-mqtt-sink-tests?(,|$$)/ + command: "ci/scripts/e2e-mqtt-sink-test.sh -p ci-dev" + depends_on: + - "build" + - "build-other" + plugins: + - docker-compose#v5.1.0: + run: sink-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry + - label: "end-to-end clickhouse sink test" if: build.pull_request.labels includes "ci/run-e2e-clickhouse-sink-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-clickhouse-sink-tests?(,|$$)/ command: "ci/scripts/e2e-clickhouse-sink-test.sh -p ci-dev" diff --git a/e2e_test/sink/mqtt_sink.slt b/e2e_test/sink/mqtt_sink.slt new file mode 100644 index 0000000000000..d19addb024c97 --- /dev/null +++ b/e2e_test/sink/mqtt_sink.slt @@ -0,0 +1,115 @@ +statement ok +CREATE TABLE mqtt ( + device_id varchar, + temperature double, + topic varchar as '/device/' || device_id +); + +statement ok +CREATE TABLE mqtt_nested ( + info struct, + temperature double +); + +statement ok +CREATE SINK mqtt_sink +FROM + mqtt +WITH + ( + connector='mqtt', + url='tcp://mqtt-server', + type = 'append-only', + topic.field = 'topic', + retain = 'true', + qos = 'at_least_once', + ) FORMAT PLAIN ENCODE JSON ( + force_append_only='true', + ); + +statement ok +CREATE SINK mqtt_nested_sink +FROM + mqtt_nested +WITH + ( + connector='mqtt', + url='tcp://mqtt-server', + type = 'append-only', + topic = '/nested/fallback', + topic.field = 'info.topic', + retain = 'true', + qos = 'at_least_once', + ) FORMAT PLAIN ENCODE JSON ( + force_append_only='true', + ); + + +statement ok +CREATE TABLE mqtt_source +( + device_id varchar, + temperature double +) +WITH ( + connector='mqtt', + url='tcp://mqtt-server', + topic= '/device/+', + qos = 'at_least_once', +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE TABLE mqtt_nested_source +( + info struct, + temperature double +) +WITH ( + connector='mqtt', + url='tcp://mqtt-server', + topic= '/nested/fallback', + qos = 'at_least_once', +) FORMAT PLAIN ENCODE JSON; + + +statement ok +INSERT INTO mqtt (device_id, temperature) +VALUES ( '12', 56.0 ); + +statement ok +INSERT INTO mqtt (device_id, temperature) +VALUES ( '12', 59.0 ); + +statement ok +INSERT INTO mqtt (device_id, temperature) +VALUES ( '13', 20.0 ); + +statement ok +INSERT INTO mqtt (device_id, temperature) +VALUES ( '13', 22.0 ); + +statement ok +INSERT INTO mqtt_nested (info, temperature) +VALUES( ROW('12', '/nested/12'), 56.0 ); + +statement ok +INSERT INTO mqtt_nested (info, temperature) +VALUES( ROW('13', null), 22.0 ); + +statement ok +FLUSH; + +sleep 15s + +query IT rowsort +SELECT device_id, temperature FROM mqtt ORDER BY device_id, temperature; +---- +12 56 +12 59 +13 20 +13 22 + +query IT rowsort +SELECT (info).device_id device_id, temperature from mqtt_nested_source ORDER BY device_id, temperature ; +---- +13 22 diff --git a/src/connector/src/connector_common/mqtt_common.rs b/src/connector/src/connector_common/mqtt_common.rs index e607decff58aa..c967cf215fd2d 100644 --- a/src/connector/src/connector_common/mqtt_common.rs +++ b/src/connector/src/connector_common/mqtt_common.rs @@ -42,9 +42,6 @@ pub struct MqttCommon { /// `mqtts://`, `ssl://` will use the native certificates if no ca is specified pub url: String, - /// The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/# - pub topic: String, - /// The quality of service to use when publishing messages. Defaults to at_most_once. /// Could be at_most_once, at_least_once or exactly_once #[serde_as(as = "Option")] diff --git a/src/connector/src/sink/mqtt.rs b/src/connector/src/sink/mqtt.rs index 24e6b664ce43f..308ec450878a9 100644 --- a/src/connector/src/sink/mqtt.rs +++ b/src/connector/src/sink/mqtt.rs @@ -17,9 +17,11 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; use anyhow::{anyhow, Context as _}; -use risingwave_common::array::StreamChunk; +use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; +use risingwave_common::row::Row; use risingwave_common::session_config::sink_decouple::SinkDecouple; +use risingwave_common::types::{DataType, ScalarRefImpl}; use rumqttc::v5::mqttbytes::QoS; use rumqttc::v5::ConnectionError; use serde_derive::Deserialize; @@ -27,18 +29,19 @@ use serde_with::serde_as; use thiserror_ext::AsReport; use with_options::WithOptions; -use super::catalog::SinkFormatDesc; -use super::formatter::SinkFormatterImpl; -use super::writer::FormattedSink; +use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; +use super::encoder::{ + DateHandlingMode, JsonEncoder, ProtoEncoder, ProtoHeader, RowEncoder, SerTo, TimeHandlingMode, + TimestampHandlingMode, TimestamptzHandlingMode, +}; +use super::writer::AsyncTruncateSinkWriterExt; use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::connector_common::MqttCommon; +use crate::deserialize_bool_from_string; use crate::sink::catalog::desc::SinkDesc; use crate::sink::log_store::DeliveryFutureManagerAddFuture; -use crate::sink::writer::{ - AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, -}; +use crate::sink::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter}; use crate::sink::{Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY}; -use crate::{deserialize_bool_from_string, dispatch_sink_formatter_impl}; pub const MQTT_SINK: &str = "mqtt"; @@ -48,23 +51,69 @@ pub struct MqttConfig { #[serde(flatten)] pub common: MqttCommon, + /// The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/# + pub topic: Option, + /// Whether the message should be retained by the broker #[serde(default, deserialize_with = "deserialize_bool_from_string")] pub retain: bool, // accept "append-only" pub r#type: String, + + // if set, will use a field value as the topic name, if topic is also set it will be used as a fallback + #[serde(rename = "topic.field")] + pub topic_field: Option, +} + +pub enum RowEncoderWrapper { + Json(JsonEncoder), + Proto(ProtoEncoder), +} + +impl RowEncoder for RowEncoderWrapper { + type Output = Vec; + + fn encode_cols( + &self, + row: impl Row, + col_indices: impl Iterator, + ) -> Result { + match self { + RowEncoderWrapper::Json(json) => json.encode_cols(row, col_indices)?.ser_to(), + RowEncoderWrapper::Proto(proto) => proto.encode_cols(row, col_indices)?.ser_to(), + } + } + + fn schema(&self) -> &Schema { + match self { + RowEncoderWrapper::Json(json) => json.schema(), + RowEncoderWrapper::Proto(proto) => proto.schema(), + } + } + + fn col_indices(&self) -> Option<&[usize]> { + match self { + RowEncoderWrapper::Json(json) => json.col_indices(), + RowEncoderWrapper::Proto(proto) => proto.col_indices(), + } + } + + fn encode(&self, row: impl Row) -> Result { + match self { + RowEncoderWrapper::Json(json) => json.encode(row)?.ser_to(), + RowEncoderWrapper::Proto(proto) => proto.encode(row)?.ser_to(), + } + } } #[derive(Clone, Debug)] pub struct MqttSink { pub config: MqttConfig, schema: Schema, - pk_indices: Vec, format_desc: SinkFormatDesc, - db_name: String, - sink_from_name: String, is_append_only: bool, + name: String, } // sink write @@ -72,7 +121,7 @@ pub struct MqttSinkWriter { pub config: MqttConfig, payload_writer: MqttSinkPayloadWriter, schema: Schema, - formatter: SinkFormatterImpl, + encoder: RowEncoderWrapper, stopped: Arc, } @@ -100,12 +149,10 @@ impl TryFrom for MqttSink { Ok(Self { config, schema, - pk_indices: param.downstream_pk, + name: param.sink_name, format_desc: param .format_desc .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?, - db_name: param.db_name, - sink_from_name: param.sink_from_name, is_append_only: param.sink_type.is_append_only(), }) } @@ -132,6 +179,14 @@ impl Sink for MqttSink { ))); } + if let Some(field) = &self.config.topic_field { + let _ = get_topic_field_index_path(&self.schema, field.as_str())?; + } else if self.config.topic.is_none() { + return Err(SinkError::Config(anyhow!( + "either topic or topic.field must be set" + ))); + } + let _client = (self.config.common.build_client(0, 0)) .context("validate mqtt sink error") .map_err(SinkError::Mqtt)?; @@ -143,10 +198,8 @@ impl Sink for MqttSink { Ok(MqttSinkWriter::new( self.config.clone(), self.schema.clone(), - self.pk_indices.clone(), &self.format_desc, - self.db_name.clone(), - self.sink_from_name.clone(), + &self.name, writer_param.executor_id, ) .await? @@ -158,22 +211,59 @@ impl MqttSinkWriter { pub async fn new( config: MqttConfig, schema: Schema, - pk_indices: Vec, format_desc: &SinkFormatDesc, - db_name: String, - sink_from_name: String, + name: &str, id: u64, ) -> Result { - let formatter = SinkFormatterImpl::new( - format_desc, - schema.clone(), - pk_indices.clone(), - db_name, - sink_from_name, - &config.common.topic, - ) - .await?; + let mut topic_index_path = vec![]; + if let Some(field) = &config.topic_field { + topic_index_path = get_topic_field_index_path(&schema, field.as_str())?; + } + let timestamptz_mode = TimestamptzHandlingMode::from_options(&format_desc.options)?; + + let encoder = match format_desc.format { + SinkFormat::AppendOnly => match format_desc.encode { + SinkEncode::Json => RowEncoderWrapper::Json(JsonEncoder::new( + schema.clone(), + None, + DateHandlingMode::FromCe, + TimestampHandlingMode::Milli, + timestamptz_mode, + TimeHandlingMode::Milli, + )), + SinkEncode::Protobuf => { + let (descriptor, sid) = crate::schema::protobuf::fetch_descriptor( + &format_desc.options, + config.topic.as_deref().unwrap_or(name), + None, + ) + .await + .map_err(|e| SinkError::Config(anyhow!(e)))?; + let header = match sid { + None => ProtoHeader::None, + Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid), + }; + RowEncoderWrapper::Proto(ProtoEncoder::new( + schema.clone(), + None, + descriptor, + header, + )?) + } + _ => { + return Err(SinkError::Config(anyhow!( + "mqtt sink encode unsupported: {:?}", + format_desc.encode, + ))) + } + }, + _ => { + return Err(SinkError::Config(anyhow!( + "Mqtt sink only support append-only mode" + ))) + } + }; let qos = config.common.qos(); let (client, mut eventloop) = config @@ -206,10 +296,11 @@ impl MqttSinkWriter { }); let payload_writer = MqttSinkPayloadWriter { + topic: config.topic.clone(), client, - config: config.clone(), qos, retain: config.retain, + topic_index_path, }; Ok::<_, SinkError>(Self { @@ -217,7 +308,7 @@ impl MqttSinkWriter { payload_writer, schema: schema.clone(), stopped, - formatter, + encoder, }) } } @@ -228,9 +319,7 @@ impl AsyncTruncateSinkWriter for MqttSinkWriter { chunk: StreamChunk, _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - dispatch_sink_formatter_impl!(&self.formatter, formatter, { - self.payload_writer.write_chunk(chunk, formatter).await - }) + self.payload_writer.write_chunk(chunk, &self.encoder).await } } @@ -244,24 +333,256 @@ impl Drop for MqttSinkWriter { struct MqttSinkPayloadWriter { // connection to mqtt, one per executor client: rumqttc::v5::AsyncClient, - config: MqttConfig, + topic: Option, qos: QoS, retain: bool, + topic_index_path: Vec, } -impl FormattedSink for MqttSinkPayloadWriter { - type K = Vec; - type V = Vec; +impl MqttSinkPayloadWriter { + async fn write_chunk(&mut self, chunk: StreamChunk, encoder: &RowEncoderWrapper) -> Result<()> { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + + let topic = match get_topic_from_index_path( + &self.topic_index_path, + self.topic.as_deref(), + &row, + ) { + Some(s) => s, + None => { + tracing::error!("topic field not found in row, skipping: {:?}", row); + return Ok(()); + } + }; - async fn write_one(&mut self, _k: Option, v: Option) -> Result<()> { - match v { - Some(v) => self - .client - .publish(&self.config.common.topic, self.qos, self.retain, v) + let v = encoder.encode(row)?; + + self.client + .publish(topic, self.qos, self.retain, v) .await .context("mqtt sink error") - .map_err(SinkError::Mqtt), - None => Ok(()), + .map_err(SinkError::Mqtt)?; + } + + Ok(()) + } +} + +fn get_topic_from_index_path<'s>( + path: &[usize], + default_topic: Option<&'s str>, + row: &'s RowRef<'s>, +) -> Option<&'s str> { + if let Some(topic) = default_topic + && path.is_empty() + { + Some(topic) + } else { + let mut iter = path.iter(); + let scalar = iter + .next() + .and_then(|pos| row.datum_at(*pos)) + .and_then(|d| { + iter.try_fold(d, |d, pos| match d { + ScalarRefImpl::Struct(struct_ref) => { + struct_ref.iter_fields_ref().nth(*pos).flatten() + } + _ => None, + }) + }); + match scalar { + Some(ScalarRefImpl::Utf8(s)) => Some(s), + _ => { + if let Some(topic) = default_topic { + Some(topic) + } else { + None + } + } } } } + +// This function returns the index path to the topic field in the schema, validating that the field exists and is of type string +// the returnent path can be used to extract the topic field from a row. The path is a list of indexes to be used to navigate the row +// to the topic field. +fn get_topic_field_index_path(schema: &Schema, topic_field: &str) -> Result> { + let mut iter = topic_field.split('.'); + let mut path = vec![]; + let dt = + iter.next() + .and_then(|field| { + // Extract the field from the schema + schema + .fields() + .iter() + .enumerate() + .find(|(_, f)| f.name == field) + .map(|(pos, f)| { + path.push(pos); + &f.data_type + }) + }) + .and_then(|dt| { + // Iterate over the next fields to extract the fields from the nested structs + iter.try_fold(dt, |dt, field| match dt { + DataType::Struct(st) => { + st.iter().enumerate().find(|(_, (s, _))| *s == field).map( + |(pos, (_, dt))| { + path.push(pos); + dt + }, + ) + } + _ => None, + }) + }); + + match dt { + Some(DataType::Varchar) => Ok(path), + Some(dt) => Err(SinkError::Config(anyhow!( + "topic field `{}` must be of type string but got {:?}", + topic_field, + dt + ))), + None => Err(SinkError::Config(anyhow!( + "topic field `{}` not found", + topic_field + ))), + } +} + +#[cfg(test)] +mod test { + use risingwave_common::array::{DataChunk, DataChunkTestExt, RowRef}; + use risingwave_common::catalog::{Field, Schema}; + use risingwave_common::types::{DataType, StructType}; + + use super::{get_topic_field_index_path, get_topic_from_index_path}; + + #[test] + fn test_single_field_extraction() { + let schema = Schema::new(vec![Field::with_name(DataType::Varchar, "topic")]); + let path = get_topic_field_index_path(&schema, "topic").unwrap(); + assert_eq!(path, vec![0]); + + let chunk = DataChunk::from_pretty( + "T + test", + ); + + let row = RowRef::new(&chunk, 0); + + assert_eq!(get_topic_from_index_path(&path, None, &row), Some("test")); + + let result = get_topic_field_index_path(&schema, "other_field"); + assert!(result.is_err()); + } + + #[test] + fn test_nested_field_extraction() { + let schema = Schema::new(vec![Field::with_name( + DataType::Struct(StructType::new(vec![ + ("field", DataType::Int32), + ("subtopic", DataType::Varchar), + ])), + "topic", + )]); + let path = get_topic_field_index_path(&schema, "topic.subtopic").unwrap(); + assert_eq!(path, vec![0, 1]); + + let chunk = DataChunk::from_pretty( + " + (1,test)", + ); + + let row = RowRef::new(&chunk, 0); + + assert_eq!(get_topic_from_index_path(&path, None, &row), Some("test")); + + let result = get_topic_field_index_path(&schema, "topic.other_field"); + assert!(result.is_err()); + } + + #[test] + fn test_null_values_extraction() { + let path = vec![0]; + let chunk = DataChunk::from_pretty( + "T + .", + ); + let row = RowRef::new(&chunk, 0); + assert_eq!( + get_topic_from_index_path(&path, Some("default"), &row), + Some("default") + ); + assert_eq!(get_topic_from_index_path(&path, None, &row), None); + + let path = vec![0, 1]; + let chunk = DataChunk::from_pretty( + " + (1,)", + ); + let row = RowRef::new(&chunk, 0); + assert_eq!( + get_topic_from_index_path(&path, Some("default"), &row), + Some("default") + ); + assert_eq!(get_topic_from_index_path(&path, None, &row), None); + } + + #[test] + fn test_multiple_levels() { + let schema = Schema::new(vec![ + Field::with_name( + DataType::Struct(StructType::new(vec![ + ("field", DataType::Int32), + ( + "subtopic", + DataType::Struct(StructType::new(vec![ + ("int_field", DataType::Int32), + ("boolean_field", DataType::Boolean), + ("string_field", DataType::Varchar), + ])), + ), + ])), + "topic", + ), + Field::with_name(DataType::Varchar, "other_field"), + ]); + + let path = get_topic_field_index_path(&schema, "topic.subtopic.string_field").unwrap(); + assert_eq!(path, vec![0, 1, 2]); + + assert!(get_topic_field_index_path(&schema, "topic.subtopic.boolean_field").is_err()); + + assert!(get_topic_field_index_path(&schema, "topic.subtopic.int_field").is_err()); + + assert!(get_topic_field_index_path(&schema, "topic.field").is_err()); + + let path = get_topic_field_index_path(&schema, "other_field").unwrap(); + assert_eq!(path, vec![1]); + + let chunk = DataChunk::from_pretty( + "> T + (1,(test)) other", + ); + + let row = RowRef::new(&chunk, 0); + + // topic.subtopic.string_field + assert_eq!( + get_topic_from_index_path(&[0, 1, 0], None, &row), + Some("test") + ); + + // topic.field + assert_eq!(get_topic_from_index_path(&[0, 0], None, &row), None); + + // other_field + assert_eq!(get_topic_from_index_path(&[1], None, &row), Some("other")); + } +} diff --git a/src/connector/src/source/mqtt/enumerator/mod.rs b/src/connector/src/source/mqtt/enumerator/mod.rs index 1013f31e07d5e..8d4dc636cd00a 100644 --- a/src/connector/src/source/mqtt/enumerator/mod.rs +++ b/src/connector/src/source/mqtt/enumerator/mod.rs @@ -47,7 +47,7 @@ impl SplitEnumerator for MqttSplitEnumerator { ) -> ConnectorResult { let (client, mut eventloop) = properties.common.build_client(context.info.source_id, 0)?; - let topic = properties.common.topic.clone(); + let topic = properties.topic.clone(); let mut topics = HashSet::new(); if !topic.contains('#') && !topic.contains('+') { topics.insert(topic.clone()); @@ -109,7 +109,7 @@ impl SplitEnumerator for MqttSplitEnumerator { Ok(Self { client, topics, - topic: properties.common.topic, + topic: properties.topic, connected, stopped, }) diff --git a/src/connector/src/source/mqtt/mod.rs b/src/connector/src/source/mqtt/mod.rs index 3879c5efd9d36..218ab0d945283 100644 --- a/src/connector/src/source/mqtt/mod.rs +++ b/src/connector/src/source/mqtt/mod.rs @@ -35,6 +35,9 @@ pub struct MqttProperties { #[serde(flatten)] pub common: MqttCommon, + /// The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/# + pub topic: String, + /// The quality of service to use when publishing messages. Defaults to at_most_once. /// Could be at_most_once, at_least_once or exactly_once #[serde_as(as = "Option")] diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 4d776cc1689f8..6d8a469975af1 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -356,10 +356,6 @@ MqttConfig: field_type: String comments: The url of the broker to connect to. e.g. tcp://localhost. Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, to denote the protocol for establishing a connection with the broker. `mqtts://`, `ssl://` will use the native certificates if no ca is specified required: true - - name: topic - field_type: String - comments: The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/# - required: true - name: qos field_type: QualityOfService comments: The quality of service to use when publishing messages. Defaults to at_most_once. Could be at_most_once, at_least_once or exactly_once @@ -397,6 +393,10 @@ MqttConfig: field_type: String comments: Path to client's private key file (PEM). Required for client authentication. Can be a file path under fs:// or a string with the private key content. required: false + - name: topic + field_type: String + comments: The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/# + required: false - name: retain field_type: bool comments: Whether the message should be retained by the broker @@ -405,6 +405,9 @@ MqttConfig: - name: r#type field_type: String required: true + - name: topic.field + field_type: String + required: false NatsConfig: fields: - name: server_url diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 6fec5e1af51ed..83fdac056d96f 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -261,10 +261,6 @@ MqttProperties: field_type: String comments: The url of the broker to connect to. e.g. tcp://localhost. Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, to denote the protocol for establishing a connection with the broker. `mqtts://`, `ssl://` will use the native certificates if no ca is specified required: true - - name: topic - field_type: String - comments: The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/# - required: true - name: qos field_type: QualityOfService comments: The quality of service to use when publishing messages. Defaults to at_most_once. Could be at_most_once, at_least_once or exactly_once @@ -302,6 +298,10 @@ MqttProperties: field_type: String comments: Path to client's private key file (PEM). Required for client authentication. Can be a file path under fs:// or a string with the private key content. required: false + - name: topic + field_type: String + comments: The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/# + required: true - name: qos field_type: MqttQualityOfService comments: The quality of service to use when publishing messages. Defaults to at_most_once. Could be at_most_once, at_least_once or exactly_once diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 822ea77ec39f2..f50291f832e12 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -779,9 +779,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json], ), MqttSink::SINK_NAME => hashmap!( - Format::Plain => vec![Encode::Json], - Format::Upsert => vec![Encode::Json], - Format::Debezium => vec![Encode::Json], + Format::Plain => vec![Encode::Json, Encode::Protobuf], ), PulsarSink::SINK_NAME => hashmap!( Format::Plain => vec![Encode::Json],