diff --git a/client/app/CMakeLists.txt b/client/app/CMakeLists.txt index f3fa8d32..4b1d2f7e 100644 --- a/client/app/CMakeLists.txt +++ b/client/app/CMakeLists.txt @@ -21,4 +21,4 @@ set (COMMON_APPS_LIB "cc_mqttsn_client_apps_lib") add_subdirectory (common) add_subdirectory (gw_discover) add_subdirectory (pub) -#add_subdirectory (sub) \ No newline at end of file +add_subdirectory (sub) \ No newline at end of file diff --git a/client/app/common/AppClient.cpp b/client/app/common/AppClient.cpp index 01f90bf7..5bde8db7 100644 --- a/client/app/common/AppClient.cpp +++ b/client/app/common/AppClient.cpp @@ -27,44 +27,34 @@ AppClient* asThis(void* data) return reinterpret_cast(data); } -// std::string toString(CC_MqttsnQoS val) -// { -// static const std::string Map[] = { -// /* CC_MqttsnQoS_AtMostOnceDelivery */ "QoS0 - At Most Once Delivery", -// /* CC_MqttsnQoS_AtLeastOnceDelivery */ "QoS1 - At Least Once Delivery", -// /* CC_MqttsnQoS_ExactlyOnceDelivery */ "QoS2 - Exactly Once Delivery", -// }; -// static constexpr std::size_t MapSize = std::extent::value; -// static_assert(MapSize == CC_MqttsnQoS_ValuesLimit); - -// auto idx = static_cast(val); -// if (MapSize <= idx) { -// assert(false); // Should not happen -// return std::to_string(val); -// } - -// return Map[idx] + " (" + std::to_string(val) + ')'; -// } - -// void printQos(const char* prefix, CC_MqttsnQoS val) -// { -// std::cout << "\t" << prefix << ": " << toString(val) << '\n'; -// } - -// void printBool(const char* prefix, bool val) -// { -// std::cout << '\t' << prefix << ": " << std::boolalpha << val << '\n'; -// } - -// void printConnectReturnCode(CC_MqttsnConnectReturnCode val) -// { -// std::cout << "\tReturn Code: " << AppClient::toString(val) << '\n'; -// } - -// void printSubscribeReturnCode(CC_MqttsnSubscribeReturnCode val) -// { -// std::cout << "\tReturn Code: " << AppClient::toString(val) << '\n'; -// } +std::string toString(CC_MqttsnQoS val) +{ + static const std::string Map[] = { + /* CC_MqttsnQoS_AtMostOnceDelivery */ "QoS0 - At Most Once Delivery", + /* CC_MqttsnQoS_AtLeastOnceDelivery */ "QoS1 - At Least Once Delivery", + /* CC_MqttsnQoS_ExactlyOnceDelivery */ "QoS2 - Exactly Once Delivery", + }; + static constexpr std::size_t MapSize = std::extent::value; + static_assert(MapSize == CC_MqttsnQoS_ValuesLimit); + + auto idx = static_cast(val); + if (MapSize <= idx) { + assert(false); // Should not happen + return std::to_string(val); + } + + return Map[idx] + " (" + std::to_string(val) + ')'; +} + +void printQos(const char* prefix, CC_MqttsnQoS val) +{ + std::cout << "\t" << prefix << ": " << toString(val) << '\n'; +} + +void printBool(const char* prefix, bool val) +{ + std::cout << '\t' << prefix << ": " << std::boolalpha << val << '\n'; +} } // namespace @@ -128,52 +118,52 @@ std::string AppClient::toString(CC_MqttsnErrorCode val) return Map[idx] + " (" + std::to_string(val) + ')'; } -// std::string AppClient::toString(const std::uint8_t* data, unsigned dataLen, bool forceBinary) -// { -// bool binary = forceBinary; -// if (!binary) { -// binary = -// std::any_of( -// data, data + dataLen, -// [](auto byte) -// { -// if (std::isprint(static_cast(byte)) == 0) { -// return true; -// } - -// if (byte > 0x7e) { -// return true; -// } - -// return false; -// }); -// } - -// if (!binary) { -// return std::string(reinterpret_cast(data), dataLen); -// } - -// std::stringstream stream; -// stream << std::hex; -// for (auto idx = 0U; idx < dataLen; ++idx) { -// stream << std::setw(2) << std::setfill('0') << static_cast(data[idx]) << ' '; -// } -// return stream.str(); -// } - -// void AppClient::print(const CC_MqttsnMessageInfo& info, bool printMessage) -// { -// std::cout << "[INFO]: Message Info:\n"; -// if (printMessage) { -// std::cout << -// "\tTopic: " << info.m_topic << '\n' << -// "\tData: " << toString(info.m_data, info.m_dataLen, m_opts.subBinary()) << '\n'; -// } - -// printQos("QoS", info.m_qos); -// printBool("Retained", info.m_retained); -// std::cout << std::endl; -// } +std::string AppClient::toString(const std::uint8_t* data, unsigned dataLen, bool forceBinary) +{ + bool binary = forceBinary; + if (!binary) { + binary = + std::any_of( + data, data + dataLen, + [](auto byte) + { + if (std::isprint(static_cast(byte)) == 0) { + return true; + } + + if (byte > 0x7e) { + return true; + } + + return false; + }); + } + + if (!binary) { + return std::string(reinterpret_cast(data), dataLen); + } + + std::stringstream stream; + stream << std::hex; + for (auto idx = 0U; idx < dataLen; ++idx) { + stream << std::setw(2) << std::setfill('0') << static_cast(data[idx]) << ' '; + } + return stream.str(); +} + +void AppClient::print(const CC_MqttsnMessageInfo& info, bool printMessage) +{ + std::cout << "[INFO]: Message Info:\n"; + if (printMessage) { + std::cout << + "\tTopic: " << info.m_topic << '\n' << + "\tData: " << toString(info.m_data, info.m_dataLen, m_opts.subBinary()) << '\n'; + } + + printQos("QoS", info.m_qos); + printBool("Retained", info.m_retained); + std::cout << std::endl; +} AppClient::AppClient(boost::asio::io_context& io, int& result) : m_io(io), @@ -191,12 +181,12 @@ AppClient::AppClient(boost::asio::io_context& io, int& result) : std::ostream& AppClient::logError() { - return std::cerr << "ERROR: "; + return std::cerr << "[ERROR] "; } std::ostream& AppClient::logInfo() { - return std::cout << "INFO: "; + return std::cout << "[INFO] "; } void AppClient::doTerminate(int result) @@ -254,6 +244,21 @@ bool AppClient::doConnect() return true; } +bool AppClient::doDisconnect() +{ + if (m_opts.verbose()) { + logInfo() << "Attempting disconnection" << std::endl; + } + + auto ec = cc_mqttsn_client_disconnect(m_client.get(), &AppClient::disconnectCompleteCb, this); + if (ec != CC_MqttsnErrorCode_Success) { + logError() << "Failed to initiate disconnection from the gateway" << std::endl; + return false; + } + + return true; +} + bool AppClient::startImpl() { return true; @@ -268,6 +273,11 @@ void AppClient::connectCompleteImpl() { } +void AppClient::disconnectCompleteImpl() +{ + doComplete(); +} + std::vector AppClient::parseBinaryData(const std::string& val) { std::vector result; @@ -467,6 +477,21 @@ void AppClient::connectCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_ connectCompleteImpl(); } +void AppClient::disconnectCompleteInternal(CC_MqttsnAsyncOpStatus status) +{ + if (status != CC_MqttsnAsyncOpStatus_Complete) { + logError() << "Failed to disconnect with status: " << toString(status) << std::endl; + doTerminate(); + return; + } + + if (m_opts.verbose()) { + logInfo() << "Disconnected" << std::endl; + } + + disconnectCompleteImpl(); +} + void AppClient::sendDataCb(void* data, const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius) { asThis(data)->sendDataInternal(buf, bufLen, broadcastRadius); @@ -497,4 +522,9 @@ void AppClient::connectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status, con return asThis(data)->connectCompleteInternal(status, info); } +void AppClient::disconnectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status) +{ + return asThis(data)->disconnectCompleteInternal(status); +} + } // namespace cc_mqttsn_client_app diff --git a/client/app/common/AppClient.h b/client/app/common/AppClient.h index 01e6b80c..ccea178c 100644 --- a/client/app/common/AppClient.h +++ b/client/app/common/AppClient.h @@ -35,7 +35,8 @@ class AppClient static std::string toString(CC_MqttsnErrorCode val); //static std::string toString(CC_MqttsnAsyncOpStatus val); - //void print(const CC_MqttsnMessageInfo& info, bool printMessage = true); + static std::string toString(const std::uint8_t* data, unsigned dataLen, bool forceBinary = false); + void print(const CC_MqttsnMessageInfo& info, bool printMessage = true); protected: using Timer = boost::asio::steady_timer; @@ -65,10 +66,12 @@ class AppClient void doTerminate(int result = 1); void doComplete(); bool doConnect(); + bool doDisconnect(); virtual bool startImpl(); virtual void messageReceivedImpl(const CC_MqttsnMessageInfo* info); virtual void connectCompleteImpl(); + virtual void disconnectCompleteImpl(); static std::vector parseBinaryData(const std::string& val); @@ -90,6 +93,7 @@ class AppClient void sendDataInternal(const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius); bool createSession(); void connectCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info); + void disconnectCompleteInternal(CC_MqttsnAsyncOpStatus status); static void sendDataCb(void* data, const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius); static void messageReceivedCb(void* data, const CC_MqttsnMessageInfo* info); @@ -97,6 +101,7 @@ class AppClient static void nextTickProgramCb(void* data, unsigned duration); static unsigned cancelNextTickWaitCb(void* data); static void connectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info); + static void disconnectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status); boost::asio::io_context& m_io; int& m_result; diff --git a/client/app/common/ProgramOptions.cpp b/client/app/common/ProgramOptions.cpp index b4437b1b..d35e8b4c 100644 --- a/client/app/common/ProgramOptions.cpp +++ b/client/app/common/ProgramOptions.cpp @@ -91,6 +91,21 @@ void ProgramOptions::addPublish() ("pub-message,m", po::value()->default_value(std::string()), "Publish message data, use \"\\x\" prefix before hex value of each byte for binary string") ("pub-qos,q", po::value()->default_value(0U), "Publish QoS value") ("pub-retain", "Publish retained message") + ("pub-no-disconnect", "Do not gracefuly disconnect when publish is complete") + ; + + m_desc.add(opts); +} + +void ProgramOptions::addSubscribe() +{ + po::options_description opts("Subscribe Options"); + opts.add_options() + ("sub-topic,t", po::value>(), "Subscribe to topic, can be used multiple times.") + ("sub-topic-id,i", po::value>(), "Subscribe topic id, can be used multiple times") + ("sub-qos,q", po::value()->default_value(2U), "Subscribe max QoS value") + ("sub-no-retained", "Ignore retained messages") + ("sub-binary", "Force binary output of the received message data") ; m_desc.add(opts); @@ -210,4 +225,44 @@ bool ProgramOptions::pubRetain() const return m_vm.count("pub-retain") > 0U; } +bool ProgramOptions::pubNoDisconnect() const +{ + return m_vm.count("pub-no-disconnect") > 0U; +} + +std::vector ProgramOptions::subTopics() const +{ + std::vector result; + if (m_vm.count("sub-topic") > 0U) { + result = m_vm["sub-topic"].as>(); + } + + return result; +} + +std::vector ProgramOptions::subTopicIds() const +{ + std::vector result; + if (m_vm.count("sub-topic-id") > 0U) { + result = m_vm["sub-topic-id"].as>(); + } + + return result; +} + +unsigned ProgramOptions::subQos() const +{ + return m_vm["sub-qos"].as(); +} + +bool ProgramOptions::subNoRetained() const +{ + return m_vm.count("sub-no-retained") > 0U; +} + +bool ProgramOptions::subBinary() const +{ + return m_vm.count("sub-binary") > 0U; +} + } // namespace cc_mqttsn_client_app diff --git a/client/app/common/ProgramOptions.h b/client/app/common/ProgramOptions.h index abd572c1..b924298e 100644 --- a/client/app/common/ProgramOptions.h +++ b/client/app/common/ProgramOptions.h @@ -33,6 +33,7 @@ class ProgramOptions void addConnect(); void addWill(); void addPublish(); + void addSubscribe(); void printHelp(); @@ -68,7 +69,15 @@ class ProgramOptions std::uint16_t pubTopicId() const; std::string pubMessage() const; unsigned pubQos() const; - bool pubRetain() const; + bool pubRetain() const; + bool pubNoDisconnect() const; + + // Subscribe Options + std::vector subTopics() const; + std::vector subTopicIds() const; + unsigned subQos() const; + bool subNoRetained() const; + bool subBinary() const; private: boost::program_options::variables_map m_vm; diff --git a/client/app/pub/Pub.cpp b/client/app/pub/Pub.cpp index bc4703f9..c06039bd 100644 --- a/client/app/pub/Pub.cpp +++ b/client/app/pub/Pub.cpp @@ -98,7 +98,12 @@ void Pub::publishCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_Mqttsn logInfo() << "Publish complete" << std::endl; } - doComplete(); + if (opts().pubNoDisconnect()) { + doComplete(); + return; + } + + doDisconnect(); } void Pub::publishCompleteCb( diff --git a/client/app/sub/CMakeLists.txt b/client/app/sub/CMakeLists.txt new file mode 100644 index 00000000..465d515d --- /dev/null +++ b/client/app/sub/CMakeLists.txt @@ -0,0 +1,13 @@ +set (name "cc_mqttsn_client_sub") +set (src + main.cpp + Sub.cpp +) + +add_executable(${name} ${src}) +target_link_libraries(${name} ${COMMON_APPS_LIB}) + +install ( + TARGETS ${name} + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} +) \ No newline at end of file diff --git a/client/app/sub/Sub.cpp b/client/app/sub/Sub.cpp new file mode 100644 index 00000000..842045d8 --- /dev/null +++ b/client/app/sub/Sub.cpp @@ -0,0 +1,142 @@ +// +// Copyright 2024 - 2024 (C). Alex Robenko. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include "Sub.h" + +#include +#include +#include +#include + +namespace cc_mqttsn_client_app +{ + +namespace +{ + +Sub* asThis(void* data) +{ + return reinterpret_cast(data); +} + +} // namespace + + +Sub::Sub(boost::asio::io_context& io, int& result) : + Base(io, result) +{ + opts().addCommon(); + opts().addNetwork(); + opts().addConnect(); + opts().addWill(); + opts().addSubscribe(); +} + +bool Sub::startImpl() +{ + return doConnect(); +} + +void Sub::messageReceivedImpl(const CC_MqttsnMessageInfo* info) +{ + assert(info != nullptr); + assert(info->m_topic != nullptr); + if (info->m_retained && opts().subNoRetained()) { + return; + } + + if (opts().verbose()) { + print(*info); + } + else { + std::cout << info->m_topic << ": " << toString(info->m_data, info->m_dataLen, opts().subBinary()) << std::endl; + } +} + +void Sub::connectCompleteImpl() +{ + auto topics = opts().subTopics(); + auto topicIds = opts().subTopicIds(); + + if (topics.empty() && topicIds.empty()) { + logInfo() << "Not subscription topics provided"; + return; + } + + if (opts().verbose()) { + logInfo() << "Subscribing..." << std::endl; + } + + auto doSubscribe = + [this](const std::string& topic, std::uint16_t topicId) + { + auto config = CC_MqttsnSubscribeConfig(); + cc_mqttsn_client_subscribe_init_config(&config); + + if (!topic.empty()) { + config.m_topic = topic.c_str(); + } + else { + config.m_topicId = topicId; + } + + config.m_qos = static_cast(opts().subQos()); + + auto ec = cc_mqttsn_client_subscribe(client(), &config, &Sub::subscribeCompleteCb, this); + if (ec != CC_MqttsnErrorCode_Success) { + if (!topic.empty()) { + logError() << "Failed to initiate subscribe subscribe to topic \"" << topic << "\" with error: " << toString(ec) << std::endl; + } + else { + logError() << "Failed to initiate subscribe subscribe to topic ID " << topicId << " with error: " << toString(ec) << std::endl; + } + return; + } + + ++m_subCount; + }; + + for (auto& t : topics) { + doSubscribe(t, 0U); + } + + for (auto id : topicIds) { + doSubscribe(std::string(), id); + } +} + +void Sub::subscribeCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_MqttsnSubscribeInfo* info) +{ + do { + if (status != CC_MqttsnAsyncOpStatus_Complete) { + logError() << "Subscribe failed with status: " << toString(status) << std::endl; + break; + } + + if ((info != nullptr) && (info->m_returnCode != CC_MqttsnReturnCode_Accepted)) { + logError() << "Subscribe rejected with return code: " << toString(info->m_returnCode) << std::endl; + break; + } + + } while (false); + + --m_subCount; + if ((m_subCount == 0) && opts().verbose()) { + logInfo() << "Subscription complete, waiting for messages" << std::endl; + } +} + +void Sub::subscribeCompleteCb( + void* data, + [[maybe_unused]] CC_MqttsnSubscribeHandle handle, + CC_MqttsnAsyncOpStatus status, + const CC_MqttsnSubscribeInfo* info) +{ + asThis(data)->subscribeCompleteInternal(status, info); +} + +} // namespace cc_mqttsn_client_app diff --git a/client/app/sub/Sub.h b/client/app/sub/Sub.h new file mode 100644 index 00000000..eb522ee4 --- /dev/null +++ b/client/app/sub/Sub.h @@ -0,0 +1,36 @@ +// +// Copyright 2024 - 2024 (C). Alex Robenko. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#pragma once + +#include "AppClient.h" +#include "ProgramOptions.h" + +#include + +namespace cc_mqttsn_client_app +{ + +class Sub : public AppClient +{ + using Base = AppClient; +public: + Sub(boost::asio::io_context& io, int& result); + +protected: + virtual bool startImpl() override; + virtual void messageReceivedImpl(const CC_MqttsnMessageInfo* info) override; + virtual void connectCompleteImpl() override; + +private: + void subscribeCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_MqttsnSubscribeInfo* info); + static void subscribeCompleteCb(void* data, CC_MqttsnSubscribeHandle handle, CC_MqttsnAsyncOpStatus status, const CC_MqttsnSubscribeInfo* info); + + unsigned m_subCount = 0U; +}; + +} // namespace cc_mqttsn_client_app diff --git a/client/app/sub/main.cpp b/client/app/sub/main.cpp new file mode 100644 index 00000000..d90f1b4c --- /dev/null +++ b/client/app/sub/main.cpp @@ -0,0 +1,51 @@ + +#include "Sub.h" + +#include + +#include +#include +#include + +int main(int argc, const char* argv[]) +{ + int result = 0U; + try { + boost::asio::io_context io; + + boost::asio::signal_set signals(io, SIGINT, SIGTERM); + signals.async_wait( + [&io, &result](const boost::system::error_code& ec, int sigNum) + { + if (ec == boost::asio::error::operation_aborted) { + return; + } + + if (ec) { + std::cerr << "ERROR: Unexpected error in signal handling: " << ec.message() << std::endl; + result = 150; + io.stop(); + return; + } + + std::cerr << "Terminated with signal " << sigNum << std::endl; + result = 100; + io.stop(); + }); + + cc_mqttsn_client_app::Sub app(io, result); + + if (!app.start(argc, argv)) { + return -1; + } + + io.run(); + } + catch (const std::exception& ec) + { + std::cerr << "ERROR: Unexpected exception: " << ec.what() << std::endl; + result = 200; + } + + return result; +} \ No newline at end of file diff --git a/client/lib/include/cc_mqttsn_client/common.h b/client/lib/include/cc_mqttsn_client/common.h index 62745fcc..7b994afe 100644 --- a/client/lib/include/cc_mqttsn_client/common.h +++ b/client/lib/include/cc_mqttsn_client/common.h @@ -43,7 +43,8 @@ typedef enum { CC_MqttsnQoS_AtMostOnceDelivery = 0, ///< QoS=0. At most once delivery. CC_MqttsnQoS_AtLeastOnceDelivery = 1, ///< QoS=1. At least once delivery. - CC_MqttsnQoS_ExactlyOnceDelivery = 2 ///< QoS=2. Exactly once delivery. + CC_MqttsnQoS_ExactlyOnceDelivery = 2, ///< QoS=2. Exactly once delivery. + CC_MqttsnQoS_ValuesLimit ///< Limit for the values } CC_MqttsnQoS; /// @brief Error code returned by various API functions. diff --git a/client/lib/src/op/SendOp.cpp b/client/lib/src/op/SendOp.cpp index 043316d1..610b3717 100644 --- a/client/lib/src/op/SendOp.cpp +++ b/client/lib/src/op/SendOp.cpp @@ -298,7 +298,13 @@ void SendOp::handle(PubackMsg& msg) } using TopicIdType = PublishMsg::Field_flags::Field_topicIdType::ValueType; - if (m_publishMsg.field_flags().field_topicIdType().value() != TopicIdType::Normal) { + auto topicIdType = m_publishMsg.field_flags().field_topicIdType().value(); + + if (topicIdType == TopicIdType::PredefinedTopicId) { + break; + } + + if (topicIdType != TopicIdType::Normal) { errorLog("Unexpected return code for the publish"); break; } diff --git a/client/lib/test/bm/UnitTestBmConnect.th b/client/lib/test/bm/UnitTestBmConnect.th index 08cc13eb..e5fcfd6a 100644 --- a/client/lib/test/bm/UnitTestBmConnect.th +++ b/client/lib/test/bm/UnitTestBmConnect.th @@ -27,6 +27,6 @@ void UnitTestBmConnect::test1() auto* client = clientPtr.get(); TS_ASSERT_DIFFERS(client, nullptr); - unitTestDoConnectBasic(client, __FUNCTION__); + unitTestDoConnectBasic(client, "test1"); } diff --git a/client/lib/test/bm/UnitTestBmPublish.th b/client/lib/test/bm/UnitTestBmPublish.th index a3076169..3b6a81b9 100644 --- a/client/lib/test/bm/UnitTestBmPublish.th +++ b/client/lib/test/bm/UnitTestBmPublish.th @@ -27,7 +27,7 @@ void UnitTestBmPublish::test1() auto* client = clientPtr.get(); TS_ASSERT_DIFFERS(client, nullptr); - unitTestDoConnectBasic(client, __FUNCTION__); + unitTestDoConnectBasic(client, "test1"); const CC_MqttsnTopicId TopicId = 123; const UnitTestData Data = { 0x1, 0x2, 0x3, 0x4, 0x5};