From 536dc5fc6725d3c77818c5c74a442d87d27d67f2 Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Fri, 16 Aug 2024 08:04:40 +1000 Subject: [PATCH 01/14] Updating next version to be v2.0.1 --- client/lib/include/cc_mqttsn_client/common.h | 2 +- gateway/lib/include/cc_mqttsn_gateway/version.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/lib/include/cc_mqttsn_client/common.h b/client/lib/include/cc_mqttsn_client/common.h index 7b994afe..7d7edb52 100644 --- a/client/lib/include/cc_mqttsn_client/common.h +++ b/client/lib/include/cc_mqttsn_client/common.h @@ -26,7 +26,7 @@ extern "C" { /// @brief Patch level of the library /// @ingroup global -#define CC_MQTTSN_CLIENT_PATCH_VERSION 0U +#define CC_MQTTSN_CLIENT_PATCH_VERSION 1U /// @brief Macro to create numeric version as single unsigned number #define CC_MQTTSN_CLIENT_MAKE_VERSION(major_, minor_, patch_) \ diff --git a/gateway/lib/include/cc_mqttsn_gateway/version.h b/gateway/lib/include/cc_mqttsn_gateway/version.h index 03fbbd98..927ebb29 100644 --- a/gateway/lib/include/cc_mqttsn_gateway/version.h +++ b/gateway/lib/include/cc_mqttsn_gateway/version.h @@ -17,7 +17,7 @@ #define CC_MQTTSN_GW_MINOR_VERSION 0U /// @brief Patch level of the library -#define CC_MQTTSN_GW_PATCH_VERSION 0U +#define CC_MQTTSN_GW_PATCH_VERSION 1U /// @brief Macro to create numeric version as single unsigned number #define CC_MQTTSN_GW_MAKE_VERSION(major_, minor_, patch_) \ From 4f288e27ee99d98d5fa343c03317aed16d6c6539 Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Tue, 20 Aug 2024 08:41:47 +1000 Subject: [PATCH 02/14] Added an ability to the "cc_mqttsn_client_pub" application to send sequence of the same message. --- client/app/common/ProgramOptions.cpp | 12 +++++++ client/app/common/ProgramOptions.h | 4 ++- client/app/pub/Pub.cpp | 53 +++++++++++++++++++++++++--- client/app/pub/Pub.h | 5 +++ 4 files changed, 68 insertions(+), 6 deletions(-) diff --git a/client/app/common/ProgramOptions.cpp b/client/app/common/ProgramOptions.cpp index d35e8b4c..ae162c8c 100644 --- a/client/app/common/ProgramOptions.cpp +++ b/client/app/common/ProgramOptions.cpp @@ -92,6 +92,8 @@ void ProgramOptions::addPublish() ("pub-qos,q", po::value()->default_value(0U), "Publish QoS value") ("pub-retain", "Publish retained message") ("pub-no-disconnect", "Do not gracefuly disconnect when publish is complete") + ("pub-count", po::value()->default_value(1U), "Number of publishes to perform") + ("pub-delay", po::value()->default_value(100U), "Delay in ms between publishes") ; m_desc.add(opts); @@ -230,6 +232,16 @@ bool ProgramOptions::pubNoDisconnect() const return m_vm.count("pub-no-disconnect") > 0U; } +unsigned ProgramOptions::pubCount() const +{ + return m_vm["pub-count"].as(); +} + +unsigned ProgramOptions::pubDelay() const +{ + return m_vm["pub-delay"].as(); +} + std::vector ProgramOptions::subTopics() const { std::vector result; diff --git a/client/app/common/ProgramOptions.h b/client/app/common/ProgramOptions.h index b924298e..8a194383 100644 --- a/client/app/common/ProgramOptions.h +++ b/client/app/common/ProgramOptions.h @@ -70,7 +70,9 @@ class ProgramOptions std::string pubMessage() const; unsigned pubQos() const; bool pubRetain() const; - bool pubNoDisconnect() const; + bool pubNoDisconnect() const; + unsigned pubCount() const; + unsigned pubDelay() const; // Subscribe Options std::vector subTopics() const; diff --git a/client/app/pub/Pub.cpp b/client/app/pub/Pub.cpp index c06039bd..97fc7a99 100644 --- a/client/app/pub/Pub.cpp +++ b/client/app/pub/Pub.cpp @@ -8,6 +8,7 @@ #include "Pub.h" #include +#include #include #include #include @@ -27,7 +28,8 @@ Pub* asThis(void* data) Pub::Pub(boost::asio::io_context& io, int& result) : - Base(io, result) + Base(io, result), + m_timer(io) { opts().addCommon(); opts().addNetwork(); @@ -50,10 +52,21 @@ bool Pub::startImpl() return false; } + m_remCount = opts().pubCount(); + if (m_remCount == 0U) { + logError() << "Amount of requested publishes needs to be at least 1." << std::endl; + return false; + } + return doConnect(); } void Pub::connectCompleteImpl() +{ + doPublish(); +} + +void Pub::doPublish() { auto config = CC_MqttsnPublishConfig(); cc_mqttsn_client_publish_init_config(&config); @@ -77,7 +90,17 @@ void Pub::connectCompleteImpl() } logError() << "Failed to initiate publish operation with error code: " << toString(ec) << std::endl; - doTerminate(); + doTerminate(); +} + +void Pub::doCompleteInternal() +{ + if (opts().pubNoDisconnect()) { + doComplete(); + return; + } + + doDisconnect(); } void Pub::publishCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_MqttsnPublishInfo* info) @@ -98,12 +121,32 @@ void Pub::publishCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_Mqttsn logInfo() << "Publish complete" << std::endl; } - if (opts().pubNoDisconnect()) { - doComplete(); + assert(m_remCount > 0U); + --m_remCount; + + if (m_remCount == 0U) { + doCompleteInternal(); return; } - doDisconnect(); + auto delay = opts().pubDelay(); + if (delay == 0U) { + doPublish(); + return; + } + + m_timer.expires_after(std::chrono::milliseconds(delay)); + m_timer.async_wait( + [this](const boost::system::error_code& ec) + { + if (ec == boost::asio::error::operation_aborted) { + return; + } + + assert(!ec); + doPublish(); + } + ); } void Pub::publishCompleteCb( diff --git a/client/app/pub/Pub.h b/client/app/pub/Pub.h index 6f111125..2d7a11f6 100644 --- a/client/app/pub/Pub.h +++ b/client/app/pub/Pub.h @@ -26,8 +26,13 @@ class Pub : public AppClient virtual void connectCompleteImpl() override; private: + void doPublish(); + void doCompleteInternal(); void publishCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_MqttsnPublishInfo* info); static void publishCompleteCb(void* data, CC_MqttsnPublishHandle handle, CC_MqttsnAsyncOpStatus status, const CC_MqttsnPublishInfo* info); + + boost::asio::steady_timer m_timer; + unsigned m_remCount = 0U; }; } // namespace cc_mqttsn_client_app From 1183994678afa3e2354f43279c4a6c013b53c43d Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Wed, 21 Aug 2024 09:14:44 +1000 Subject: [PATCH 03/14] Testing reuse of the same topic ID in reception after the relevant publish with register. --- client/lib/src/ClientImpl.cpp | 134 ++++++++++++++++----- client/lib/src/ClientImpl.h | 2 + client/lib/src/op/SendOp.cpp | 23 ++-- client/lib/src/op/SubscribeOp.cpp | 2 +- client/lib/test/UnitTestCommonBase.cpp | 115 ++++++++++++++++++ client/lib/test/UnitTestCommonBase.h | 7 ++ client/lib/test/default/UnitTestPublish.th | 2 + client/lib/test/default/UnitTestReceive.th | 60 +++++++++ 8 files changed, 306 insertions(+), 39 deletions(-) diff --git a/client/lib/src/ClientImpl.cpp b/client/lib/src/ClientImpl.cpp index dee846ed..9002f653 100644 --- a/client/lib/src/ClientImpl.cpp +++ b/client/lib/src/ClientImpl.cpp @@ -55,7 +55,7 @@ void updateEc(CC_MqttsnErrorCode* ec, CC_MqttsnErrorCode val) } } -InRegTopicsMap::iterator findInRegTopicInfo(CC_MqttsnTopicId topicId, InRegTopicsMap& map) +InRegTopicsMap::iterator findInRegTopicInfoInternal(CC_MqttsnTopicId topicId, InRegTopicsMap& map) { return std::lower_bound( @@ -65,7 +65,7 @@ InRegTopicsMap::iterator findInRegTopicInfo(CC_MqttsnTopicId topicId, InRegTopic }); } -InRegTopicsMap::iterator findInRegTopicInfo(const char* topic, InRegTopicsMap& map) +InRegTopicsMap::iterator findInRegTopicInfoInternal(const char* topic, InRegTopicsMap& map) { return std::find_if( @@ -76,6 +76,46 @@ InRegTopicsMap::iterator findInRegTopicInfo(const char* topic, InRegTopicsMap& m }); } +OutRegTopicsMap::iterator findOutRegTopicInfoInternal(CC_MqttsnTopicId topicId, OutRegTopicsMap& map) +{ + return + std::find_if( + map.begin(), map.end(), + [topicId](auto& info) + { + return info.m_topicId == topicId; + }); +} + +OutRegTopicsMap::iterator findOutRegTopicInfoInternal(const char* topic, InRegTopicsMap& map) +{ + return + std::lower_bound( + map.begin(), map.end(), topic, + [](auto& info, const char* topicParam) { + return info.m_topic < topicParam; + }); +} + +template +bool removeLeastRecentlyUsedRegTopicInfoIfNeeded(TMap& map, std::size_t limit) +{ + if (map.size() < limit) { + return false; + } + + auto eraseIter = + std::min_element( + map.begin(), map.end(), + [](auto& info1, auto& info2) + { + return info1.m_timestamp < info2.m_timestamp; + }); + + COMMS_ASSERT(eraseIter != map.end()); + map.erase(eraseIter); + return true; +} bool isTopicMatch(std::string_view filter, std::string_view topic) { @@ -917,17 +957,34 @@ void ClientImpl::handle(PublishMsg& msg) CC_MqttsnTopicId topicId = msg.field_topicId().value(); using TopicIdType = std::decay_t; - if (topicIdType == TopicIdType::Normal) { - auto& regMap = m_reuseState.m_inRegTopics; - auto iter = findInRegTopicInfo(topicId, regMap); - if ((iter == regMap.end()) || (iter->m_topicId != topicId)) { - sendPuback(ReturnCode::InvalidTopicId); - return; + do { + if (topicIdType != TopicIdType::Normal) { + break; } - COMMS_ASSERT(!iter->m_topic.empty()); - topic = std::string_view(iter->m_topic.c_str(), iter->m_topic.size()); - } + auto& inRegMap = m_reuseState.m_inRegTopics; + auto inIter = findInRegTopicInfoInternal(topicId, inRegMap); + if ((inIter != inRegMap.end()) && (inIter->m_topicId == topicId)) { + COMMS_ASSERT(!inIter->m_topic.empty()); + topic = std::string_view(inIter->m_topic.c_str(), inIter->m_topic.size()); + break; + } + + auto& outRegMap = m_reuseState.m_outRegTopics; + auto outIter = findOutRegTopicInfoInternal(topicId, outRegMap); + if ((outIter != outRegMap.end()) && (outIter->m_topicId == topicId)) { + COMMS_ASSERT(!outIter->m_topic.empty()); + topic = std::string_view(outIter->m_topic.c_str(), outIter->m_topic.size()); + + // For future use, copy it into input topics as well + storeInRegTopic(outIter->m_topic.c_str(), topicId); + break; + } + + // Didn't fine in either map + sendPuback(ReturnCode::InvalidTopicId); + return; + } while (false); if (topicIdType == TopicIdType::ShortTopicName) { shortTopicName[0] = static_cast((topicId >> 8U) & 0xff); @@ -1325,31 +1382,17 @@ void ClientImpl::allowNextPrepare() void ClientImpl::storeInRegTopic(const char* topic, CC_MqttsnTopicId topicId) { + COMMS_ASSERT(topic != nullptr); auto& map = m_reuseState.m_inRegTopics; - auto iter = findInRegTopicInfo(topicId, map); + auto iter = findInRegTopicInfoInternal(topicId, map); if ((iter != map.end()) && (iter->m_topicId == topicId)) { iter->m_topic = topic; iter->m_timestamp = m_clientState.m_timestamp; return; } - if (m_clientState.m_inRegTopicsLimit <= map.size()) { - auto eraseIter = - std::min_element( - map.begin(), map.end(), - [](auto& info1, auto& info2) - { - return info1.m_timestamp < info2.m_timestamp; - }); - - COMMS_ASSERT(eraseIter != map.end()); - map.erase(eraseIter); - iter = findInRegTopicInfo(topicId, map); // The location can change after erase - } - - if (topic == nullptr) { - map.insert(iter, FullRegTopicInfo{m_clientState.m_timestamp, TopicNameStr(), topicId}); - return; + if (removeLeastRecentlyUsedRegTopicInfoIfNeeded(map, m_clientState.m_inRegTopicsLimit)) { + iter = findInRegTopicInfoInternal(topicId, map); // The location can change after erase } map.insert(iter, FullRegTopicInfo{m_clientState.m_timestamp, topic, topicId}); @@ -1359,7 +1402,7 @@ bool ClientImpl::removeInRegTopic(const char* topic, CC_MqttsnTopicId topicId) { auto& map = m_reuseState.m_inRegTopics; if (op::Op::isValidTopicId(topicId)) { - auto iter = findInRegTopicInfo(topicId, map); + auto iter = findInRegTopicInfoInternal(topicId, map); if ((iter != map.end()) && (iter->m_topicId == topicId)) { map.erase(iter); return true; @@ -1372,7 +1415,7 @@ bool ClientImpl::removeInRegTopic(const char* topic, CC_MqttsnTopicId topicId) return false; } - auto iter = findInRegTopicInfo(topic, map); + auto iter = findInRegTopicInfoInternal(topic, map); if (iter == map.end()) { return false; } @@ -1381,6 +1424,35 @@ bool ClientImpl::removeInRegTopic(const char* topic, CC_MqttsnTopicId topicId) return true; } +CC_MqttsnTopicId ClientImpl::findInRegTopicId(const char* topic) +{ + auto& map = m_reuseState.m_inRegTopics; + auto iter = findInRegTopicInfoInternal(topic, map); + if ((iter == map.end()) || (iter->m_topic != topic)) { + return 0; + } + + return iter->m_topicId; +} + +void ClientImpl::storeOutRegTopic(const char* topic, CC_MqttsnTopicId topicId) +{ + COMMS_ASSERT(topic != nullptr); + auto& map = m_reuseState.m_outRegTopics; + auto iter = findOutRegTopicInfoInternal(topic, map); + if ((iter != map.end()) && (iter->m_topic == topic)) { + iter->m_topicId = topicId; + iter->m_timestamp = m_clientState.m_timestamp; + return; + } + + if (removeLeastRecentlyUsedRegTopicInfoIfNeeded(map, m_clientState.m_outRegTopicsLimit)) { + iter = findOutRegTopicInfoInternal(topic, map); // The location can change after erase + } + + map.insert(iter, FullRegTopicInfo{m_clientState.m_timestamp, topic, topicId}); +} + void ClientImpl::doApiEnter() { ++m_apiEnterCount; diff --git a/client/lib/src/ClientImpl.h b/client/lib/src/ClientImpl.h index 03550159..8e5fb331 100644 --- a/client/lib/src/ClientImpl.h +++ b/client/lib/src/ClientImpl.h @@ -173,6 +173,8 @@ class ClientImpl final : public ProtMsgHandler void allowNextPrepare(); void storeInRegTopic(const char* topic, CC_MqttsnTopicId topicId); bool removeInRegTopic(const char* topic, CC_MqttsnTopicId topicId); + CC_MqttsnTopicId findInRegTopicId(const char* topic); + void storeOutRegTopic(const char* topic, CC_MqttsnTopicId topicId); TimerMgr& timerMgr() { diff --git a/client/lib/src/op/SendOp.cpp b/client/lib/src/op/SendOp.cpp index 610b3717..5b6b44c5 100644 --- a/client/lib/src/op/SendOp.cpp +++ b/client/lib/src/op/SendOp.cpp @@ -109,20 +109,29 @@ CC_MqttsnErrorCode SendOp::config(const CC_MqttsnPublishConfig* config) } m_registerMsg.field_topicName().value() = config->m_topic; - auto& regMap = client().reuseState().m_outRegTopics; - auto iter = + m_publishMsg.field_flags().field_topicIdType().value() = TopicIdType::Normal; + + auto& outRegMap = client().reuseState().m_outRegTopics; + auto outIter = std::lower_bound( - regMap.begin(), regMap.end(), config->m_topic, + outRegMap.begin(), outRegMap.end(), config->m_topic, [](auto& elem, const char* topicParam) { return elem.m_topic < topicParam; }); - if ((iter != regMap.end()) && (iter->m_topic == config->m_topic)) { - m_publishMsg.field_topicId().setValue(iter->m_topicId); - m_publishMsg.field_flags().field_topicIdType().value() = TopicIdType::Normal; + if ((outIter != outRegMap.end()) && (outIter->m_topic == config->m_topic)) { + m_publishMsg.field_topicId().setValue(outIter->m_topicId); m_stage = Stage_Publish; - iter->m_timestamp = client().clientState().m_timestamp; + outIter->m_timestamp = client().clientState().m_timestamp; + break; + } + + auto inTopicId = client().findInRegTopicId(config->m_topic); + if (inTopicId != 0U) { + m_publishMsg.field_topicId().setValue(outIter->m_topicId); + m_stage = Stage_Publish; + client().storeOutRegTopic(config->m_topic, inTopicId); break; } diff --git a/client/lib/src/op/SubscribeOp.cpp b/client/lib/src/op/SubscribeOp.cpp index 2271c08d..67ec1f7d 100644 --- a/client/lib/src/op/SubscribeOp.cpp +++ b/client/lib/src/op/SubscribeOp.cpp @@ -177,7 +177,7 @@ void SubscribeOp::handle(SubackMsg& msg) topicPtr = nullptr; } while (false); - if (topicId != 0U) { + if ((topicId != 0U) && (topicPtr != nullptr)) { client().storeInRegTopic(topicPtr, topicId); } diff --git a/client/lib/test/UnitTestCommonBase.cpp b/client/lib/test/UnitTestCommonBase.cpp index a753b52d..a444843d 100644 --- a/client/lib/test/UnitTestCommonBase.cpp +++ b/client/lib/test/UnitTestCommonBase.cpp @@ -800,6 +800,121 @@ UnitTestCommonBase::UnitTestPublishCompleteReportPtr UnitTestCommonBase::unitTes return ptr; } +void UnitTestCommonBase::unitTestDoPublish( + CC_MqttsnClient* client, + const CC_MqttsnPublishConfig* config, + const UnitTestPublishResponseConfig* respConfig) +{ + auto ec = m_funcs.m_publish(client, config, &UnitTestCommonBase::unitTestPublishCompleteCb, this); + test_assert(ec == CC_MqttsnErrorCode_Success); + + unsigned topicId = 0U; + if ((respConfig != nullptr) && (respConfig->m_regTopicId != 0U)) { + auto regMsgId = 0U; + { + test_assert(unitTestHasOutputData()); + auto sentMsg = unitTestPopOutputMessage(); + auto* registerMsg = dynamic_cast(sentMsg.get()); + test_assert(registerMsg != nullptr); + test_assert(registerMsg->field_topicName().value() == config->m_topic); + test_assert(!unitTestHasOutputData()); + + regMsgId = registerMsg->field_msgId().value(); + } + + test_assert(unitTestHasTickReq()); + unitTestTick(client, 100); + + { + UnitTestRegackMsg regackMsg; + regackMsg.field_msgId().setValue(regMsgId); + regackMsg.field_topicId().setValue(respConfig->m_regTopicId); + regackMsg.field_returnCode().setValue(CC_MqttsnReturnCode_Accepted); + unitTestClientInputMessage(client, regackMsg); + + topicId = respConfig->m_regTopicId; + } + } + + unsigned pubMsgId = 0U; + { + test_assert(unitTestHasOutputData()); + auto sentMsg = unitTestPopOutputMessage(); + auto* publishMsg = dynamic_cast(sentMsg.get()); + test_assert(publishMsg != nullptr); + test_assert(!unitTestHasOutputData()); + + pubMsgId = publishMsg->field_msgId().value(); + } + + if (config->m_qos == CC_MqttsnQoS_AtMostOnceDelivery) { + test_assert(unitTestHasPublishCompleteReport()); + auto publishReport = unitTestPublishCompleteReport(); + test_assert(publishReport->m_status == CC_MqttsnAsyncOpStatus_Complete); + } + + if ((respConfig != nullptr) && (respConfig->m_pubackRetCode != CC_MqttsnReturnCode_Accepted)) { + test_assert(unitTestHasTickReq()); + unitTestTick(client, 100); + + UnitTestPubackMsg pubackMsg; + pubackMsg.field_topicId().setValue(topicId); + pubackMsg.field_msgId().setValue(pubMsgId); + pubackMsg.field_returnCode().setValue(respConfig->m_pubackRetCode); + unitTestClientInputMessage(client, pubackMsg); + return; + } + + if (config->m_qos == CC_MqttsnQoS_AtMostOnceDelivery) { + return; + } + + test_assert(unitTestHasTickReq()); + unitTestTick(client, 100); + + if (config->m_qos == CC_MqttsnQoS_AtLeastOnceDelivery) { + UnitTestPubackMsg pubackMsg; + pubackMsg.field_topicId().setValue(topicId); + pubackMsg.field_msgId().setValue(pubMsgId); + pubackMsg.field_returnCode().setValue(CC_MqttsnReturnCode_Accepted); + unitTestClientInputMessage(client, pubackMsg); + + test_assert(unitTestHasPublishCompleteReport()); + auto publishReport = unitTestPublishCompleteReport(); + test_assert(publishReport->m_status == CC_MqttsnAsyncOpStatus_Complete); + return; + } + + test_assert(config->m_qos == CC_MqttsnQoS_ExactlyOnceDelivery); + + { + UnitTestPubrecMsg pubrecMsg; + pubrecMsg.field_msgId().setValue(pubMsgId); + unitTestClientInputMessage(client, pubrecMsg); + } + + { + test_assert(unitTestHasOutputData()); + auto sentMsg = unitTestPopOutputMessage(); + auto* pubrelMsg = dynamic_cast(sentMsg.get()); + test_assert(pubrelMsg != nullptr); + test_assert(!unitTestHasOutputData()); + } + + test_assert(unitTestHasTickReq()); + unitTestTick(client, 100); + + { + UnitTestPubcompMsg pubcompMsg; + pubcompMsg.field_msgId().setValue(pubMsgId); + unitTestClientInputMessage(client, pubcompMsg); + } + + test_assert(unitTestHasPublishCompleteReport()); + auto publishReport = unitTestPublishCompleteReport(); + test_assert(publishReport->m_status == CC_MqttsnAsyncOpStatus_Complete); +} + CC_MqttsnErrorCode UnitTestCommonBase::unitTestPublishSend(CC_MqttsnPublishHandle publish) { return m_funcs.m_publish_send(publish, &UnitTestCommonBase::unitTestPublishCompleteCb, this); diff --git a/client/lib/test/UnitTestCommonBase.h b/client/lib/test/UnitTestCommonBase.h index d8cbbd68..ef619b48 100644 --- a/client/lib/test/UnitTestCommonBase.h +++ b/client/lib/test/UnitTestCommonBase.h @@ -319,6 +319,12 @@ class UnitTestCommonBase using UnitTestPublishCompleteReportPtr = std::unique_ptr; using UnitTestPublishCompleteReportList = std::list; + struct UnitTestPublishResponseConfig + { + CC_MqttsnTopicId m_regTopicId = 0U; + CC_MqttsnReturnCode m_pubackRetCode = CC_MqttsnReturnCode_Accepted; + }; + struct UnitTestWillInfo { CC_MqttsnReturnCode m_topicUpdReturnCode = CC_MqttsnReturnCode_ValuesLimit; @@ -434,6 +440,7 @@ class UnitTestCommonBase bool unitTestHasPublishCompleteReport() const; UnitTestPublishCompleteReportPtr unitTestPublishCompleteReport(bool mustExist = true); + void unitTestDoPublish(CC_MqttsnClient* client, const CC_MqttsnPublishConfig* config, const UnitTestPublishResponseConfig* respConfig = nullptr); CC_MqttsnErrorCode unitTestPublishSend(CC_MqttsnPublishHandle publish); bool unitTestHasWillCompleteReport() const; diff --git a/client/lib/test/default/UnitTestPublish.th b/client/lib/test/default/UnitTestPublish.th index cda97cef..0575b28b 100644 --- a/client/lib/test/default/UnitTestPublish.th +++ b/client/lib/test/default/UnitTestPublish.th @@ -2361,3 +2361,5 @@ void UnitTestPublish::test21() TS_ASSERT_EQUALS(publishReport->m_status, CC_MqttsnAsyncOpStatus_Complete); } } + +// TODO: publish on the same topic after reception of the message, test reuse of the topic ID \ No newline at end of file diff --git a/client/lib/test/default/UnitTestReceive.th b/client/lib/test/default/UnitTestReceive.th index 4febcf9e..ac4b8aa4 100644 --- a/client/lib/test/default/UnitTestReceive.th +++ b/client/lib/test/default/UnitTestReceive.th @@ -21,6 +21,7 @@ public: void test11(); void test12(); void test13(); + void test14(); private: virtual void setUp() override @@ -1039,3 +1040,62 @@ void UnitTestReceive::test13() TS_ASSERT(unitTestHasTickReq()); } + +void UnitTestReceive::test14() +{ + // Testing reuse of the same topic ID in reception allocated for publish + + auto clientPtr = unitTestAllocClient(true); + auto* client = clientPtr.get(); + + const std::string ClientId("bla"); + unitTestDoConnectBasic(client, ClientId); + + TS_ASSERT(unitTestHasTickReq()); + unitTestTick(client, 1000); + + unitTestDoSubscribeTopic(client, "#"); + + TS_ASSERT(unitTestHasTickReq()); + unitTestTick(client, 1000); + + const std::string Topic("some/topic"); + const UnitTestData Data = {1, 2, 3, 4}; + const CC_MqttsnTopicId TopicId = 123; + + + CC_MqttsnPublishConfig pubConfig; + apiPublishInitConfig(&pubConfig); + pubConfig.m_topic = Topic.c_str(); + pubConfig.m_data = Data.data(); + pubConfig.m_dataLen = static_cast(Data.size()); + + UnitTestPublishResponseConfig pubResp; + pubResp.m_regTopicId = TopicId; + + unitTestDoPublish(client, &pubConfig, &pubResp); + + TS_ASSERT(unitTestHasTickReq()); + unitTestTick(client, 1000); + + { + UnitTestPublishMsg publishMsg; + publishMsg.field_flags().field_qos().setValue(CC_MqttsnQoS_AtMostOnceDelivery); + publishMsg.field_flags().field_topicIdType().value() = TopicIdType::Normal; + publishMsg.field_topicId().setValue(TopicId); + publishMsg.field_data().value() = Data; + + unitTestClientInputMessage(client, publishMsg); + } + + { + TS_ASSERT(unitTestHasReceivedMessage()); + auto msgInfo = unitTestReceivedMessage(); + TS_ASSERT(!msgInfo->m_topic.empty()); + TS_ASSERT_EQUALS(msgInfo->m_topic, Topic); + TS_ASSERT_EQUALS(msgInfo->m_data, Data); + TS_ASSERT(!unitTestHasReceivedMessage()); + } + + TS_ASSERT(!unitTestHasOutputData()); +} \ No newline at end of file From b09e351c821fa1f6e70d73c45fd584603d67d2ce Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Thu, 22 Aug 2024 07:58:30 +1000 Subject: [PATCH 04/14] Testing reuse of the same topic ID in publish after reception registation of the same topic. --- client/lib/src/op/SendOp.cpp | 2 +- client/lib/test/default/UnitTestPublish.th | 106 ++++++++++++++++++++- 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/client/lib/src/op/SendOp.cpp b/client/lib/src/op/SendOp.cpp index 5b6b44c5..a5cff2d8 100644 --- a/client/lib/src/op/SendOp.cpp +++ b/client/lib/src/op/SendOp.cpp @@ -129,7 +129,7 @@ CC_MqttsnErrorCode SendOp::config(const CC_MqttsnPublishConfig* config) auto inTopicId = client().findInRegTopicId(config->m_topic); if (inTopicId != 0U) { - m_publishMsg.field_topicId().setValue(outIter->m_topicId); + m_publishMsg.field_topicId().setValue(inTopicId); m_stage = Stage_Publish; client().storeOutRegTopic(config->m_topic, inTopicId); break; diff --git a/client/lib/test/default/UnitTestPublish.th b/client/lib/test/default/UnitTestPublish.th index 0575b28b..3d4316ca 100644 --- a/client/lib/test/default/UnitTestPublish.th +++ b/client/lib/test/default/UnitTestPublish.th @@ -29,6 +29,7 @@ public: void test19(); void test20(); void test21(); + void test22(); private: virtual void setUp() override @@ -42,6 +43,7 @@ private: } using TopicIdType = UnitTestPublishMsg::Field_flags::Field_topicIdType::ValueType; + using RetCode = UnitTestPubackMsg::Field_returnCode::ValueType; }; void UnitTestPublish::test1() @@ -2362,4 +2364,106 @@ void UnitTestPublish::test21() } } -// TODO: publish on the same topic after reception of the message, test reuse of the topic ID \ No newline at end of file +void UnitTestPublish::test22() +{ + // Testing publish on the same topic after reception of the message, testing reuse of the topic ID + + auto clientPtr = unitTestAllocClient(true); + auto* client = clientPtr.get(); + + const std::string ClientId("bla"); + unitTestDoConnectBasic(client, ClientId); + + TS_ASSERT(unitTestHasTickReq()); + unitTestTick(client, 1000); + + unitTestDoSubscribeTopic(client, "#"); + + TS_ASSERT(unitTestHasTickReq()); + unitTestTick(client, 1000); + + const CC_MqttsnQoS Qos = CC_MqttsnQoS_AtMostOnceDelivery; + const std::string Topic = "abcd"; + const UnitTestData Data = {1, 2, 3, 4}; + const CC_MqttsnTopicId TopicId = 123; + const std::uint16_t RegMsgId = 1; + + { + UnitTestRegisterMsg registerMsg; + registerMsg.field_topicId().setValue(TopicId); + registerMsg.field_msgId().setValue(RegMsgId); + registerMsg.field_topicName().setValue(Topic); + unitTestClientInputMessage(client, registerMsg); + } + + { + TS_ASSERT(unitTestHasOutputData()); + auto sentMsg = unitTestPopOutputMessage(); + auto* regackMsg = dynamic_cast(sentMsg.get()); + TS_ASSERT_DIFFERS(regackMsg, nullptr); + TS_ASSERT_EQUALS(regackMsg->field_topicId().value(), TopicId); + TS_ASSERT_EQUALS(regackMsg->field_msgId().value(), RegMsgId); + TS_ASSERT_EQUALS(regackMsg->field_returnCode().value(), RetCode::Accepted); + } + + TS_ASSERT(unitTestHasTickReq()); + unitTestTick(client, 1000); + + { + UnitTestPublishMsg publishMsg; + publishMsg.field_flags().field_qos().setValue(Qos); + publishMsg.field_flags().field_topicIdType().value() = TopicIdType::Normal; + publishMsg.field_topicId().setValue(TopicId); + publishMsg.field_data().value() = Data; + + unitTestClientInputMessage(client, publishMsg); + } + + { + TS_ASSERT(unitTestHasReceivedMessage()); + auto msgInfo = unitTestReceivedMessage(); + TS_ASSERT_EQUALS(msgInfo->m_topic, Topic); + TS_ASSERT_EQUALS(msgInfo->m_topicId, 0U); + TS_ASSERT_EQUALS(msgInfo->m_data, Data); + TS_ASSERT_EQUALS(msgInfo->m_qos, Qos); + TS_ASSERT(!unitTestHasReceivedMessage()); + } + + TS_ASSERT(unitTestHasTickReq()); + unitTestTick(client, 1000); + + CC_MqttsnPublishConfig config; + apiPublishInitConfig(&config); + + config.m_topic = Topic.c_str(); + config.m_data = Data.data(); + config.m_dataLen = static_cast(Data.size()); + + auto publish = apiPublishPrepare(client); + TS_ASSERT_DIFFERS(publish, nullptr); + + auto ec = apiPublishConfig(publish, &config); + TS_ASSERT_EQUALS(ec, CC_MqttsnErrorCode_Success); + + ec = unitTestPublishSend(publish); + TS_ASSERT_EQUALS(ec, CC_MqttsnErrorCode_Success); + + { + TS_ASSERT(unitTestHasOutputData()); + auto sentMsg = unitTestPopOutputMessage(); + auto* publishMsg = dynamic_cast(sentMsg.get()); + TS_ASSERT_DIFFERS(publishMsg, nullptr); + TS_ASSERT_EQUALS(publishMsg->field_flags().field_topicIdType().value(), TopicIdType::Normal); + TS_ASSERT_EQUALS(publishMsg->field_topicId().value(), TopicId); + TS_ASSERT_EQUALS(publishMsg->field_data().value(), Data); + TS_ASSERT(!unitTestHasOutputData()); + } + + { + TS_ASSERT(unitTestHasPublishCompleteReport()); + auto publishReport = unitTestPublishCompleteReport(); + TS_ASSERT_EQUALS(publishReport->m_handle, publish); + TS_ASSERT_EQUALS(publishReport->m_status, CC_MqttsnAsyncOpStatus_Complete); + } + +} From 70779d2bb9bf9d501260bc136242cf7eff7c1dfd Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Thu, 22 Aug 2024 08:19:25 +1000 Subject: [PATCH 05/14] Attempt to use upstream boost cmake configuration for latest cmake. --- .github/workflows/actions_build.yml | 2 +- CMakeLists.txt | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/actions_build.yml b/.github/workflows/actions_build.yml index 59d8f745..567b9b91 100644 --- a/.github/workflows/actions_build.yml +++ b/.github/workflows/actions_build.yml @@ -235,7 +235,7 @@ jobs: working-directory: ${{runner.workspace}}/build run: | cmake %GITHUB_WORKSPACE% -A ${{matrix.arch}} -DCMAKE_BUILD_TYPE=${{matrix.type}} -DCMAKE_INSTALL_PREFIX=install ^ - -DCMAKE_PREFIX_PATH=${{runner.workspace}}/build/install -DCMAKE_CXX_STANDARD=${{matrix.cpp}} ^ + -DCMAKE_PREFIX_PATH="${{runner.workspace}}/build/install;C:\local\boost_1_86_0" -DCMAKE_CXX_STANDARD=${{matrix.cpp}} ^ -DBoost_USE_STATIC_LIBS=ON -DCC_MQTTSN_BUILD_UNIT_TESTS=ON ^ -DCC_MQTTSN_CLIENT_APPS=${{env.HAS_BOOST}} -DCC_MQTTSN_GATEWAY_APPS=${{env.HAS_BOOST}} ^ -DCC_MQTTSN_CUSTOM_CLIENT_CONFIG_FILES="%GITHUB_WORKSPACE%/client/lib/script/BareMetalTestConfig.cmake;%GITHUB_WORKSPACE%/client/lib/script/NoGwDiscoverConfig.cmake;%GITHUB_WORKSPACE%/client/lib/script/Qos1Config.cmake;%GITHUB_WORKSPACE%/client/lib/script/Qos0Config.cmake" diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a8103e7..dbb822e9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,6 +26,11 @@ if ("${CMAKE_CXX_STANDARD}" STREQUAL "") set(CMAKE_CXX_STANDARD 17) endif () +if ("${CMAKE_VERSION}" VERSION_GREATER_EQUAL "3.30") + # Find boost cmake configuration from the boost installation + cmake_policy(SET CMP0167 NEW) +endif () + find_package(LibComms REQUIRED) include (${PROJECT_SOURCE_DIR}/cmake/Compile.cmake) From b70385db9efb917bfd30f31508e42b7266f79db3 Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Thu, 22 Aug 2024 08:20:28 +1000 Subject: [PATCH 06/14] Temporarily disable builds on github actions. --- .github/workflows/actions_build.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/actions_build.yml b/.github/workflows/actions_build.yml index 567b9b91..f480db6f 100644 --- a/.github/workflows/actions_build.yml +++ b/.github/workflows/actions_build.yml @@ -9,6 +9,7 @@ env: jobs: build_gcc_ubuntu_22_04: + if: false runs-on: ubuntu-22.04 strategy: fail-fast: false @@ -69,6 +70,7 @@ jobs: run: ctest -V build_clang_ubuntu_22_04: + if: false runs-on: ubuntu-22.04 strategy: fail-fast: false @@ -139,6 +141,7 @@ jobs: run: ctest -V build_msvc_2019: + if: false runs-on: windows-2019 strategy: fail-fast: false From 4bcb675a022640e92c1caa5013f5c4803beb7378 Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Thu, 22 Aug 2024 08:39:37 +1000 Subject: [PATCH 07/14] Using old way of finding boost on github actions. --- .github/workflows/actions_build.yml | 3 ++- CMakeLists.txt | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/actions_build.yml b/.github/workflows/actions_build.yml index f480db6f..cb98fcae 100644 --- a/.github/workflows/actions_build.yml +++ b/.github/workflows/actions_build.yml @@ -238,7 +238,8 @@ jobs: working-directory: ${{runner.workspace}}/build run: | cmake %GITHUB_WORKSPACE% -A ${{matrix.arch}} -DCMAKE_BUILD_TYPE=${{matrix.type}} -DCMAKE_INSTALL_PREFIX=install ^ - -DCMAKE_PREFIX_PATH="${{runner.workspace}}/build/install;C:\local\boost_1_86_0" -DCMAKE_CXX_STANDARD=${{matrix.cpp}} ^ + -DCMAKE_PREFIX_PATH=${{runner.workspace}}/build/install -DCMAKE_CXX_STANDARD=${{matrix.cpp}} ^ + -DCMAKE_POLICY_DEFAULT_CMP0167=OLD ^ -DBoost_USE_STATIC_LIBS=ON -DCC_MQTTSN_BUILD_UNIT_TESTS=ON ^ -DCC_MQTTSN_CLIENT_APPS=${{env.HAS_BOOST}} -DCC_MQTTSN_GATEWAY_APPS=${{env.HAS_BOOST}} ^ -DCC_MQTTSN_CUSTOM_CLIENT_CONFIG_FILES="%GITHUB_WORKSPACE%/client/lib/script/BareMetalTestConfig.cmake;%GITHUB_WORKSPACE%/client/lib/script/NoGwDiscoverConfig.cmake;%GITHUB_WORKSPACE%/client/lib/script/Qos1Config.cmake;%GITHUB_WORKSPACE%/client/lib/script/Qos0Config.cmake" diff --git a/CMakeLists.txt b/CMakeLists.txt index dbb822e9..27594c9b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,7 +26,8 @@ if ("${CMAKE_CXX_STANDARD}" STREQUAL "") set(CMAKE_CXX_STANDARD 17) endif () -if ("${CMAKE_VERSION}" VERSION_GREATER_EQUAL "3.30") +if (("${CMAKE_VERSION}" VERSION_GREATER_EQUAL "3.30") AND + (NOT DEFINED CMAKE_POLICY_DEFAULT_CMP0167)) # Find boost cmake configuration from the boost installation cmake_policy(SET CMP0167 NEW) endif () From 34c0174b45f5c1a3fbf4efceb171908cac51ac05 Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Thu, 22 Aug 2024 08:55:53 +1000 Subject: [PATCH 08/14] Trying version 1.85 of boost on windows-2022 github actions runner. --- .github/workflows/actions_build.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/actions_build.yml b/.github/workflows/actions_build.yml index cb98fcae..82722592 100644 --- a/.github/workflows/actions_build.yml +++ b/.github/workflows/actions_build.yml @@ -216,7 +216,7 @@ jobs: if: matrix.arch == 'x64' shell: cmd run: | - choco install boost-msvc-14.3 + choco install boost-msvc-14.3 --version=1.85.0 - name: Prepare externals shell: cmd @@ -239,7 +239,6 @@ jobs: run: | cmake %GITHUB_WORKSPACE% -A ${{matrix.arch}} -DCMAKE_BUILD_TYPE=${{matrix.type}} -DCMAKE_INSTALL_PREFIX=install ^ -DCMAKE_PREFIX_PATH=${{runner.workspace}}/build/install -DCMAKE_CXX_STANDARD=${{matrix.cpp}} ^ - -DCMAKE_POLICY_DEFAULT_CMP0167=OLD ^ -DBoost_USE_STATIC_LIBS=ON -DCC_MQTTSN_BUILD_UNIT_TESTS=ON ^ -DCC_MQTTSN_CLIENT_APPS=${{env.HAS_BOOST}} -DCC_MQTTSN_GATEWAY_APPS=${{env.HAS_BOOST}} ^ -DCC_MQTTSN_CUSTOM_CLIENT_CONFIG_FILES="%GITHUB_WORKSPACE%/client/lib/script/BareMetalTestConfig.cmake;%GITHUB_WORKSPACE%/client/lib/script/NoGwDiscoverConfig.cmake;%GITHUB_WORKSPACE%/client/lib/script/Qos1Config.cmake;%GITHUB_WORKSPACE%/client/lib/script/Qos0Config.cmake" From 5b9d61b07c1d7f2a0f9c1543b74fef7a7c937139 Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Thu, 22 Aug 2024 09:07:05 +1000 Subject: [PATCH 09/14] Forcing old way to find boost on github actions. --- .github/workflows/actions_build.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/actions_build.yml b/.github/workflows/actions_build.yml index 82722592..a30dc94d 100644 --- a/.github/workflows/actions_build.yml +++ b/.github/workflows/actions_build.yml @@ -182,7 +182,7 @@ jobs: run: | cmake %GITHUB_WORKSPACE% -A ${{matrix.arch}} -DCMAKE_BUILD_TYPE=${{matrix.type}} -DCMAKE_INSTALL_PREFIX=install ^ -DCMAKE_PREFIX_PATH=${{runner.workspace}}/build/install -DCMAKE_CXX_STANDARD=${{matrix.cpp}} ^ - -DBoost_USE_STATIC_LIBS=ON -DCC_MQTTSN_BUILD_UNIT_TESTS=ON ^ + -DCMAKE_POLICY_DEFAULT_CMP0167=OLD -DBoost_USE_STATIC_LIBS=ON -DCC_MQTTSN_BUILD_UNIT_TESTS=ON ^ -DCC_MQTTSN_CUSTOM_CLIENT_CONFIG_FILES="%GITHUB_WORKSPACE%/client/lib/script/BareMetalTestConfig.cmake;%GITHUB_WORKSPACE%/client/lib/script/NoGwDiscoverConfig.cmake;%GITHUB_WORKSPACE%/client/lib/script/Qos1Config.cmake;%GITHUB_WORKSPACE%/client/lib/script/Qos0Config.cmake" - name: Build Target @@ -239,7 +239,7 @@ jobs: run: | cmake %GITHUB_WORKSPACE% -A ${{matrix.arch}} -DCMAKE_BUILD_TYPE=${{matrix.type}} -DCMAKE_INSTALL_PREFIX=install ^ -DCMAKE_PREFIX_PATH=${{runner.workspace}}/build/install -DCMAKE_CXX_STANDARD=${{matrix.cpp}} ^ - -DBoost_USE_STATIC_LIBS=ON -DCC_MQTTSN_BUILD_UNIT_TESTS=ON ^ + -DCMAKE_POLICY_DEFAULT_CMP0167=OLD -DBoost_USE_STATIC_LIBS=ON -DCC_MQTTSN_BUILD_UNIT_TESTS=ON ^ -DCC_MQTTSN_CLIENT_APPS=${{env.HAS_BOOST}} -DCC_MQTTSN_GATEWAY_APPS=${{env.HAS_BOOST}} ^ -DCC_MQTTSN_CUSTOM_CLIENT_CONFIG_FILES="%GITHUB_WORKSPACE%/client/lib/script/BareMetalTestConfig.cmake;%GITHUB_WORKSPACE%/client/lib/script/NoGwDiscoverConfig.cmake;%GITHUB_WORKSPACE%/client/lib/script/Qos1Config.cmake;%GITHUB_WORKSPACE%/client/lib/script/Qos0Config.cmake" env: From ed37b551319a2fe4f68a29a39dfa4f3a249b860c Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Thu, 22 Aug 2024 09:18:40 +1000 Subject: [PATCH 10/14] Revert "Temporarily disable builds on github actions." This reverts commit b70385db9efb917bfd30f31508e42b7266f79db3. --- .github/workflows/actions_build.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/actions_build.yml b/.github/workflows/actions_build.yml index a30dc94d..74861c74 100644 --- a/.github/workflows/actions_build.yml +++ b/.github/workflows/actions_build.yml @@ -9,7 +9,6 @@ env: jobs: build_gcc_ubuntu_22_04: - if: false runs-on: ubuntu-22.04 strategy: fail-fast: false @@ -70,7 +69,6 @@ jobs: run: ctest -V build_clang_ubuntu_22_04: - if: false runs-on: ubuntu-22.04 strategy: fail-fast: false @@ -141,7 +139,6 @@ jobs: run: ctest -V build_msvc_2019: - if: false runs-on: windows-2019 strategy: fail-fast: false From e36b0505150ce234ff9adc9d3e9242f86a913cca Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Fri, 23 Aug 2024 08:42:56 +1000 Subject: [PATCH 11/14] Explicitly ignoring reception of previously broadcast data in UDP gateway. --- .../gateway/GatewayIoClientAcceptor_Udp.cpp | 25 +++++++++++++++++++ .../app/gateway/GatewayIoClientAcceptor_Udp.h | 1 + 2 files changed, 26 insertions(+) diff --git a/gateway/app/gateway/GatewayIoClientAcceptor_Udp.cpp b/gateway/app/gateway/GatewayIoClientAcceptor_Udp.cpp index 8897f0e9..a2855512 100644 --- a/gateway/app/gateway/GatewayIoClientAcceptor_Udp.cpp +++ b/gateway/app/gateway/GatewayIoClientAcceptor_Udp.cpp @@ -164,6 +164,26 @@ void GatewayIoClientAcceptor_Udp::doAccept() break; } + do { + if (m_lastBroadcastData.empty()) { + break; + } + + if (m_senderEndpoint.port() != m_socket.local_endpoint().port()) { + break; + } + + if ((m_lastBroadcastData.size() != bytesCount) || + (!std::equal(m_lastBroadcastData.begin(), m_lastBroadcastData.end(), m_inBuf.begin()))) { + break; + } + + // Received previous broadcast, ignoring... + m_lastBroadcastData.clear(); + doAccept(); + return; + } while (false); + auto iter = m_clients.find(m_senderEndpoint); if (iter != m_clients.end()) { auto* socketPtr = iter->second; @@ -251,6 +271,11 @@ void GatewayIoClientAcceptor_Udp::sendPendingWrites() } while (false); + if ((info.m_endpoint == m_broadcastEndpoint) && + (m_socket.local_endpoint().port() == m_broadcastEndpoint.port())) { + m_lastBroadcastData = std::move(m_pendingWrites.front().m_data); + } + m_pendingWrites.pop_front(); sendPendingWrites(); }); diff --git a/gateway/app/gateway/GatewayIoClientAcceptor_Udp.h b/gateway/app/gateway/GatewayIoClientAcceptor_Udp.h index 83489845..bbe23323 100644 --- a/gateway/app/gateway/GatewayIoClientAcceptor_Udp.h +++ b/gateway/app/gateway/GatewayIoClientAcceptor_Udp.h @@ -50,6 +50,7 @@ class GatewayIoClientAcceptor_Udp final : public GatewayIoClientAcceptor Socket m_socket; Endpoint m_senderEndpoint; Endpoint m_broadcastEndpoint; + DataBuf m_lastBroadcastData; std::uint16_t m_acceptPort = 0U; std::uint16_t m_broadcastPort = 0U; std::string m_broadcastAddress; From e6edf2a12658b047d52f6cd3908ffdff843bc737 Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Fri, 23 Aug 2024 09:13:20 +1000 Subject: [PATCH 12/14] Checking that the gateway disconnection report callback was assigned. --- client/app/common/AppClient.cpp | 42 ++++++++++++++++++++++++++++++--- client/app/common/AppClient.h | 3 +++ client/lib/src/ClientImpl.cpp | 3 ++- client/lib/src/TimerMgr.cpp | 1 + 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/client/app/common/AppClient.cpp b/client/app/common/AppClient.cpp index 5bde8db7..c432df7b 100644 --- a/client/app/common/AppClient.cpp +++ b/client/app/common/AppClient.cpp @@ -46,6 +46,26 @@ std::string toString(CC_MqttsnQoS val) return Map[idx] + " (" + std::to_string(val) + ')'; } + +std::string toStringInternal(CC_MqttsnGatewayDisconnectReason val) +{ + static const std::string Map[] = { + /* CC_MqttsnGatewayDisconnectReason_DisconnectMsg */ "DISCONNECT Message", + /* CC_MqttsnGatewayDisconnectReason_NoGatewayResponse */ "No Response", + }; + + static constexpr std::size_t MapSize = std::extent::value; + static_assert(MapSize == CC_MqttsnGatewayDisconnectReason_ValuesLimit); + + auto idx = static_cast(val); + if (MapSize <= idx) { + assert(false); // Should not happen + return std::to_string(val); + } + + return Map[idx] + " (" + std::to_string(val) + ')'; +} + void printQos(const char* prefix, CC_MqttsnQoS val) { std::cout << "\t" << prefix << ": " << toString(val) << '\n'; @@ -173,6 +193,7 @@ AppClient::AppClient(boost::asio::io_context& io, int& result) : { assert(m_client); ::cc_mqttsn_client_set_send_output_data_callback(m_client.get(), &AppClient::sendDataCb, this); + ::cc_mqttsn_client_set_gw_disconnect_report_callback(m_client.get(), &AppClient::gwDisconnectedReportCb, this); ::cc_mqttsn_client_set_message_report_callback(m_client.get(), &AppClient::messageReceivedCb, this); ::cc_mqttsn_client_set_error_log_callback(m_client.get(), &AppClient::logMessageCb, this); ::cc_mqttsn_client_set_next_tick_program_callback(m_client.get(), &AppClient::nextTickProgramCb, this); @@ -278,6 +299,11 @@ void AppClient::disconnectCompleteImpl() doComplete(); } +void AppClient::gwDisconnectedReportImpl() +{ + doTerminate(); +} + std::vector AppClient::parseBinaryData(const std::string& val) { std::vector result; @@ -364,7 +390,6 @@ std::string AppClient::toString(CC_MqttsnAsyncOpStatus val) return Map[idx] + " (" + std::to_string(val) + ')'; } - std::string AppClient::toString(CC_MqttsnReturnCode val) { static const std::string Map[] = { @@ -492,6 +517,12 @@ void AppClient::disconnectCompleteInternal(CC_MqttsnAsyncOpStatus status) disconnectCompleteImpl(); } +void AppClient::gwDisconnectedReportInternal(CC_MqttsnGatewayDisconnectReason reason) +{ + logInfo() << "Gateway disconnected with reason: " << toStringInternal(reason) << std::endl; + gwDisconnectedReportImpl(); +} + void AppClient::sendDataCb(void* data, const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius) { asThis(data)->sendDataInternal(buf, bufLen, broadcastRadius); @@ -519,12 +550,17 @@ unsigned AppClient::cancelNextTickWaitCb(void* data) void AppClient::connectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info) { - return asThis(data)->connectCompleteInternal(status, info); + asThis(data)->connectCompleteInternal(status, info); } void AppClient::disconnectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status) { - return asThis(data)->disconnectCompleteInternal(status); + asThis(data)->disconnectCompleteInternal(status); +} + +void AppClient::gwDisconnectedReportCb(void* data, CC_MqttsnGatewayDisconnectReason reason) +{ + asThis(data)->gwDisconnectedReportInternal(reason); } } // namespace cc_mqttsn_client_app diff --git a/client/app/common/AppClient.h b/client/app/common/AppClient.h index ccea178c..5883d62b 100644 --- a/client/app/common/AppClient.h +++ b/client/app/common/AppClient.h @@ -72,6 +72,7 @@ class AppClient virtual void messageReceivedImpl(const CC_MqttsnMessageInfo* info); virtual void connectCompleteImpl(); virtual void disconnectCompleteImpl(); + virtual void gwDisconnectedReportImpl(); static std::vector parseBinaryData(const std::string& val); @@ -94,6 +95,7 @@ class AppClient bool createSession(); void connectCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info); void disconnectCompleteInternal(CC_MqttsnAsyncOpStatus status); + void gwDisconnectedReportInternal(CC_MqttsnGatewayDisconnectReason reason); static void sendDataCb(void* data, const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius); static void messageReceivedCb(void* data, const CC_MqttsnMessageInfo* info); @@ -102,6 +104,7 @@ class AppClient static unsigned cancelNextTickWaitCb(void* data); static void connectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info); static void disconnectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status); + static void gwDisconnectedReportCb(void* data, CC_MqttsnGatewayDisconnectReason reason); boost::asio::io_context& m_io; int& m_result; diff --git a/client/lib/src/ClientImpl.cpp b/client/lib/src/ClientImpl.cpp index 9002f653..fe75c9db 100644 --- a/client/lib/src/ClientImpl.cpp +++ b/client/lib/src/ClientImpl.cpp @@ -1554,7 +1554,8 @@ CC_MqttsnErrorCode ClientImpl::initInternal() auto guard = apiEnter(); if ((m_sendOutputDataCb == nullptr) || - (m_messageReceivedReportCb == nullptr)) { + (m_messageReceivedReportCb == nullptr) || + (m_gatewayDisconnectedReportCb == nullptr)) { errorLog("Hasn't set all must have callbacks"); return CC_MqttsnErrorCode_NotIntitialized; } diff --git a/client/lib/src/TimerMgr.cpp b/client/lib/src/TimerMgr.cpp index f19d57da..fabf98d5 100644 --- a/client/lib/src/TimerMgr.cpp +++ b/client/lib/src/TimerMgr.cpp @@ -74,6 +74,7 @@ void TimerMgr::tick(unsigned ms) if (info.m_timeoutMs <= ms) { cbList.push_back({info.m_timeoutCb, info.m_timeoutData}); timerCancel(idx); + COMMS_ASSERT(!timerIsActive(idx)); continue; } From b0fa25d0d87913b585699b7d2b7da73a8fbc193b Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Mon, 26 Aug 2024 08:54:43 +1000 Subject: [PATCH 13/14] Fixing non-clean session start for client applications. --- client/lib/src/op/ConnectOp.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/lib/src/op/ConnectOp.cpp b/client/lib/src/op/ConnectOp.cpp index 7c1b2583..a064115e 100644 --- a/client/lib/src/op/ConnectOp.cpp +++ b/client/lib/src/op/ConnectOp.cpp @@ -45,7 +45,7 @@ CC_MqttsnErrorCode ConnectOp::config(const CC_MqttsnConnectConfig* config) return CC_MqttsnErrorCode_BadParam; } - if (client().clientState().m_firstConnect && (!config->m_cleanSession)) { + if (client().clientState().m_firstConnect && (!config->m_cleanSession) && client().configState().m_verifySubFilter) { errorLog("First connect must force clean session"); return CC_MqttsnErrorCode_BadParam; } @@ -130,7 +130,8 @@ CC_MqttsnErrorCode ConnectOp::send(CC_MqttsnConnectCompleteCb cb, void* cbData) } if ((!m_connectMsg.field_flags().field_mid().getBitValue_CleanSession()) && - (client().clientState().m_firstConnect)) { + (client().clientState().m_firstConnect) && + (client().configState().m_verifySubFilter)) { errorLog("Clean session flag needs to be set on the first connection attempt, perform configuration first."); return CC_MqttsnErrorCode_InsufficientConfig; } From b03bfe4d0d83ce68998e10bb8531dfcdb4b26086 Mon Sep 17 00:00:00 2001 From: Alex Robenko Date: Mon, 26 Aug 2024 08:54:59 +1000 Subject: [PATCH 14/14] More events output for the gateway application. --- gateway/app/gateway/GatewayApp.cpp | 27 ++++++++++++++++++- .../gateway/GatewayIoClientAcceptor_Udp.cpp | 1 + .../app/gateway/GatewayIoClientSocket_Udp.cpp | 9 +++++++ .../app/gateway/GatewayIoClientSocket_Udp.h | 6 +---- gateway/app/gateway/GatewaySession.cpp | 25 ++++++++++++++--- gateway/app/gateway/GatewaySession.h | 23 +++++++++++++--- 6 files changed, 79 insertions(+), 12 deletions(-) diff --git a/gateway/app/gateway/GatewayApp.cpp b/gateway/app/gateway/GatewayApp.cpp index f2e34f86..0be3e5ea 100644 --- a/gateway/app/gateway/GatewayApp.cpp +++ b/gateway/app/gateway/GatewayApp.cpp @@ -68,7 +68,7 @@ bool GatewayApp::start(int argc, const char* argv[]) { auto session = std::make_unique(m_io, m_logger, m_config, std::move(clientSocket)); - session->setTermpReqCb( + session->setTermReqCb( [this, sessionPtr = session.get()]() { auto iter = @@ -87,6 +87,31 @@ bool GatewayApp::start(int argc, const char* argv[]) m_sessions.erase(iter); }); + session->setClientIdReportCb( + [this, sessionPtr = session.get()](const std::string& clientId) + { + auto iter = + std::find_if( + m_sessions.begin(), m_sessions.end(), + [sessionPtr, &clientId](auto& s) + { + assert(s); + return (s.get() != sessionPtr) && (clientId == s->clientId()); + }); + + if (iter == m_sessions.end()) { + return; + } + + boost::asio::post( + m_io, + [this, iter]() + { + m_sessions.erase(iter); + }); + + }); + if (!session->start()) { m_logger.error() << "Failed to start session" << std::endl; return; diff --git a/gateway/app/gateway/GatewayIoClientAcceptor_Udp.cpp b/gateway/app/gateway/GatewayIoClientAcceptor_Udp.cpp index a2855512..ee0cb469 100644 --- a/gateway/app/gateway/GatewayIoClientAcceptor_Udp.cpp +++ b/gateway/app/gateway/GatewayIoClientAcceptor_Udp.cpp @@ -179,6 +179,7 @@ void GatewayIoClientAcceptor_Udp::doAccept() } // Received previous broadcast, ignoring... + logger().info() << "Ignoring received recently broadcast gateway data, not valid input" << std::endl; m_lastBroadcastData.clear(); doAccept(); return; diff --git a/gateway/app/gateway/GatewayIoClientSocket_Udp.cpp b/gateway/app/gateway/GatewayIoClientSocket_Udp.cpp index 48e7023f..ea137f1a 100644 --- a/gateway/app/gateway/GatewayIoClientSocket_Udp.cpp +++ b/gateway/app/gateway/GatewayIoClientSocket_Udp.cpp @@ -10,8 +10,17 @@ namespace cc_mqttsn_gateway_app { +GatewayIoClientSocket_Udp::GatewayIoClientSocket_Udp(boost::asio::io_context& io, GatewayLogger& loggerParam, const Endpoint& endpoint) : + Base(io, loggerParam), + m_endpoint(endpoint) +{ + logger().info() << "New UDP client connection from: " << m_endpoint << std::endl; +}; + GatewayIoClientSocket_Udp::~GatewayIoClientSocket_Udp() { + logger().info() << "Terminated UDP client connection from: " << m_endpoint << std::endl; + if (m_socketDeletedCb) { m_socketDeletedCb(m_endpoint); } diff --git a/gateway/app/gateway/GatewayIoClientSocket_Udp.h b/gateway/app/gateway/GatewayIoClientSocket_Udp.h index 9b650aca..fd9375f7 100644 --- a/gateway/app/gateway/GatewayIoClientSocket_Udp.h +++ b/gateway/app/gateway/GatewayIoClientSocket_Udp.h @@ -22,11 +22,7 @@ class GatewayIoClientSocket_Udp final : public GatewayIoClientSocket public: using Endpoint = boost::asio::ip::udp::endpoint; - explicit GatewayIoClientSocket_Udp(boost::asio::io_context& io, GatewayLogger& logger, const Endpoint& endpoint) : - Base(io, logger), - m_endpoint(endpoint) - { - }; + explicit GatewayIoClientSocket_Udp(boost::asio::io_context& io, GatewayLogger& loggerParam, const Endpoint& endpoint); virtual ~GatewayIoClientSocket_Udp(); diff --git a/gateway/app/gateway/GatewaySession.cpp b/gateway/app/gateway/GatewaySession.cpp index 5f7721d3..f1297471 100644 --- a/gateway/app/gateway/GatewaySession.cpp +++ b/gateway/app/gateway/GatewaySession.cpp @@ -27,6 +27,7 @@ GatewaySession::GatewaySession( m_logger(logger), m_config(config), m_timer(io), + m_reconnectTimer(io), m_clientSocket(std::move(clientSocket)), m_sessionPtr(std::make_unique()), m_session(m_sessionPtr.get()) @@ -42,10 +43,16 @@ GatewaySession::GatewaySession( m_logger(logger), m_config(config), m_timer(io), + m_reconnectTimer(io), m_session(session) { } +GatewaySession::~GatewaySession() +{ + m_logger.info() << "Terminating session for client: " << m_clientId << std::endl; +} + bool GatewaySession::start() { assert(m_termReqCb); @@ -125,6 +132,7 @@ void GatewaySession::doBrokerConnect() [this]() { m_session->setBrokerConnected(true); + m_brokerConnected = true; } ); @@ -135,6 +143,7 @@ void GatewaySession::doBrokerConnect() m_session->setBrokerConnected(false); } + m_brokerConnected = false; doBrokerReconnect(); }); @@ -147,10 +156,14 @@ void GatewaySession::doBrokerConnect() void GatewaySession::doBrokerReconnect() { - boost::asio::post( - m_io, - [this]() + m_reconnectTimer.expires_after(std::chrono::milliseconds(100)); + m_reconnectTimer.async_wait( + [this](const boost::system::error_code& ec) { + if (ec == boost::asio::error::operation_aborted) { + return; + } + doBrokerConnect(); }); } @@ -212,6 +225,12 @@ bool GatewaySession::startSession() m_session->setClientConnectedReportCb( [this](const std::string& clientId) { + m_logger.info() << "Connected client: " << clientId << std::endl; + + m_clientId = clientId; + assert(m_clientIdReportCb); + m_clientIdReportCb(clientId); + auto& predefinedTopics = m_config.predefinedTopics(); auto applyForClient = diff --git a/gateway/app/gateway/GatewaySession.h b/gateway/app/gateway/GatewaySession.h index df49b1ae..cf4a3daf 100644 --- a/gateway/app/gateway/GatewaySession.h +++ b/gateway/app/gateway/GatewaySession.h @@ -44,15 +44,29 @@ class GatewaySession const cc_mqttsn_gateway::Config& config, cc_mqttsn_gateway::Session* session); + ~GatewaySession(); + bool start(); - using TermpReqCb = std::function; + const std::string& clientId() const + { + return m_clientId; + } + + using TermReqCb = std::function; template - void setTermpReqCb(TFunc&& func) + void setTermReqCb(TFunc&& func) { m_termReqCb = std::forward(func); } + using ClientIdReportCb = std::function; + template + void setClientIdReportCb(TFunc&& func) + { + m_clientIdReportCb = std::forward(func); + } + private: using Timer = boost::asio::steady_timer; using TimestampClock = std::chrono::steady_clock; @@ -70,11 +84,14 @@ class GatewaySession GatewayLogger& m_logger; const cc_mqttsn_gateway::Config& m_config; Timer m_timer; + Timer m_reconnectTimer; GatewayIoClientSocketPtr m_clientSocket; GatewayIoBrokerSocketPtr m_brokerSocket; std::unique_ptr m_sessionPtr; cc_mqttsn_gateway::Session* m_session = nullptr; - TermpReqCb m_termReqCb; + TermReqCb m_termReqCb; + ClientIdReportCb m_clientIdReportCb; + std::string m_clientId; DataBuf m_brokerData; Timestamp m_tickReqTs; std::list m_fwdEncSessions;