From f421c86bff3d5e2346fe1fcb7b4c5dfd7d0d912d Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Mon, 15 Jul 2024 09:18:43 +1000 Subject: [PATCH] Initial support for client unsubscribe. --- client/lib/include/cc_mqttsn_client/common.h | 1 - client/lib/src/TopicFilterDefs.h | 12 ++- client/lib/src/op/SubscribeOp.cpp | 53 +++++++---- client/lib/src/op/UnsubscribeOp.cpp | 97 +++++++++++++++++++- client/lib/src/op/UnsubscribeOp.h | 1 + client/lib/test/UnitTestCommonBase.cpp | 1 - client/lib/test/UnitTestCommonBase.h | 1 - client/lib/test/UnitTestSubscribe.th | 7 -- 8 files changed, 142 insertions(+), 31 deletions(-) diff --git a/client/lib/include/cc_mqttsn_client/common.h b/client/lib/include/cc_mqttsn_client/common.h index 73ceba1..2461e9b 100644 --- a/client/lib/include/cc_mqttsn_client/common.h +++ b/client/lib/include/cc_mqttsn_client/common.h @@ -241,7 +241,6 @@ typedef struct typedef struct { CC_MqttsnReturnCode m_returnCode; ///< Return code reported by the @b SUBACK message - CC_MqttsnTopicId m_topicId; ///< Granted topic ID (if applicable). CC_MqttsnQoS m_qos; ///< Granted max QoS value } CC_MqttsnSubscribeInfo; diff --git a/client/lib/src/TopicFilterDefs.h b/client/lib/src/TopicFilterDefs.h index 75f2ce8..225c5ce 100644 --- a/client/lib/src/TopicFilterDefs.h +++ b/client/lib/src/TopicFilterDefs.h @@ -18,14 +18,20 @@ namespace cc_mqttsn_client { using TopicNameStr = SubscribeMsg::Field_topicName::Field::ValueType; -using SubFiltersMap = ObjListType; struct RegTopicInfo { TopicNameStr m_topic; - CC_MqttsnTopicId m_topicId = 0U; // key + CC_MqttsnTopicId m_topicId = 0U; + + template + RegTopicInfo(T&& topic, CC_MqttsnTopicId topicId) : m_topic(std::forward(topic)), m_topicId(topicId) {} + + RegTopicInfo(const char* topic) : m_topic(topic) {} + RegTopicInfo(CC_MqttsnTopicId topicId) : m_topicId(topicId) {} }; -using InRegTopicsMap = ObjListType; +using SubFiltersMap = ObjListType; // key is m_topic +using InRegTopicsMap = ObjListType; // key is m_topicId; } // namespace cc_mqttsn_client diff --git a/client/lib/src/op/SubscribeOp.cpp b/client/lib/src/op/SubscribeOp.cpp index 1d65f66..2643547 100644 --- a/client/lib/src/op/SubscribeOp.cpp +++ b/client/lib/src/op/SubscribeOp.cpp @@ -150,29 +150,50 @@ void SubscribeOp::handle(SubackMsg& msg) auto info = CC_MqttsnSubscribeInfo(); info.m_returnCode = static_cast(msg.field_returnCode().value()); info.m_qos = static_cast(msg.field_flags().field_qos().value()); - info.m_topicId = msg.field_topicId().value(); + auto topicId = static_cast(msg.field_topicId().value()); + auto& topicStr = m_subscribeMsg.field_topicName().field().value(); + auto* topicPtr = topicStr.c_str(); + if (topicStr.empty()) { + topicPtr = nullptr; + } + + if (topicId != 0U) { + storeInRegTopic(topicPtr, topicId); + } + + COMMS_ASSERT((topicId != 0U) || (topicPtr != nullptr)); + auto& filtersMap = client().reuseState().m_subFilters; do { - if (info.m_topicId == 0U) { - break; - } + if (topicPtr != nullptr) { + auto iter = + std::lower_bound( + filtersMap.begin(), filtersMap.end(), topicPtr, + [](auto& elem, const char* topicParam) + { + return elem.m_topic < topicParam; + }); + + if ((iter == filtersMap.end()) || (iter->m_topic != topicPtr)) { + filtersMap.emplace(iter, topicPtr); + } - auto& topicStr = m_subscribeMsg.field_topicName().field().value(); - if (!topicStr.empty()) { - storeInRegTopic(topicStr.c_str(), info.m_topicId); break; } - if constexpr (Config::HasSubTopicVerification) { - storeInRegTopic(nullptr, info.m_topicId); - break; - } - - } while (false); + COMMS_ASSERT(m_subscribeMsg.field_topicId().doesExist()); + COMMS_ASSERT(m_subscribeMsg.field_topicId().field().value() != 0U); - if ((info.m_topicId != 0U) && (!m_subscribeMsg.field_topicName().field().value().empty())) { - storeInRegTopic(m_subscribeMsg.field_topicName().field().value().c_str(), info.m_topicId); - } + auto iter = + std::find_if( + filtersMap.begin(), filtersMap.end(), + [](auto& elem) + { + return !elem.m_topic.empty(); + }); + + filtersMap.emplace(iter, static_cast(m_subscribeMsg.field_topicId().field().value())); + } while (false); completeOpInternal(CC_MqttsnAsyncOpStatus_Complete, &info); } diff --git a/client/lib/src/op/UnsubscribeOp.cpp b/client/lib/src/op/UnsubscribeOp.cpp index 5035e0e..c461160 100644 --- a/client/lib/src/op/UnsubscribeOp.cpp +++ b/client/lib/src/op/UnsubscribeOp.cpp @@ -67,7 +67,47 @@ CC_MqttsnErrorCode UnsubscribeOp::config(const CC_MqttsnUnsubscribeConfig* confi if ((!emptyTopic) && (!verifySubFilter(config->m_topic))) { errorLog("Bad topic filter format in unsubscribe."); return CC_MqttsnErrorCode_BadParam; - } + } + + if constexpr (Config::HasSubTopicVerification) { + do { + if (!client().configState().m_verifySubFilter) { + break; + } + + auto& filtersMap = client().reuseState().m_subFilters; + if (!emptyTopic) { + auto iter = + std::lower_bound( + filtersMap.begin(), filtersMap.end(), config->m_topic, + [](auto& elem, const char* topicParam) + { + return elem.m_topic < topicParam; + }); + + if ((iter == filtersMap.end()) || (iter->m_topic != config->m_topic)) { + errorLog("Requested unsubscribe topic hasn't been used for subscription before"); + return CC_MqttsnErrorCode_BadParam; + } + + break; + } + + COMMS_ASSERT(isValidTopicId(config->m_topicId)); + auto iter = + std::find_if( + filtersMap.begin(), filtersMap.end(), + [config](auto& elem) + { + return config->m_topicId == elem.m_topicId; + }); + + if (iter == filtersMap.end()) { + errorLog("Requested unsubscribe topic ID hasn't been used for subscription before"); + return CC_MqttsnErrorCode_BadParam; + } + } while (false); + } m_unsubscribeMsg.field_flags().field_qos().setValue(config->m_qos); @@ -124,7 +164,60 @@ CC_MqttsnErrorCode UnsubscribeOp::send(CC_MqttsnUnsubscribeCompleteCb cb, void* return ec; } - // TODO: remove the record + // Remove record on first send rather than acknowledgement allowing message + // to get lost. + do { + if (m_recordRemoved) { + break; + } + + m_recordRemoved = true; + + auto& topicStr = m_unsubscribeMsg.field_topicName().field().value(); + auto* topicPtr = topicStr.c_str(); + if (m_unsubscribeMsg.field_topicName().isMissing()) { + topicPtr = nullptr; + } + + auto topicId = m_unsubscribeMsg.field_topicId().field().value(); + COMMS_ASSERT(m_unsubscribeMsg.field_topicId().doesExist() || (topicId == 0U)); + COMMS_ASSERT((topicPtr == nullptr) || (topicId == 0U)); + COMMS_ASSERT((topicPtr != nullptr) || (topicId != 0U)); + + removeInRegTopic(topicPtr, topicId); + + if constexpr (Config::HasSubTopicVerification) { + auto& filtersMap = client().reuseState().m_subFilters; + if (topicPtr != nullptr) { + auto iter = + std::lower_bound( + filtersMap.begin(), filtersMap.end(), topicPtr, + [](auto& elem, const char* topicParam) + { + return elem.m_topic < topicParam; + }); + + if ((iter != filtersMap.end()) && (iter->m_topic != topicPtr)) { + filtersMap.erase(iter); + } + + break; + } + + COMMS_ASSERT(topicId != 0U); + + auto iter = + std::find_if( + filtersMap.begin(), filtersMap.end(), + [topicId](auto& elem) { + return elem.m_topicId == topicId; + }); + + if (iter != filtersMap.end()) { + filtersMap.erase(iter); + } + } + } while (false); completeOnError.release(); return CC_MqttsnErrorCode_Success; diff --git a/client/lib/src/op/UnsubscribeOp.h b/client/lib/src/op/UnsubscribeOp.h index 2de880c..0d5253f 100644 --- a/client/lib/src/op/UnsubscribeOp.h +++ b/client/lib/src/op/UnsubscribeOp.h @@ -56,6 +56,7 @@ class UnsubscribeOp final : public Op CC_MqttsnUnsubscribeCompleteCb m_cb = nullptr; void* m_cbData = nullptr; bool m_suspended = false; + bool m_recordRemoved = false; static_assert(ExtConfig::UnsubscribeOpTimers == 1U); }; diff --git a/client/lib/test/UnitTestCommonBase.cpp b/client/lib/test/UnitTestCommonBase.cpp index 314c930..6e62754 100644 --- a/client/lib/test/UnitTestCommonBase.cpp +++ b/client/lib/test/UnitTestCommonBase.cpp @@ -153,7 +153,6 @@ UnitTestCommonBase::UnitTestConnectCompleteReport::UnitTestConnectCompleteReport UnitTestCommonBase::UnitTestSubscribeInfo& UnitTestCommonBase::UnitTestSubscribeInfo::operator=(const CC_MqttsnSubscribeInfo& info) { m_returnCode = info.m_returnCode; - m_topicId = info.m_topicId; m_qos = info.m_qos; return *this; } diff --git a/client/lib/test/UnitTestCommonBase.h b/client/lib/test/UnitTestCommonBase.h index e80eebd..c26f936 100644 --- a/client/lib/test/UnitTestCommonBase.h +++ b/client/lib/test/UnitTestCommonBase.h @@ -206,7 +206,6 @@ class UnitTestCommonBase struct UnitTestSubscribeInfo { CC_MqttsnReturnCode m_returnCode = CC_MqttsnReturnCode_ValuesLimit; - CC_MqttsnTopicId m_topicId; CC_MqttsnQoS m_qos; UnitTestSubscribeInfo() = default; UnitTestSubscribeInfo(const UnitTestSubscribeInfo&) = default; diff --git a/client/lib/test/UnitTestSubscribe.th b/client/lib/test/UnitTestSubscribe.th index 1c78ff7..ccc6ad2 100644 --- a/client/lib/test/UnitTestSubscribe.th +++ b/client/lib/test/UnitTestSubscribe.th @@ -92,7 +92,6 @@ void UnitTestSubscribe::test1() auto subscribeReport = unitTestSubscribeCompleteReport(); TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete); TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted); - TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, 0U); TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos); TS_ASSERT(unitTestHasTickReq()); @@ -166,7 +165,6 @@ void UnitTestSubscribe::test2() auto subscribeReport = unitTestSubscribeCompleteReport(); TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete); TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted); - TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, AckTopicId); TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos); TS_ASSERT(unitTestHasTickReq()); @@ -236,7 +234,6 @@ void UnitTestSubscribe::test3() auto subscribeReport = unitTestSubscribeCompleteReport(); TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete); TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted); - TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, TopicId); TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos); TS_ASSERT(unitTestHasTickReq()); @@ -322,7 +319,6 @@ void UnitTestSubscribe::test4() auto subscribeReport = unitTestSubscribeCompleteReport(); TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete); TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted); - TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, 0U); TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos); TS_ASSERT(unitTestHasTickReq()); @@ -514,7 +510,6 @@ void UnitTestSubscribe::test6() auto subscribeReport = unitTestSubscribeCompleteReport(); TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete); TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted); - TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, 0U); TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos); } @@ -552,7 +547,6 @@ void UnitTestSubscribe::test6() auto subscribeReport = unitTestSubscribeCompleteReport(); TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete); TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted); - TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, 0U); TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos); } @@ -591,7 +585,6 @@ void UnitTestSubscribe::test6() auto subscribeReport = unitTestSubscribeCompleteReport(); TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete); TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted); - TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, 0U); TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos); }