Skip to content

Commit

Permalink
Merge branch 'multiple_callbacks' into main
Browse files Browse the repository at this point in the history
Signed-off-by: Bert Melis <[email protected]>
  • Loading branch information
bertmelis authored Nov 16, 2023
2 parents 0edc772 + 42be3b5 commit a4a390f
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cppcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ jobs:
pip install platformio
- name: Cppcheck
run: |
pio check --fail-on-defect=medium --fail-on-defect=high --flags "--enable=warning --enable=style --enable=performance --suppress=unusedFunction --suppress=preprocessorErrorDirective" --skip-packages
pio check --fail-on-defect=medium --fail-on-defect=high --flags "--enable=warning --enable=style --enable=performance --suppress=unusedFunction --suppress=preprocessorErrorDirective" --skip-packages
4 changes: 4 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ The (maximum) length of the client ID. (Keep in mind that this is a c-string. Yo

Only used on ESP32. Sets the stack size (in words) of the MQTT client worker task.

### EMC_MULTIPLE_CALLBACKS

This macro is by default not enabled so you can add a single callbacks to an event. Assigning a second will overwrite the existing callback. When enabling multiple callbacks, multiple callbacks (with uint32_t id) can be assigned. Removing is done by referencing the id.

### EMC_USE_WATCHDOG 0

(ESP32 only)
Expand Down
1 change: 1 addition & 0 deletions platformio.ini
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ build_flags =
--coverage
-D EMC_RX_BUFFER_SIZE=100
-D EMC_TX_BUFFER_SIZE=10
-D EMC_MULTIPLE_CALLBACKS=1
;extra_scripts = test-coverage.py
build_type = debug
4 changes: 4 additions & 0 deletions src/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ the LICENSE file.
#define EMC_TASK_STACK_SIZE 5120
#endif

#ifndef EMC_MULTIPLE_CALLBACKS
#define EMC_MULTIPLE_CALLBACKS 0
#endif

#ifndef EMC_USE_WATCHDOG
#define EMC_USE_WATCHDOG 0
#endif
143 changes: 136 additions & 7 deletions src/MqttClientSetup.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ the LICENSE file.

#pragma once

#if EMC_MULTIPLE_CALLBACKS
#include <list>
#include <utility>
#endif

#include "MqttClient.h"

template <typename T>
Expand Down Expand Up @@ -73,35 +78,127 @@ class MqttClientSetup : public MqttClient {
return static_cast<T&>(*this);
}

T& onConnect(espMqttClientTypes::OnConnectCallback callback) {
T& onConnect(espMqttClientTypes::OnConnectCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onConnectCallbacks.emplace_back(callback, id);
#else
(void) id;
_onConnectCallback = callback;
#endif
return static_cast<T&>(*this);
}

T& onDisconnect(espMqttClientTypes::OnDisconnectCallback callback) {
T& onDisconnect(espMqttClientTypes::OnDisconnectCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onDisconnectCallbacks.emplace_back(callback, id);
#else
(void) id;
_onDisconnectCallback = callback;
#endif
return static_cast<T&>(*this);
}

T& onSubscribe(espMqttClientTypes::OnSubscribeCallback callback) {
T& onSubscribe(espMqttClientTypes::OnSubscribeCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onSubscribeCallbacks.emplace_back(callback, id);
#else
(void) id;
_onSubscribeCallback = callback;
#endif
return static_cast<T&>(*this);
}

T& onUnsubscribe(espMqttClientTypes::OnUnsubscribeCallback callback) {
T& onUnsubscribe(espMqttClientTypes::OnUnsubscribeCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onUnsubscribeCallbacks.emplace_back(callback, id);
#else
(void) id;
_onUnsubscribeCallback = callback;
#endif
return static_cast<T&>(*this);
}

T& onMessage(espMqttClientTypes::OnMessageCallback callback) {
T& onMessage(espMqttClientTypes::OnMessageCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onMessageCallbacks.emplace_back(callback, id);
#else
(void) id;
_onMessageCallback = callback;
#endif
return static_cast<T&>(*this);
}

T& onPublish(espMqttClientTypes::OnPublishCallback callback) {
T& onPublish(espMqttClientTypes::OnPublishCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onPublishCallbacks.emplace_back(callback, id);
#else
(void) id;
_onPublishCallback = callback;
#endif
return static_cast<T&>(*this);
}

#if EMC_MULTIPLE_CALLBACKS
T& removeOnConnect(uint32_t id) {
for (auto it = _onConnectCallbacks.begin(); it != _onConnectCallbacks.end(); ++it) {
if (it->second == id) {
_onConnectCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}

T& removeOnDisconnect(uint32_t id) {
for (auto it = _onDisconnectCallbacks.begin(); it != _onDisconnectCallbacks.end(); ++it) {
if (it->second == id) {
_onDisconnectCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}

T& removeOnSubscribe(uint32_t id) {
for (auto it = _onSubscribeCallbacks.begin(); it != _onSubscribeCallbacks.end(); ++it) {
if (it->second == id) {
_onSubscribeCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}

T& removeOnUnsubscribe(uint32_t id) {
for (auto it = _onUnsubscribeCallbacks.begin(); it != _onUnsubscribeCallbacks.end(); ++it) {
if (it->second == id) {
_onUnsubscribeCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}

T& removeOnMessage(uint32_t id) {
for (auto it = _onMessageCallbacks.begin(); it != _onMessageCallbacks.end(); ++it) {
if (it->second == id) {
_onMessageCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}

T& removeOnPublish(uint32_t id) {
for (auto it = _onPublishCallbacks.begin(); it != _onPublishCallbacks.end(); ++it) {
if (it->second == id) {
_onPublishCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}
#endif

/*
T& onError(espMqttClientTypes::OnErrorCallback callback) {
Expand All @@ -112,5 +209,37 @@ class MqttClientSetup : public MqttClient {

protected:
explicit MqttClientSetup(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority = 1, uint8_t core = 1)
: MqttClient(useInternalTask, priority, core) {}
: MqttClient(useInternalTask, priority, core) {
#if EMC_MULTIPLE_CALLBACKS
_onConnectCallback = [this](bool sessionPresent) {
for (auto callback : _onConnectCallbacks) if (callback.first) callback.first(sessionPresent);
};
_onDisconnectCallback = [this](espMqttClientTypes::DisconnectReason reason) {
for (auto callback : _onDisconnectCallbacks) if (callback.first) callback.first(reason);
};
_onSubscribeCallback = [this](uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* returncodes, size_t len) {
for (auto callback : _onSubscribeCallbacks) if (callback.first) callback.first(packetId, returncodes, len);
};
_onUnsubscribeCallback = [this](int16_t packetId) {
for (auto callback : _onUnsubscribeCallbacks) if (callback.first) callback.first(packetId);
};
_onMessageCallback = [this](const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) {
for (auto callback : _onMessageCallbacks) if (callback.first) callback.first(properties, topic, payload, len, index, total);
};
_onPublishCallback = [this](uint16_t packetId) {
for (auto callback : _onPublishCallbacks) if (callback.first) callback.first(packetId);
};
#else
// empty
#endif
}

#if EMC_MULTIPLE_CALLBACKS
std::list<std::pair<espMqttClientTypes::OnConnectCallback, uint32_t>> _onConnectCallbacks;
std::list<std::pair<espMqttClientTypes::OnDisconnectCallback, uint32_t>> _onDisconnectCallbacks;
std::list<std::pair<espMqttClientTypes::OnSubscribeCallback, uint32_t>> _onSubscribeCallbacks;
std::list<std::pair<espMqttClientTypes::OnUnsubscribeCallback, uint32_t>> _onUnsubscribeCallbacks;
std::list<std::pair<espMqttClientTypes::OnMessageCallback, uint32_t>> _onMessageCallbacks;
std::list<std::pair<espMqttClientTypes::OnPublishCallback, uint32_t>> _onPublishCallbacks;
#endif
};
Loading

0 comments on commit a4a390f

Please sign in to comment.