diff --git a/src/connector/src/sink/mqtt.rs b/src/connector/src/sink/mqtt.rs index ab001cee3eb78..6442789b12617 100644 --- a/src/connector/src/sink/mqtt.rs +++ b/src/connector/src/sink/mqtt.rs @@ -178,8 +178,8 @@ impl MqttSinkWriter { .qos .as_ref() .map(|qos| match qos { - QualityOfService::AtLeastOnce => QoS::AtMostOnce, - QualityOfService::AtMostOnce => QoS::AtLeastOnce, + QualityOfService::AtMostOnce => QoS::AtMostOnce, + QualityOfService::AtLeastOnce => QoS::AtLeastOnce, QualityOfService::ExactlyOnce => QoS::ExactlyOnce, }) .unwrap_or(QoS::AtMostOnce); diff --git a/src/connector/src/source/mqtt/enumerator/mod.rs b/src/connector/src/source/mqtt/enumerator/mod.rs index 1a88603cedde2..5cfd952ab0121 100644 --- a/src/connector/src/source/mqtt/enumerator/mod.rs +++ b/src/connector/src/source/mqtt/enumerator/mod.rs @@ -54,7 +54,7 @@ impl SplitEnumerator for MqttSplitEnumerator { } client - .subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtLeastOnce) + .subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtMostOnce) .await?; let cloned_client = client.clone(); @@ -98,7 +98,7 @@ impl SplitEnumerator for MqttSplitEnumerator { ); connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); cloned_client - .subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtLeastOnce) + .subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtMostOnce) .await .unwrap(); } diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs index 396f16788adec..21a0e60289a56 100644 --- a/src/connector/src/source/mqtt/source/reader.rs +++ b/src/connector/src/source/mqtt/source/reader.rs @@ -58,8 +58,8 @@ impl SplitReader for MqttSplitReader { .qos .as_ref() .map(|qos| match qos { - QualityOfService::AtLeastOnce => QoS::AtMostOnce, - QualityOfService::AtMostOnce => QoS::AtLeastOnce, + QualityOfService::AtMostOnce => QoS::AtMostOnce, + QualityOfService::AtLeastOnce => QoS::AtLeastOnce, QualityOfService::ExactlyOnce => QoS::ExactlyOnce, }) .unwrap_or(QoS::AtMostOnce);