Skip to content

Commit

Permalink
feat(mqtt): Allow using field as topic name (#15673)
Browse files Browse the repository at this point in the history
  • Loading branch information
bakjos authored Apr 7, 2024
1 parent e811ad7 commit df0aa3a
Show file tree
Hide file tree
Showing 12 changed files with 582 additions and 62 deletions.
14 changes: 14 additions & 0 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ services:
- clickhouse-server
- redis-server
- pulsar-server
- mqtt-server
- cassandra-server
- doris-server
- starrocks-fe-server
Expand Down Expand Up @@ -329,3 +330,16 @@ services:
MONGO_HOST: mongodb
MONGO_PORT: 27017
MONGO_DB_NAME: random_data
mqtt-server:
image: eclipse-mosquitto
command:
- sh
- -c
- echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf
ports:
- 1883:1883
healthcheck:
test: ["CMD-SHELL", "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0"]
interval: 10s
timeout: 10s
retries: 6
35 changes: 35 additions & 0 deletions ci/scripts/e2e-mqtt-sink-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash

source ci/scripts/common.sh

while getopts 'p:' opt; do
case ${opt} in
p )
profile=$OPTARG
;;
\? )
echo "Invalid Option: -$OPTARG" 1>&2
exit 1
;;
: )
echo "Invalid option: $OPTARG requires an argument" 1>&2
;;
esac
done
shift $((OPTIND -1))

download_and_prepare_rw "$profile" source

echo "--- starting risingwave cluster"
cargo make ci-start ci-sink-test
sleep 1

set -euo pipefail

echo "--- testing mqtt sink"
sqllogictest -p 4566 -d dev './e2e_test/sink/mqtt_sink.slt'

sleep 1

echo "--- Kill cluster"
cargo make ci-kill
19 changes: 19 additions & 0 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,25 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end mqtt sink test"
key: "e2e-mqtt-sink-tests"
command: "ci/scripts/e2e-mqtt-sink-test.sh -p ci-release"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-mqtt-sink-tests"
|| build.env("CI_STEPS") =~ /(^|,)e2e-mqtt-sink-tests?(,|$$)/
depends_on:
- "build"
- "build-other"
plugins:
- docker-compose#v5.1.0:
run: sink-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
retry: *auto-retry

- label: "connector node integration test Java {{matrix.java_version}}"
key: "connector-node-integration-test"
command: "ci/scripts/connector-node-integration-test.sh -p ci-release -v {{matrix.java_version}}"
Expand Down
15 changes: 15 additions & 0 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,21 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end mqtt sink test"
if: build.pull_request.labels includes "ci/run-e2e-mqtt-sink-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-mqtt-sink-tests?(,|$$)/
command: "ci/scripts/e2e-mqtt-sink-test.sh -p ci-dev"
depends_on:
- "build"
- "build-other"
plugins:
- docker-compose#v5.1.0:
run: sink-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end clickhouse sink test"
if: build.pull_request.labels includes "ci/run-e2e-clickhouse-sink-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-clickhouse-sink-tests?(,|$$)/
command: "ci/scripts/e2e-clickhouse-sink-test.sh -p ci-dev"
Expand Down
115 changes: 115 additions & 0 deletions e2e_test/sink/mqtt_sink.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
statement ok
CREATE TABLE mqtt (
device_id varchar,
temperature double,
topic varchar as '/device/' || device_id
);

statement ok
CREATE TABLE mqtt_nested (
info struct<device_id varchar, topic varchar>,
temperature double
);

statement ok
CREATE SINK mqtt_sink
FROM
mqtt
WITH
(
connector='mqtt',
url='tcp://mqtt-server',
type = 'append-only',
topic.field = 'topic',
retain = 'true',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON (
force_append_only='true',
);

statement ok
CREATE SINK mqtt_nested_sink
FROM
mqtt_nested
WITH
(
connector='mqtt',
url='tcp://mqtt-server',
type = 'append-only',
topic = '/nested/fallback',
topic.field = 'info.topic',
retain = 'true',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON (
force_append_only='true',
);


statement ok
CREATE TABLE mqtt_source
(
device_id varchar,
temperature double
)
WITH (
connector='mqtt',
url='tcp://mqtt-server',
topic= '/device/+',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE mqtt_nested_source
(
info struct<device_id varchar, topic varchar>,
temperature double
)
WITH (
connector='mqtt',
url='tcp://mqtt-server',
topic= '/nested/fallback',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON;


statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '12', 56.0 );

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '12', 59.0 );

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '13', 20.0 );

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '13', 22.0 );

statement ok
INSERT INTO mqtt_nested (info, temperature)
VALUES( ROW('12', '/nested/12'), 56.0 );

statement ok
INSERT INTO mqtt_nested (info, temperature)
VALUES( ROW('13', null), 22.0 );

statement ok
FLUSH;

sleep 15s

query IT rowsort
SELECT device_id, temperature FROM mqtt ORDER BY device_id, temperature;
----
12 56
12 59
13 20
13 22

query IT rowsort
SELECT (info).device_id device_id, temperature from mqtt_nested_source ORDER BY device_id, temperature ;
----
13 22
3 changes: 0 additions & 3 deletions src/connector/src/connector_common/mqtt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ pub struct MqttCommon {
/// `mqtts://`, `ssl://` will use the native certificates if no ca is specified
pub url: String,

/// The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/#
pub topic: String,

/// The quality of service to use when publishing messages. Defaults to at_most_once.
/// Could be at_most_once, at_least_once or exactly_once
#[serde_as(as = "Option<DisplayFromStr>")]
Expand Down
Loading

0 comments on commit df0aa3a

Please sign in to comment.