Skip to content

Commit

Permalink
chore: Fix dylint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
bakjos committed Mar 3, 2024
1 parent 2788587 commit 0d1861c
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 8 deletions.
8 changes: 4 additions & 4 deletions integration_tests/mqtt/create_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ CREATE TABLE mqtt_source_table
WITH (
connector='mqtt',
host='mqtt-server',
topic= 'test'
topic= 'test',
qos = '1'
) FORMAT PLAIN ENCODE JSON;


Expand All @@ -23,7 +24,7 @@ WITH
topic= 'test',
type = 'append-only',
force_append_only='true',
retain = 'true',
retain = 'false',
qos = '1'
);

Expand All @@ -39,5 +40,4 @@ VALUES
(7, 'Posey'),
(8, 'Waverly');


FLUSH;
FLUSH;
5 changes: 3 additions & 2 deletions src/connector/src/sink/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use rumqttc::v5::mqttbytes::QoS;
use rumqttc::v5::ConnectionError;
use serde_derive::Deserialize;
use serde_with::serde_as;
use thiserror_ext::AsReport;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
use with_options::WithOptions;
Expand Down Expand Up @@ -171,11 +172,11 @@ impl MqttSinkWriter {

if let ConnectionError::MqttState(rumqttc::v5::StateError::Io(err)) = err {
if err.kind() != std::io::ErrorKind::ConnectionAborted {
tracing::error!("[Sink] Failed to poll mqtt eventloop: {}", err);
tracing::error!("[Sink] Failed to poll mqtt eventloop: {}", err.as_report());
std::thread::sleep(std::time::Duration::from_secs(1));
}
} else {
tracing::error!("[Sink] Failed to poll mqtt eventloop: {}", err);
tracing::error!("[Sink] Failed to poll mqtt eventloop: {}", err.as_report());
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/mqtt/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use async_trait::async_trait;
use risingwave_common::bail;
use rumqttc::v5::{ConnectionError, Event, Incoming};
use rumqttc::Outgoing;
use thiserror_ext::AsReport;
use tokio::sync::RwLock;

use super::source::MqttSplit;
Expand Down Expand Up @@ -93,7 +94,7 @@ impl SplitEnumerator for MqttSplitEnumerator {
tracing::error!(
"[Enumerator] Failed to subscribe to topic {}: {}",
topic,
err
err.as_report(),
);
connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
cloned_client
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/mqtt/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_common::bail;
use rumqttc::v5::mqttbytes::v5::Filter;
use rumqttc::v5::mqttbytes::QoS;
use rumqttc::v5::{ConnectionError, Event, Incoming};
use thiserror_ext::AsReport;

use super::message::MqttMessage;
use super::MqttSplit;
Expand Down Expand Up @@ -105,7 +106,7 @@ impl CommonSplitReader for MqttSplitReader {
if let ConnectionError::Timeout(_) = e {
continue;
}
tracing::error!("[Reader] Failed to poll mqtt eventloop: {}", e);
tracing::error!("[Reader] Failed to poll mqtt eventloop: {}", e.as_report());
client
.subscribe_many(
splits
Expand Down

0 comments on commit 0d1861c

Please sign in to comment.