From 5a5544d00b05cc27f7b4aea8253dc1562d072f5d Mon Sep 17 00:00:00 2001 From: Gio Gutierrez Date: Sun, 3 Mar 2024 11:30:44 -0500 Subject: [PATCH] chore: Fix dylint errors --- integration_tests/mqtt/create_source.sql | 8 ++++---- src/connector/src/common.rs | 2 +- src/connector/src/sink/mqtt.rs | 11 +++++++++-- src/connector/src/source/mqtt/enumerator/mod.rs | 3 ++- src/connector/src/source/mqtt/source/reader.rs | 3 ++- src/connector/with_options_sink.yaml | 13 ++++++++++++- src/connector/with_options_source.yaml | 3 ++- 7 files changed, 32 insertions(+), 11 deletions(-) diff --git a/integration_tests/mqtt/create_source.sql b/integration_tests/mqtt/create_source.sql index a586ee0966860..925082841b3e5 100644 --- a/integration_tests/mqtt/create_source.sql +++ b/integration_tests/mqtt/create_source.sql @@ -9,7 +9,8 @@ CREATE TABLE mqtt_source_table WITH ( connector='mqtt', host='mqtt-server', - topic= 'test' + topic= 'test', + qos = '1' ) FORMAT PLAIN ENCODE JSON; @@ -23,7 +24,7 @@ WITH topic= 'test', type = 'append-only', force_append_only='true', - retain = 'true', + retain = 'false', qos = '1' ); @@ -39,5 +40,4 @@ VALUES (7, 'Posey'), (8, 'Waverly'); - -FLUSH; \ No newline at end of file +FLUSH; diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index a9f296b414aa4..5fa81c33fbb0e 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -778,7 +778,7 @@ impl MqttCommon { options.set_credentials(user, self.password.as_deref().unwrap_or_default()); } - Ok(rumqttc::v5::AsyncClient::new(options, 10)) + Ok(rumqttc::v5::AsyncClient::new(options, 100)) } } diff --git a/src/connector/src/sink/mqtt.rs b/src/connector/src/sink/mqtt.rs index 031e3c77d3819..d55f16000e013 100644 --- a/src/connector/src/sink/mqtt.rs +++ b/src/connector/src/sink/mqtt.rs @@ -23,6 +23,7 @@ use rumqttc::v5::mqttbytes::QoS; use rumqttc::v5::ConnectionError; use serde_derive::Deserialize; use serde_with::serde_as; +use thiserror_ext::AsReport; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; use with_options::WithOptions; @@ -171,11 +172,17 @@ impl MqttSinkWriter { if let ConnectionError::MqttState(rumqttc::v5::StateError::Io(err)) = err { if err.kind() != std::io::ErrorKind::ConnectionAborted { - tracing::error!("[Sink] Failed to poll mqtt eventloop: {}", err); + tracing::error!( + "[Sink] Failed to poll mqtt eventloop: {}", + err.as_report() + ); std::thread::sleep(std::time::Duration::from_secs(1)); } } else { - tracing::error!("[Sink] Failed to poll mqtt eventloop: {}", err); + tracing::error!( + "[Sink] Failed to poll mqtt eventloop: {}", + err.as_report() + ); std::thread::sleep(std::time::Duration::from_secs(1)); } } diff --git a/src/connector/src/source/mqtt/enumerator/mod.rs b/src/connector/src/source/mqtt/enumerator/mod.rs index 98f3fcec498bb..1a88603cedde2 100644 --- a/src/connector/src/source/mqtt/enumerator/mod.rs +++ b/src/connector/src/source/mqtt/enumerator/mod.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use risingwave_common::bail; use rumqttc::v5::{ConnectionError, Event, Incoming}; use rumqttc::Outgoing; +use thiserror_ext::AsReport; use tokio::sync::RwLock; use super::source::MqttSplit; @@ -93,7 +94,7 @@ impl SplitEnumerator for MqttSplitEnumerator { tracing::error!( "[Enumerator] Failed to subscribe to topic {}: {}", topic, - err + err.as_report(), ); connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); cloned_client diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs index 6872b887ff8ca..41e2b45e8913d 100644 --- a/src/connector/src/source/mqtt/source/reader.rs +++ b/src/connector/src/source/mqtt/source/reader.rs @@ -18,6 +18,7 @@ use risingwave_common::bail; use rumqttc::v5::mqttbytes::v5::Filter; use rumqttc::v5::mqttbytes::QoS; use rumqttc::v5::{ConnectionError, Event, Incoming}; +use thiserror_ext::AsReport; use super::message::MqttMessage; use super::MqttSplit; @@ -105,7 +106,7 @@ impl CommonSplitReader for MqttSplitReader { if let ConnectionError::Timeout(_) = e { continue; } - tracing::error!("[Reader] Failed to poll mqtt eventloop: {}", e); + tracing::error!("[Reader] Failed to poll mqtt eventloop: {}", e.as_report()); client .subscribe_many( splits diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 1c427c6bb6955..a1e516772d710 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -346,7 +346,7 @@ KinesisSinkConfig: field_type: String required: false alias: kinesis.assumerole.external_id -MqttCommon: +MqttConfig: fields: - name: protocol field_type: String @@ -379,6 +379,17 @@ MqttCommon: - name: tls.client_key field_type: String required: false + - name: qos + field_type: u32 + required: false + default: Default::default + - name: retain + field_type: bool + required: false + default: Default::default + - name: r#type + field_type: String + required: true NatsConfig: fields: - name: server_url diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index d959a46afdb34..3cd54ed1b3cfb 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -277,8 +277,9 @@ MqttProperties: field_type: String required: false - name: qos - field_type: i32 + field_type: u32 required: false + default: Default::default NatsProperties: fields: - name: server_url