From fb3daeef000c114660efaf853c8e184e0c871ea6 Mon Sep 17 00:00:00 2001 From: Gio Gutierrez Date: Tue, 5 Mar 2024 10:15:39 -0500 Subject: [PATCH] fix: QoS conversion --- src/connector/src/sink/mqtt.rs | 4 ++-- src/connector/src/source/mqtt/enumerator/mod.rs | 4 ++-- src/connector/src/source/mqtt/source/reader.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/connector/src/sink/mqtt.rs b/src/connector/src/sink/mqtt.rs index ab001cee3eb78..6442789b12617 100644 --- a/src/connector/src/sink/mqtt.rs +++ b/src/connector/src/sink/mqtt.rs @@ -178,8 +178,8 @@ impl MqttSinkWriter { .qos .as_ref() .map(|qos| match qos { - QualityOfService::AtLeastOnce => QoS::AtMostOnce, - QualityOfService::AtMostOnce => QoS::AtLeastOnce, + QualityOfService::AtMostOnce => QoS::AtMostOnce, + QualityOfService::AtLeastOnce => QoS::AtLeastOnce, QualityOfService::ExactlyOnce => QoS::ExactlyOnce, }) .unwrap_or(QoS::AtMostOnce); diff --git a/src/connector/src/source/mqtt/enumerator/mod.rs b/src/connector/src/source/mqtt/enumerator/mod.rs index 1a88603cedde2..5cfd952ab0121 100644 --- a/src/connector/src/source/mqtt/enumerator/mod.rs +++ b/src/connector/src/source/mqtt/enumerator/mod.rs @@ -54,7 +54,7 @@ impl SplitEnumerator for MqttSplitEnumerator { } client - .subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtLeastOnce) + .subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtMostOnce) .await?; let cloned_client = client.clone(); @@ -98,7 +98,7 @@ impl SplitEnumerator for MqttSplitEnumerator { ); connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); cloned_client - .subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtLeastOnce) + .subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtMostOnce) .await .unwrap(); } diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs index 396f16788adec..21a0e60289a56 100644 --- a/src/connector/src/source/mqtt/source/reader.rs +++ b/src/connector/src/source/mqtt/source/reader.rs @@ -58,8 +58,8 @@ impl SplitReader for MqttSplitReader { .qos .as_ref() .map(|qos| match qos { - QualityOfService::AtLeastOnce => QoS::AtMostOnce, - QualityOfService::AtMostOnce => QoS::AtLeastOnce, + QualityOfService::AtMostOnce => QoS::AtMostOnce, + QualityOfService::AtLeastOnce => QoS::AtLeastOnce, QualityOfService::ExactlyOnce => QoS::ExactlyOnce, }) .unwrap_or(QoS::AtMostOnce);