diff --git a/e2e_test/sink/mqtt_sink.slt b/e2e_test/sink/mqtt_sink.slt index 2602d2ddc6198..16af3e481e9fb 100644 --- a/e2e_test/sink/mqtt_sink.slt +++ b/e2e_test/sink/mqtt_sink.slt @@ -47,6 +47,34 @@ WITH force_append_only='true', ); +# First the (retained) topics are primed, so that they will be listened +# to when the mqtt source initializes. Otherwise it would take 30 seconds +# for the next enumerator tick + +statement ok +INSERT INTO mqtt (device_id, temperature) +VALUES ( '12', 56.0 ); + +statement ok +FLUSH; + +statement ok +INSERT INTO mqtt (device_id, temperature) +VALUES ( '13', 20.0 ); + +statement ok +FLUSH; + +statement ok +INSERT INTO mqtt_nested (info, temperature) +VALUES( ROW('12', '/nested/12'), 56.0 ); + +statement ok +FLUSH; + +statement ok +INSERT INTO mqtt_nested (info, temperature) +VALUES( ROW('13', null), 22.0 ); statement ok CREATE TABLE mqtt_source @@ -54,6 +82,7 @@ CREATE TABLE mqtt_source device_id varchar, temperature double ) +INCLUDE topic AS mqtt_topic WITH ( connector='mqtt', url='tcp://mqtt-server', @@ -75,29 +104,23 @@ WITH ( ) FORMAT PLAIN ENCODE JSON; -statement ok -INSERT INTO mqtt (device_id, temperature) -VALUES ( '12', 56.0 ); - statement ok INSERT INTO mqtt (device_id, temperature) VALUES ( '12', 59.0 ); statement ok -INSERT INTO mqtt (device_id, temperature) -VALUES ( '13', 20.0 ); +FLUSH; statement ok INSERT INTO mqtt (device_id, temperature) VALUES ( '13', 22.0 ); statement ok -INSERT INTO mqtt_nested (info, temperature) -VALUES( ROW('12', '/nested/12'), 56.0 ); +FLUSH; statement ok INSERT INTO mqtt_nested (info, temperature) -VALUES( ROW('13', null), 22.0 ); +VALUES( ROW('12', '/nested/12'), 56.0 ); statement ok FLUSH; @@ -112,6 +135,14 @@ SELECT device_id, temperature FROM mqtt ORDER BY device_id, temperature; 13 20 13 22 +query ITT rowsort +SELECT device_id, temperature, mqtt_topic FROM mqtt_source ORDER BY device_id, temperature; +---- +12 56 /device/12 +12 59 /device/12 +13 20 /device/13 +13 22 /device/13 + query IT rowsort SELECT (info).device_id device_id, temperature from mqtt_nested_source ORDER BY device_id, temperature ; ---- diff --git a/integration_tests/mqtt/create_source.sql b/integration_tests/mqtt/create_source.sql index 7ebceaa706bcc..bf43ba2d75083 100644 --- a/integration_tests/mqtt/create_source.sql +++ b/integration_tests/mqtt/create_source.sql @@ -4,7 +4,7 @@ CREATE TABLE CREATE TABLE mqtt_source_table ( id integer, - name varchar, + name varchar ) WITH ( connector='mqtt', diff --git a/proto/plan_common.proto b/proto/plan_common.proto index f561ee427ea46..26cd8c5ce3497 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -264,6 +264,8 @@ message AdditionalCollectionName {} message AdditionalColumnPayload {} +message AdditionalColumnTopic {} + // this type means we read all headers as a whole message AdditionalColumnHeaders {} @@ -281,6 +283,7 @@ message AdditionalColumn { AdditionalTableName table_name = 10; AdditionalCollectionName collection_name = 11; AdditionalColumnPayload payload = 12; + AdditionalColumnTopic topic = 13; } } @@ -294,4 +297,5 @@ enum AdditionalColumnType { ADDITIONAL_COLUMN_TYPE_FILENAME = 6; ADDITIONAL_COLUMN_TYPE_NORMAL = 7; ADDITIONAL_COLUMN_TYPE_PAYLOAD = 8; + ADDITIONAL_COLUMN_TYPE_TOPIC = 9; } diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 5c91294afa0a7..28e4aba38738a 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -23,14 +23,14 @@ use risingwave_pb::plan_common::{ AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset, AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnTimestamp, - AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName, + AdditionalColumnTopic, AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName, }; use crate::error::ConnectorResult; use crate::source::cdc::MONGODB_CDC_CONNECTOR; use crate::source::{ - AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, - OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, + AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR, + NATS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, }; // Hidden additional columns connectors which do not support `include` syntax. @@ -87,6 +87,10 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::Topic(AdditionalColumnTopic {})), + }, + ), _ => unreachable!(), }; diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 2142914aa2503..40993f79c49f7 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -494,6 +494,14 @@ impl SourceStreamChunkRowWriter<'_> { // do special logic in `KvEvent::access_field` parse_field(desc) } + (_, &Some(AdditionalColumnType::Topic(_))) => { + // topic is used as partition in mqtt connector + return Ok(A::output_for( + self.row_meta + .as_ref() + .map(|ele| ScalarRefImpl::Utf8(ele.split_id)), + )); + } (_, _) => { // For normal columns, call the user provided closure. parse_field(desc) diff --git a/src/connector/src/source/mqtt/enumerator/mod.rs b/src/connector/src/source/mqtt/enumerator/mod.rs index 676aba2a55ad8..f2b949fca17b6 100644 --- a/src/connector/src/source/mqtt/enumerator/mod.rs +++ b/src/connector/src/source/mqtt/enumerator/mod.rs @@ -17,17 +17,19 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; use async_trait::async_trait; +use risingwave_common::bail; use rumqttc::v5::{ConnectionError, Event, Incoming}; use rumqttc::Outgoing; use thiserror_ext::AsReport; use tokio::sync::RwLock; use super::source::MqttSplit; -use super::{MqttError, MqttProperties}; -use crate::error::{ConnectorError, ConnectorResult}; +use super::MqttProperties; +use crate::error::ConnectorResult; use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; pub struct MqttSplitEnumerator { + #[expect(dead_code)] topic: String, #[expect(dead_code)] client: rumqttc::v5::AsyncClient, @@ -117,10 +119,18 @@ impl SplitEnumerator for MqttSplitEnumerator { async fn list_splits(&mut self) -> ConnectorResult> { if !self.connected.load(std::sync::atomic::Ordering::Relaxed) { - return Err(ConnectorError::from(MqttError(format!( - "Failed to connect to MQTT broker for topic {}", - self.topic - )))); + let start = std::time::Instant::now(); + loop { + if self.connected.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + + if start.elapsed().as_secs() > 10 { + bail!("Failed to connect to mqtt broker"); + } + + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } } let topics = self.topics.read().await; diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs index 2c57c8b9966b1..f4ab79bfffed8 100644 --- a/src/connector/src/source/mqtt/source/reader.rs +++ b/src/connector/src/source/mqtt/source/reader.rs @@ -52,7 +52,8 @@ impl SplitReader for MqttSplitReader { ) -> Result { let (client, eventloop) = properties .common - .build_client(source_ctx.actor_id, source_ctx.fragment_id as u64)?; + .build_client(source_ctx.actor_id, source_ctx.fragment_id as u64) + .inspect_err(|e| tracing::error!("Failed to build mqtt client: {}", e.as_report()))?; let qos = properties.common.qos(); diff --git a/src/prost/build.rs b/src/prost/build.rs index c4744e14c1b60..6cc8615600bd2 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -166,6 +166,7 @@ fn main() -> Result<(), Box> { "plan_common.AdditionalCollectionName", "#[derive(Eq, Hash)]", ) + .type_attribute("plan_common.AdditionalColumnTopic", "#[derive(Eq, Hash)]") .type_attribute("plan_common.AsOfJoinDesc", "#[derive(Eq, Hash)]") .type_attribute("common.ColumnOrder", "#[derive(Eq, Hash)]") .type_attribute("common.OrderType", "#[derive(Eq, Hash)]") diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index 9d0378f89c661..b1363e32dadfc 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -85,6 +85,17 @@ topic = 'dummy') FORMAT plain ENCODE bytes; formatted_sql: CREATE TABLE t (raw BYTEA) INCLUDE header AS all_headers INCLUDE header 'foo' AS foo_bytea INCLUDE header 'foo' CHARACTER VARYING AS foo_str INCLUDE header INCLUDE header 'foo' INCLUDE header 'foo' CHARACTER VARYING INCLUDE header 'foo' BYTEA INCLUDE header 'bar' WITH (connector = 'kafka', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'dummy') FORMAT PLAIN ENCODE BYTES +- input: |- + CREATE TABLE t + (raw BYTEA) + INCLUDE topic + WITH ( + connector = 'mqtt', + url = 'tcp://mqtt-server', + topic = 'test', + qos = 'at_least_once') + FORMAT plain ENCODE bytes; + formatted_sql: CREATE TABLE t (raw BYTEA) INCLUDE topic WITH (connector = 'mqtt', url = 'tcp://mqtt-server', topic = 'test', qos = 'at_least_once') FORMAT PLAIN ENCODE BYTES - input: CREATE TABLE T (v1 INT, v2 STRUCT) formatted_sql: CREATE TABLE T (v1 INT, v2 STRUCT) - input: CREATE TABLE T (v1 INT, v2 STRUCT>)