diff --git a/.github/workflows/cppcheck.yml b/.github/workflows/cppcheck.yml index b80aa1c..c1b95ee 100644 --- a/.github/workflows/cppcheck.yml +++ b/.github/workflows/cppcheck.yml @@ -19,4 +19,4 @@ jobs: pip install platformio - name: Cppcheck run: | - pio check --fail-on-defect=medium --fail-on-defect=high --flags "--enable=warning --enable=style --enable=performance --suppress=unusedFunction --suppress=preprocessorErrorDirective" --skip-packages \ No newline at end of file + pio check --fail-on-defect=medium --fail-on-defect=high --flags "--enable=warning --enable=style --enable=performance --suppress=unusedFunction --suppress=preprocessorErrorDirective" --skip-packages diff --git a/docs/index.md b/docs/index.md index daa3d48..152690a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -398,6 +398,10 @@ The (maximum) length of the client ID. (Keep in mind that this is a c-string. Yo Only used on ESP32. Sets the stack size (in words) of the MQTT client worker task. +### EMC_MULTIPLE_CALLBACKS + +This macro is by default not enabled so you can add a single callbacks to an event. Assigning a second will overwrite the existing callback. When enabling multiple callbacks, multiple callbacks (with uint32_t id) can be assigned. Removing is done by referencing the id. + ### EMC_USE_WATCHDOG 0 (ESP32 only) diff --git a/platformio.ini b/platformio.ini index 43e3953..ccfac9c 100644 --- a/platformio.ini +++ b/platformio.ini @@ -29,5 +29,6 @@ build_flags = --coverage -D EMC_RX_BUFFER_SIZE=100 -D EMC_TX_BUFFER_SIZE=10 + -D EMC_MULTIPLE_CALLBACKS=1 ;extra_scripts = test-coverage.py build_type = debug diff --git a/src/Config.h b/src/Config.h index e6e35d9..9c60911 100644 --- a/src/Config.h +++ b/src/Config.h @@ -53,6 +53,10 @@ the LICENSE file. #define EMC_TASK_STACK_SIZE 5120 #endif +#ifndef EMC_MULTIPLE_CALLBACKS +#define EMC_MULTIPLE_CALLBACKS 0 +#endif + #ifndef EMC_USE_WATCHDOG #define EMC_USE_WATCHDOG 0 #endif diff --git a/src/MqttClientSetup.h b/src/MqttClientSetup.h index 73458d4..67f46a0 100644 --- a/src/MqttClientSetup.h +++ b/src/MqttClientSetup.h @@ -11,6 +11,11 @@ the LICENSE file. #pragma once +#if EMC_MULTIPLE_CALLBACKS +#include +#include +#endif + #include "MqttClient.h" template @@ -73,35 +78,127 @@ class MqttClientSetup : public MqttClient { return static_cast(*this); } - T& onConnect(espMqttClientTypes::OnConnectCallback callback) { + T& onConnect(espMqttClientTypes::OnConnectCallback callback, uint32_t id = 0) { + #if EMC_MULTIPLE_CALLBACKS + _onConnectCallbacks.emplace_back(callback, id); + #else + (void) id; _onConnectCallback = callback; + #endif return static_cast(*this); } - T& onDisconnect(espMqttClientTypes::OnDisconnectCallback callback) { + T& onDisconnect(espMqttClientTypes::OnDisconnectCallback callback, uint32_t id = 0) { + #if EMC_MULTIPLE_CALLBACKS + _onDisconnectCallbacks.emplace_back(callback, id); + #else + (void) id; _onDisconnectCallback = callback; + #endif return static_cast(*this); } - T& onSubscribe(espMqttClientTypes::OnSubscribeCallback callback) { + T& onSubscribe(espMqttClientTypes::OnSubscribeCallback callback, uint32_t id = 0) { + #if EMC_MULTIPLE_CALLBACKS + _onSubscribeCallbacks.emplace_back(callback, id); + #else + (void) id; _onSubscribeCallback = callback; + #endif return static_cast(*this); } - T& onUnsubscribe(espMqttClientTypes::OnUnsubscribeCallback callback) { + T& onUnsubscribe(espMqttClientTypes::OnUnsubscribeCallback callback, uint32_t id = 0) { + #if EMC_MULTIPLE_CALLBACKS + _onUnsubscribeCallbacks.emplace_back(callback, id); + #else + (void) id; _onUnsubscribeCallback = callback; + #endif return static_cast(*this); } - T& onMessage(espMqttClientTypes::OnMessageCallback callback) { + T& onMessage(espMqttClientTypes::OnMessageCallback callback, uint32_t id = 0) { + #if EMC_MULTIPLE_CALLBACKS + _onMessageCallbacks.emplace_back(callback, id); + #else + (void) id; _onMessageCallback = callback; + #endif return static_cast(*this); } - T& onPublish(espMqttClientTypes::OnPublishCallback callback) { + T& onPublish(espMqttClientTypes::OnPublishCallback callback, uint32_t id = 0) { + #if EMC_MULTIPLE_CALLBACKS + _onPublishCallbacks.emplace_back(callback, id); + #else + (void) id; _onPublishCallback = callback; + #endif + return static_cast(*this); + } + + #if EMC_MULTIPLE_CALLBACKS + T& removeOnConnect(uint32_t id) { + for (auto it = _onConnectCallbacks.begin(); it != _onConnectCallbacks.end(); ++it) { + if (it->second == id) { + _onConnectCallbacks.erase(it); + break; + } + } + return static_cast(*this); + } + + T& removeOnDisconnect(uint32_t id) { + for (auto it = _onDisconnectCallbacks.begin(); it != _onDisconnectCallbacks.end(); ++it) { + if (it->second == id) { + _onDisconnectCallbacks.erase(it); + break; + } + } + return static_cast(*this); + } + + T& removeOnSubscribe(uint32_t id) { + for (auto it = _onSubscribeCallbacks.begin(); it != _onSubscribeCallbacks.end(); ++it) { + if (it->second == id) { + _onSubscribeCallbacks.erase(it); + break; + } + } + return static_cast(*this); + } + + T& removeOnUnsubscribe(uint32_t id) { + for (auto it = _onUnsubscribeCallbacks.begin(); it != _onUnsubscribeCallbacks.end(); ++it) { + if (it->second == id) { + _onUnsubscribeCallbacks.erase(it); + break; + } + } + return static_cast(*this); + } + + T& removeOnMessage(uint32_t id) { + for (auto it = _onMessageCallbacks.begin(); it != _onMessageCallbacks.end(); ++it) { + if (it->second == id) { + _onMessageCallbacks.erase(it); + break; + } + } + return static_cast(*this); + } + + T& removeOnPublish(uint32_t id) { + for (auto it = _onPublishCallbacks.begin(); it != _onPublishCallbacks.end(); ++it) { + if (it->second == id) { + _onPublishCallbacks.erase(it); + break; + } + } return static_cast(*this); } + #endif /* T& onError(espMqttClientTypes::OnErrorCallback callback) { @@ -112,5 +209,37 @@ class MqttClientSetup : public MqttClient { protected: explicit MqttClientSetup(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority = 1, uint8_t core = 1) - : MqttClient(useInternalTask, priority, core) {} + : MqttClient(useInternalTask, priority, core) { + #if EMC_MULTIPLE_CALLBACKS + _onConnectCallback = [this](bool sessionPresent) { + for (auto callback : _onConnectCallbacks) if (callback.first) callback.first(sessionPresent); + }; + _onDisconnectCallback = [this](espMqttClientTypes::DisconnectReason reason) { + for (auto callback : _onDisconnectCallbacks) if (callback.first) callback.first(reason); + }; + _onSubscribeCallback = [this](uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* returncodes, size_t len) { + for (auto callback : _onSubscribeCallbacks) if (callback.first) callback.first(packetId, returncodes, len); + }; + _onUnsubscribeCallback = [this](int16_t packetId) { + for (auto callback : _onUnsubscribeCallbacks) if (callback.first) callback.first(packetId); + }; + _onMessageCallback = [this](const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + for (auto callback : _onMessageCallbacks) if (callback.first) callback.first(properties, topic, payload, len, index, total); + }; + _onPublishCallback = [this](uint16_t packetId) { + for (auto callback : _onPublishCallbacks) if (callback.first) callback.first(packetId); + }; + #else + // empty + #endif + } + + #if EMC_MULTIPLE_CALLBACKS + std::list> _onConnectCallbacks; + std::list> _onDisconnectCallbacks; + std::list> _onSubscribeCallbacks; + std::list> _onUnsubscribeCallbacks; + std::list> _onMessageCallbacks; + std::list> _onPublishCallbacks; + #endif }; diff --git a/test/test_client_native/test_client_native.cpp b/test/test_client_native/test_client_native.cpp index 7afd7e1..db70a54 100644 --- a/test/test_client_native/test_client_native.cpp +++ b/test/test_client_native/test_client_native.cpp @@ -7,6 +7,12 @@ void setUp() {} void tearDown() {} espMqttClient mqttClient; +uint32_t onConnectCbId = 1; +uint32_t onDisconnectCbId = 2; +uint32_t onSubscribeCbId = 3; +uint32_t onUnsubscribeCbId = 4; +uint32_t onMessageCbId = 5; +uint32_t onPublishCbId = 6; std::atomic_bool exitProgram(false); std::thread t; @@ -30,7 +36,7 @@ void test_connect() { .onConnect([&](bool sessionPresent) mutable { sessionPresentTest = sessionPresent; onConnectCalledTest = true; - }); + }, onConnectCbId); mqttClient.connect(); uint32_t start = millis(); while (millis() - start < 2000) { @@ -44,7 +50,7 @@ void test_connect() { TEST_ASSERT_TRUE(onConnectCalledTest); TEST_ASSERT_FALSE(sessionPresentTest); - mqttClient.onConnect(nullptr); + mqttClient.removeOnConnect(onConnectCbId); } /* @@ -83,7 +89,7 @@ void test_subscribe() { if (len == 1 && returncodes[0] == espMqttClientTypes::SubscribeReturncode::QOS0) { subscribeTest = true; } - }); + }, onSubscribeCbId); mqttClient.subscribe("test/test", 0); uint32_t start = millis(); while (millis() - start < 2000) { @@ -96,7 +102,7 @@ void test_subscribe() { TEST_ASSERT_TRUE(mqttClient.connected()); TEST_ASSERT_TRUE(subscribeTest); - mqttClient.onSubscribe(nullptr); + mqttClient.removeOnSubscribe(onSubscribeCbId); } /* @@ -112,7 +118,7 @@ void test_publish() { mqttClient.onPublish([&](uint16_t packetId) mutable { (void) packetId; publishSendTest++; - }); + }, onPublishCbId); std::atomic publishReceiveTest(0); mqttClient.onMessage([&](const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) mutable { (void) properties; @@ -122,7 +128,7 @@ void test_publish() { (void) index; (void) total; publishReceiveTest++; - }); + }, onMessageCbId); uint16_t sendQos0Test = mqttClient.publish("test/test", 0, false, "test0"); uint16_t sendQos1Test = mqttClient.publish("test/test", 1, false, "test1"); uint16_t sendQos2Test = mqttClient.publish("test/test", 2, false, "test2"); @@ -138,8 +144,8 @@ void test_publish() { TEST_ASSERT_EQUAL_INT(2, publishSendTest); TEST_ASSERT_EQUAL_INT(3, publishReceiveTest); - mqttClient.onPublish(nullptr); - mqttClient.onMessage(nullptr); + mqttClient.removeOnPublish(onPublishCbId); + mqttClient.removeOnMessage(onMessageCbId); } void test_publish_empty() { @@ -147,7 +153,7 @@ void test_publish_empty() { mqttClient.onPublish([&](uint16_t packetId) mutable { (void) packetId; publishSendEmptyTest++; - }); + }, onPublishCbId); std::atomic publishReceiveEmptyTest(0); mqttClient.onMessage([&](const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) mutable { (void) properties; @@ -157,7 +163,7 @@ void test_publish_empty() { (void) index; (void) total; publishReceiveEmptyTest++; - }); + }, onMessageCbId); uint16_t sendQos0Test = mqttClient.publish("test/test", 0, false, nullptr, 0); uint16_t sendQos1Test = mqttClient.publish("test/test", 1, false, nullptr, 0); uint16_t sendQos2Test = mqttClient.publish("test/test", 2, false, nullptr, 0); @@ -173,8 +179,8 @@ void test_publish_empty() { TEST_ASSERT_EQUAL_INT(2, publishSendEmptyTest); TEST_ASSERT_EQUAL_INT(3, publishReceiveEmptyTest); - mqttClient.onPublish(nullptr); - mqttClient.onMessage(nullptr); + mqttClient.removeOnPublish(onPublishCbId); + mqttClient.removeOnMessage(onMessageCbId); } /* @@ -195,13 +201,13 @@ void test_receive1() { (void) index; (void) total; publishReceive1Test++; - }); + }, onMessageCbId); mqttClient.onSubscribe([&](uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* returncodes, size_t len) mutable { (void) packetId; if (len == 1 && returncodes[0] == espMqttClientTypes::SubscribeReturncode::QOS1) { mqttClient.publish("test/test", 1, false, ""); } - }); + }, onSubscribeCbId); mqttClient.subscribe("test/test", 1); uint32_t start = millis(); while (millis() - start < 6000) { @@ -211,8 +217,8 @@ void test_receive1() { TEST_ASSERT_TRUE(mqttClient.connected()); TEST_ASSERT_GREATER_THAN_INT(0, publishReceive1Test); - mqttClient.onMessage(nullptr); - mqttClient.onSubscribe(nullptr); + mqttClient.removeOnMessage(onMessageCbId); + mqttClient.removeOnSubscribe(onSubscribeCbId); } /* @@ -233,13 +239,13 @@ void test_receive2() { (void) index; (void) total; publishReceive2Test++; - }); + }, onMessageCbId); mqttClient.onSubscribe([&](uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* returncodes, size_t len) mutable { (void) packetId; if (len == 1 && returncodes[0] == espMqttClientTypes::SubscribeReturncode::QOS2) { mqttClient.publish("test/test", 2, false, ""); } - }); + }, onSubscribeCbId); mqttClient.subscribe("test/test", 2); uint32_t start = millis(); while (millis() - start < 6000) { @@ -249,8 +255,8 @@ void test_receive2() { TEST_ASSERT_TRUE(mqttClient.connected()); TEST_ASSERT_EQUAL_INT(1, publishReceive2Test); - mqttClient.onMessage(nullptr); - mqttClient.onSubscribe(nullptr); + mqttClient.removeOnMessage(onMessageCbId); + mqttClient.removeOnSubscribe(onSubscribeCbId); } @@ -265,7 +271,7 @@ void test_unsubscribe() { mqttClient.onUnsubscribe([&](uint16_t packetId) mutable { (void) packetId; unsubscribeTest = true; - }); + }, onUnsubscribeCbId); mqttClient.unsubscribe("test/test"); uint32_t start = millis(); while (millis() - start < 2000) { @@ -278,7 +284,7 @@ void test_unsubscribe() { TEST_ASSERT_TRUE(mqttClient.connected()); TEST_ASSERT_TRUE(unsubscribeTest); - mqttClient.onUnsubscribe(nullptr); + mqttClient.removeOnUnsubscribe(onUnsubscribeCbId); } /* @@ -293,7 +299,7 @@ void test_disconnect() { mqttClient.onDisconnect([&](espMqttClientTypes::DisconnectReason reason) mutable { reasonTest = reason; onDisconnectCalled = true; - }); + }, onDisconnectCbId); mqttClient.disconnect(); uint32_t start = millis(); while (millis() - start < 2000) { @@ -307,7 +313,7 @@ void test_disconnect() { TEST_ASSERT_EQUAL_UINT8(espMqttClientTypes::DisconnectReason::USER_OK, reasonTest); TEST_ASSERT_TRUE(mqttClient.disconnected()); - mqttClient.onDisconnect(nullptr); + mqttClient.removeOnDisconnect(onDisconnectCbId); } void test_pub_before_connect() { @@ -320,11 +326,11 @@ void test_pub_before_connect() { .onConnect([&](bool sessionPresent) mutable { sessionPresentTest = sessionPresent; onConnectCalledTest = true; - }) + }, onConnectCbId) .onPublish([&](uint16_t packetId) mutable { (void) packetId; publishSendTest++; - }); + }, onPublishCbId); uint16_t sendQos0Test = mqttClient.publish("test/test", 0, false, "test0"); uint16_t sendQos1Test = mqttClient.publish("test/test", 1, false, "test1"); uint16_t sendQos2Test = mqttClient.publish("test/test", 2, false, "test2"); @@ -349,8 +355,8 @@ void test_pub_before_connect() { TEST_ASSERT_GREATER_THAN_UINT16(0, sendQos2Test); TEST_ASSERT_EQUAL_INT(2, publishSendTest); - mqttClient.onConnect(nullptr); - mqttClient.onPublish(nullptr); + mqttClient.removeOnConnect(onConnectCbId); + mqttClient.removeOnPublish(onPublishCbId); } void final_disconnect() { @@ -358,7 +364,7 @@ void final_disconnect() { mqttClient.onDisconnect([&](espMqttClientTypes::DisconnectReason reason) mutable { (void) reason; onDisconnectCalled = true; - }); + }, onDisconnectCbId); mqttClient.disconnect(); uint32_t start = millis(); while (millis() - start < 2000) { @@ -370,7 +376,7 @@ void final_disconnect() { if (mqttClient.connected()) { mqttClient.disconnect(true); } - mqttClient.onDisconnect(nullptr); + mqttClient.removeOnDisconnect(onDisconnectCbId); } int main() {