Skip to content

Commit

Permalink
fix: QoS conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
bakjos committed Mar 5, 2024
1 parent b8c2fc7 commit fb3daee
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/connector/src/sink/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ impl MqttSinkWriter {
.qos
.as_ref()
.map(|qos| match qos {
QualityOfService::AtLeastOnce => QoS::AtMostOnce,
QualityOfService::AtMostOnce => QoS::AtLeastOnce,
QualityOfService::AtMostOnce => QoS::AtMostOnce,
QualityOfService::AtLeastOnce => QoS::AtLeastOnce,
QualityOfService::ExactlyOnce => QoS::ExactlyOnce,
})
.unwrap_or(QoS::AtMostOnce);
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/mqtt/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl SplitEnumerator for MqttSplitEnumerator {
}

client
.subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtLeastOnce)
.subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtMostOnce)
.await?;

let cloned_client = client.clone();
Expand Down Expand Up @@ -98,7 +98,7 @@ impl SplitEnumerator for MqttSplitEnumerator {
);
connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
cloned_client
.subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtLeastOnce)
.subscribe(topic.clone(), rumqttc::v5::mqttbytes::QoS::AtMostOnce)
.await
.unwrap();
}
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/mqtt/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ impl SplitReader for MqttSplitReader {
.qos
.as_ref()
.map(|qos| match qos {
QualityOfService::AtLeastOnce => QoS::AtMostOnce,
QualityOfService::AtMostOnce => QoS::AtLeastOnce,
QualityOfService::AtMostOnce => QoS::AtMostOnce,
QualityOfService::AtLeastOnce => QoS::AtLeastOnce,
QualityOfService::ExactlyOnce => QoS::ExactlyOnce,
})
.unwrap_or(QoS::AtMostOnce);
Expand Down

0 comments on commit fb3daee

Please sign in to comment.