Skip to content

Commit

Permalink
feat(connector): Add topic to mqtt additional columns
Browse files Browse the repository at this point in the history
  • Loading branch information
Boudewijn26 committed Oct 21, 2024
1 parent c0d6af1 commit 31ab0a5
Showing 9 changed files with 98 additions and 20 deletions.
49 changes: 40 additions & 9 deletions e2e_test/sink/mqtt_sink.slt
Original file line number Diff line number Diff line change
@@ -47,13 +47,42 @@ WITH
force_append_only='true',
);

# Prime the (retained) topics first, so that they already exist when the
# mqtt sources initialize and are listened to right away. Otherwise the
# topics would only be picked up on the next enumerator tick, which takes
# 30 seconds.
#
# Every INSERT is followed by a FLUSH, including the last one, so that no
# priming row is still pending when the sources are created.

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '12', 56.0 );

statement ok
FLUSH;

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '13', 20.0 );

statement ok
FLUSH;

statement ok
INSERT INTO mqtt_nested (info, temperature)
VALUES( ROW('12', '/nested/12'), 56.0 );

statement ok
FLUSH;

statement ok
INSERT INTO mqtt_nested (info, temperature)
VALUES( ROW('13', null), 22.0 );

statement ok
FLUSH;

statement ok
CREATE TABLE mqtt_source
(
device_id varchar,
temperature double
)
INCLUDE topic AS mqtt_topic
WITH (
connector='mqtt',
url='tcp://mqtt-server',
@@ -75,29 +104,23 @@ WITH (
) FORMAT PLAIN ENCODE JSON;


statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '12', 56.0 );

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '12', 59.0 );

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '13', 20.0 );
FLUSH;

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '13', 22.0 );

statement ok
INSERT INTO mqtt_nested (info, temperature)
VALUES( ROW('12', '/nested/12'), 56.0 );
FLUSH;

statement ok
INSERT INTO mqtt_nested (info, temperature)
VALUES( ROW('13', null), 22.0 );
VALUES( ROW('12', '/nested/12'), 56.0 );

statement ok
FLUSH;
@@ -112,6 +135,14 @@ SELECT device_id, temperature FROM mqtt ORDER BY device_id, temperature;
13 20
13 22

# The `INCLUDE topic AS mqtt_topic` column must report, for every row read
# back through mqtt_source, the topic the row was received on.
query ITT rowsort
SELECT
    device_id,
    temperature,
    mqtt_topic
FROM mqtt_source
ORDER BY
    device_id,
    temperature;
----
12 56 /device/12
12 59 /device/12
13 20 /device/13
13 22 /device/13

query IT rowsort
SELECT (info).device_id device_id, temperature from mqtt_nested_source ORDER BY device_id, temperature ;
----
2 changes: 1 addition & 1 deletion integration_tests/mqtt/create_source.sql
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ CREATE TABLE
CREATE TABLE mqtt_source_table
(
id integer,
name varchar,
name varchar
)
WITH (
connector='mqtt',
4 changes: 4 additions & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
@@ -264,6 +264,8 @@ message AdditionalCollectionName {}

message AdditionalColumnPayload {}

// this type marks an additional column that carries the topic of a message
// (made available to the mqtt connector via `INCLUDE topic`)
message AdditionalColumnTopic {}

// this type means we read all headers as a whole
message AdditionalColumnHeaders {}

@@ -281,6 +283,7 @@ message AdditionalColumn {
AdditionalTableName table_name = 10;
AdditionalCollectionName collection_name = 11;
AdditionalColumnPayload payload = 12;
AdditionalColumnTopic topic = 13;
}
}

@@ -294,4 +297,5 @@ enum AdditionalColumnType {
ADDITIONAL_COLUMN_TYPE_FILENAME = 6;
ADDITIONAL_COLUMN_TYPE_NORMAL = 7;
ADDITIONAL_COLUMN_TYPE_PAYLOAD = 8;
ADDITIONAL_COLUMN_TYPE_TOPIC = 9;
}
18 changes: 15 additions & 3 deletions src/connector/src/parser/additional_columns.rs
Original file line number Diff line number Diff line change
@@ -23,14 +23,14 @@ use risingwave_pb::plan_common::{
AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnTimestamp,
AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName,
AdditionalColumnTopic, AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName,
};

use crate::error::ConnectorResult;
use crate::source::cdc::MONGODB_CDC_CONNECTOR;
use crate::source::{
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR,
OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR,
NATS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
};

// Hidden additional columns connectors which do not support `include` syntax.
@@ -87,6 +87,10 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
"collection_name",
]),
),
(
MQTT_CONNECTOR,
HashSet::from(["topic", "offset", "partition"]),
),
])
});

@@ -266,6 +270,14 @@ pub fn build_additional_column_desc(
)),
},
),
"topic" => ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::Topic(AdditionalColumnTopic {})),
},
),
_ => unreachable!(),
};

8 changes: 8 additions & 0 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
@@ -494,6 +494,14 @@ impl SourceStreamChunkRowWriter<'_> {
// do special logic in `KvEvent::access_field`
parse_field(desc)
}
(_, &Some(AdditionalColumnType::Topic(_))) => {
// topic is used as partition in mqtt connector
return Ok(A::output_for(
self.row_meta
.as_ref()
.map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
));
}
(_, _) => {
// For normal columns, call the user provided closure.
parse_field(desc)
22 changes: 16 additions & 6 deletions src/connector/src/source/mqtt/enumerator/mod.rs
Original file line number Diff line number Diff line change
@@ -17,17 +17,19 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use async_trait::async_trait;
use risingwave_common::bail;
use rumqttc::v5::{ConnectionError, Event, Incoming};
use rumqttc::Outgoing;
use thiserror_ext::AsReport;
use tokio::sync::RwLock;

use super::source::MqttSplit;
use super::{MqttError, MqttProperties};
use crate::error::{ConnectorError, ConnectorResult};
use super::MqttProperties;
use crate::error::ConnectorResult;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

pub struct MqttSplitEnumerator {
#[expect(dead_code)]
topic: String,
#[expect(dead_code)]
client: rumqttc::v5::AsyncClient,
@@ -117,10 +119,18 @@ impl SplitEnumerator for MqttSplitEnumerator {

async fn list_splits(&mut self) -> ConnectorResult<Vec<MqttSplit>> {
if !self.connected.load(std::sync::atomic::Ordering::Relaxed) {
return Err(ConnectorError::from(MqttError(format!(
"Failed to connect to MQTT broker for topic {}",
self.topic
))));
let start = std::time::Instant::now();
loop {
if self.connected.load(std::sync::atomic::Ordering::Relaxed) {
break;
}

if start.elapsed().as_secs() > 10 {
bail!("Failed to connect to mqtt broker");
}

tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}

let topics = self.topics.read().await;
3 changes: 2 additions & 1 deletion src/connector/src/source/mqtt/source/reader.rs
Original file line number Diff line number Diff line change
@@ -52,7 +52,8 @@ impl SplitReader for MqttSplitReader {
) -> Result<Self> {
let (client, eventloop) = properties
.common
.build_client(source_ctx.actor_id, source_ctx.fragment_id as u64)?;
.build_client(source_ctx.actor_id, source_ctx.fragment_id as u64)
.inspect_err(|e| tracing::error!("Failed to build mqtt client: {}", e.as_report()))?;

let qos = properties.common.qos();

1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
@@ -166,6 +166,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"plan_common.AdditionalCollectionName",
"#[derive(Eq, Hash)]",
)
.type_attribute("plan_common.AdditionalColumnTopic", "#[derive(Eq, Hash)]")
.type_attribute("plan_common.AsOfJoinDesc", "#[derive(Eq, Hash)]")
.type_attribute("common.ColumnOrder", "#[derive(Eq, Hash)]")
.type_attribute("common.OrderType", "#[derive(Eq, Hash)]")
11 changes: 11 additions & 0 deletions src/sqlparser/tests/testdata/create.yaml
Original file line number Diff line number Diff line change
@@ -85,6 +85,17 @@
topic = 'dummy')
FORMAT plain ENCODE bytes;
formatted_sql: CREATE TABLE t (raw BYTEA) INCLUDE header AS all_headers INCLUDE header 'foo' AS foo_bytea INCLUDE header 'foo' CHARACTER VARYING AS foo_str INCLUDE header INCLUDE header 'foo' INCLUDE header 'foo' CHARACTER VARYING INCLUDE header 'foo' BYTEA INCLUDE header 'bar' WITH (connector = 'kafka', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'dummy') FORMAT PLAIN ENCODE BYTES
- input: |-
CREATE TABLE t
(raw BYTEA)
INCLUDE topic
WITH (
connector = 'mqtt',
url = 'tcp://mqtt-server',
topic = 'test',
qos = 'at_least_once')
FORMAT plain ENCODE bytes;
formatted_sql: CREATE TABLE t (raw BYTEA) INCLUDE topic WITH (connector = 'mqtt', url = 'tcp://mqtt-server', topic = 'test', qos = 'at_least_once') FORMAT PLAIN ENCODE BYTES
- input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
formatted_sql: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
- input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT, v3 STRUCT<v1 INT, v2 INT>>)

0 comments on commit 31ab0a5

Please sign in to comment.