From c91fec11ab4fba2a3157d933508edd4fec791838 Mon Sep 17 00:00:00 2001 From: Thomas Meyerdirks Date: Mon, 18 Sep 2023 07:09:33 +0200 Subject: [PATCH 1/2] add higher keep alive time to prevent connection refused --- lib/AgrirouterClient/inc/Definitions.h | 2 ++ lib/AgrirouterClient/inc/Settings.h | 3 +++ .../src/ConnectionProvider/MqttConnectionClient.cpp | 7 ++++++- lib/AgrirouterClient/src/Settings.cpp | 10 ++++++++++ 4 files changed, 21 insertions(+), 1 deletion(-) diff --git a/lib/AgrirouterClient/inc/Definitions.h b/lib/AgrirouterClient/inc/Definitions.h index dcca17d..9e2a75e 100644 --- a/lib/AgrirouterClient/inc/Definitions.h +++ b/lib/AgrirouterClient/inc/Definitions.h @@ -55,6 +55,7 @@ #define MG_PARAMETER_CONNECTION_TYPE (MG_PARAMETER_BASE + 17) #define MG_PARAMETER_POLLING_INTERVAL (MG_PARAMETER_BASE + 18) #define MG_PARAMETER_POLLING_MAX_TIME (MG_PARAMETER_BASE + 19) +#define MG_PARAMETER_MQTT_KEEP_ALIVE_TIME (MG_PARAMETER_BASE + 20) #define MG_EV_BASE 200 #define MG_EV_CAPABILITIES (MG_EV_BASE + CAPABILITIES) @@ -93,6 +94,7 @@ // Other definitions #define DEFAULT_CHUNK_SIZE 300000 // 0,3 MB +#define DEFAULT_KEEP_ALIVE_TIME 240 // 240 s // Protobuf typedefs typedef agrirouter::request::RequestEnvelope RequestEnvelope; diff --git a/lib/AgrirouterClient/inc/Settings.h b/lib/AgrirouterClient/inc/Settings.h index 2f02a90..ee94f87 100644 --- a/lib/AgrirouterClient/inc/Settings.h +++ b/lib/AgrirouterClient/inc/Settings.h @@ -94,6 +94,8 @@ class Settings int getPollingInterval(); void setPollingMaxTime(int pollingMaxTime); int getPollingMaxTime(); + void setMqttKeepAliveTime(int keepAliveTime); + int getMqttKeepAliveTime(); private: onParameterChangeCallback m_onParameter; @@ -128,6 +130,7 @@ class Settings // For general purposes int m_pollingInterval = 0; int m_pollingMaxTime = 0; + int m_mqttKeepAliveTime = 0; }; #endif // LIB_AGRIROUTERCLIENT_INC_SETTINGS_H_ diff --git a/lib/AgrirouterClient/src/ConnectionProvider/MqttConnectionClient.cpp b/lib/AgrirouterClient/src/ConnectionProvider/MqttConnectionClient.cpp index 3496343..73b442d 100644 --- a/lib/AgrirouterClient/src/ConnectionProvider/MqttConnectionClient.cpp +++ b/lib/AgrirouterClient/src/ConnectionProvider/MqttConnectionClient.cpp @@ -83,8 +83,13 @@ int MqttConnectionClient::init() (m_mqttErrorCallback) (MG_ERROR_MISSING_OR_EXPIRED_CERTIFICATE, "MqttConnectionClient: MQTT TLS Failed", errorJSON, m_member); return EXIT_FAILURE; } + int keepAliveTime = m_settings->getMqttKeepAliveTime(); + if(keepAliveTime == 0) + { + keepAliveTime = DEFAULT_KEEP_ALIVE_TIME; + } - int connect = mosquitto_connect_async(m_mosq, m_host.c_str(), m_port, 20); + int connect = mosquitto_connect_async(m_mosq, m_host.c_str(), m_port, keepAliveTime); if(connect == MOSQ_ERR_SUCCESS) { m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: connect set successful - " + std::to_string(connect) + ": " + mosquitto_strerror(connect)); diff --git a/lib/AgrirouterClient/src/Settings.cpp b/lib/AgrirouterClient/src/Settings.cpp index 90f4726..681fb53 100644 --- a/lib/AgrirouterClient/src/Settings.cpp +++ b/lib/AgrirouterClient/src/Settings.cpp @@ -312,3 +312,13 @@ void Settings::setPollingMaxTime(int pollingMaxTime) } int Settings::getPollingMaxTime() { return m_pollingMaxTime; } + +void Settings::setMqttKeepAliveTime(int keepAliveTime) +{ + m_mqttKeepAliveTime = keepAliveTime; + m_onParameter(MG_PARAMETER_MQTT_KEEP_ALIVE_TIME, + static_cast(&keepAliveTime), + m_callbackCallee); +} + +int Settings::getMqttKeepAliveTime() { return m_mqttKeepAliveTime; } From 520709dcee76dcde08f35959b3d16041b90babc0 Mon Sep 17 00:00:00 2001 From: Thomas Meyerdirks <75115970+ThomasMeyerdirks@users.noreply.github.com> Date: Thu, 18 Jan 2024 13:43:57 +0100 Subject: [PATCH 2/2] MQTT - segmentation fault on init mqtt conncetion with an old mosquitto version (#14) * segmentation fault init mqtt with an old mosquitto version * use private methods * first loop and then connect to connect on broker that missing on startup * revert change loop and connect - no connection in this case --- .../inc/MqttConnectionClient.h | 7 +++ .../MqttConnectionClient.cpp | 48 +++++++++++-------- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/lib/AgrirouterClient/inc/MqttConnectionClient.h b/lib/AgrirouterClient/inc/MqttConnectionClient.h index f86e7f9..7f0e910 100644 --- a/lib/AgrirouterClient/inc/MqttConnectionClient.h +++ b/lib/AgrirouterClient/inc/MqttConnectionClient.h @@ -5,6 +5,8 @@ #include "Settings.h" #include "third_party/mosquitto/mosquitto.h" +#include + class MqttConnectionClient { public: @@ -53,6 +55,11 @@ class MqttConnectionClient { static void subscribeCallback(struct mosquitto *mosq, void *obj, int messageId, int qosCount, const int *grantedQos); static void unsubscribeCallback(struct mosquitto *mosq, void *obj, int messageId); static void messageCallback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message); + + static std::string globalSecret; + static std::mutex mutexSecret; + static std::string getStaticSecret(); + static void setStaticSecret(std::string secret); }; #endif // LIB_AGRIROUTERCLIENT_INC_MQTTCONNECTIONCLIENT_H_ diff --git a/lib/AgrirouterClient/src/ConnectionProvider/MqttConnectionClient.cpp b/lib/AgrirouterClient/src/ConnectionProvider/MqttConnectionClient.cpp index 73b442d..6ba079f 100644 --- a/lib/AgrirouterClient/src/ConnectionProvider/MqttConnectionClient.cpp +++ b/lib/AgrirouterClient/src/ConnectionProvider/MqttConnectionClient.cpp @@ -2,12 +2,16 @@ #include "third_party/mosquitto/mosquitto_internal.h" +std::string MqttConnectionClient::globalSecret = ""; +std::mutex MqttConnectionClient::mutexSecret; + MqttConnectionClient::MqttConnectionClient(const std::string& clientId, const std::string& host, int port, Settings *settings) { m_clientId = clientId; m_port = port; m_host = host; m_settings = settings; + setStaticSecret(m_settings->getConnectionParameters().secret); } MqttConnectionClient::~MqttConnectionClient() @@ -27,11 +31,6 @@ int MqttConnectionClient::init() if (m_mosq != nullptr) { - // set this to mosq for the pw_callback - // the function mosquitto_user_data_set(..) not working for this callback - // the direct set to struct, not the best solution but it works - m_mosq->userdata = this; - mosquitto_connect_callback_set(m_mosq, connectCallback); mosquitto_disconnect_callback_set(m_mosq, disconnectCallback); mosquitto_publish_callback_set(m_mosq, publishCallback); @@ -114,7 +113,7 @@ int MqttConnectionClient::init() (m_mqttErrorCallback) (loop, errorMessage, "", m_member); return EXIT_FAILURE; } - + if ((connect == MOSQ_ERR_SUCCESS) && (loop == MOSQ_ERR_SUCCESS)) { return EXIT_SUCCESS; @@ -153,11 +152,7 @@ void* MqttConnectionClient::getMember() { return m_member; } int MqttConnectionClient::onPWCallback(char *buf, int size, int rwflag, void *userdata) { - struct mosquitto *mosq = static_cast(userdata); - MqttConnectionClient *self = static_cast(mosq->userdata); - std::string secret = self->m_settings->getConnectionParameters().secret.c_str(); - - strncpy(buf, secret.c_str(), size); + strncpy(buf, getStaticSecret().c_str(), size); buf[size-1] = '\0'; return strlen(buf); } @@ -178,7 +173,6 @@ void MqttConnectionClient::connectCallback(struct mosquitto *mosq, void *obj, in self->m_settings->callOnLog(MG_LFL_ERR, errorMessage); (self->m_mqttErrorCallback) (reasonCode, errorMessage, "", self->m_member); } - } void MqttConnectionClient::disconnectCallback(struct mosquitto *mosq, void *obj, int reasonCode) @@ -187,13 +181,13 @@ void MqttConnectionClient::disconnectCallback(struct mosquitto *mosq, void *obj, self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: disconnect callback with result: '" + std::to_string(reasonCode) + ":" + mosquitto_strerror(reasonCode) + "'"); self->m_connected = false; - // reasonCode 0 disconnect is called by client, so no reconnect on destruct mqtt client + // reasonCode 0 disconnect is called by client, so no reconnect on destruct mqtt client if(reasonCode > 0) { std::string errorMessage = "MqttConnectionClient: disconnect unexpected " + std::to_string(reasonCode) + ": " + mosquitto_connack_string(reasonCode); self->m_settings->callOnLog(MG_LFL_ERR, errorMessage); (self->m_mqttErrorCallback) (reasonCode, errorMessage, "", self->m_member); - + // try to reconnect int reconn = mosquitto_reconnect(self->m_mosq); @@ -219,7 +213,7 @@ void MqttConnectionClient::publishCallback(struct mosquitto *mosq, void *obj, in void MqttConnectionClient::loggingCallback(struct mosquitto *mosq, void *obj, int level, const char *message) { MqttConnectionClient *self = static_cast(obj); - self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: logging callback with message: '" + std::string(message) + + self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: logging callback with message: '" + std::string(message) + "' and Log level: '" + std::to_string(level) + "'"); if(std::string(message).find("tls_process_server_certificate:certificate verify failed") != std::string::npos) @@ -243,7 +237,7 @@ void MqttConnectionClient::subscribeCallback(struct mosquitto *mosq, void *obj, } else { - std::string errorMessage = "MqttConnectionClient: subscribe callback count " + std::to_string(i) + " failed with " + std::to_string(grantedQos[i]); + std::string errorMessage = "MqttConnectionClient: subscribe callback count " + std::to_string(i) + " failed with " + std::to_string(grantedQos[i]); self->m_settings->callOnLog(MG_LFL_ERR, errorMessage); (self->m_mqttErrorCallback) (grantedQos[i], errorMessage, "", self->m_member); } @@ -259,7 +253,7 @@ void MqttConnectionClient::unsubscribeCallback(struct mosquitto *mosq, void *obj void MqttConnectionClient::messageCallback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { MqttConnectionClient *self = static_cast(obj); - self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(message->mid) + "] messageCallback on topic " + + self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(message->mid) + "] messageCallback on topic " + message->topic + " with qos " + std::to_string(message->qos)); (self->m_mqttCallback)(message->topic, message->payload, message->payloadlen, self->m_member); @@ -267,7 +261,7 @@ void MqttConnectionClient::messageCallback(struct mosquitto *mosq, void *obj, co void MqttConnectionClient::subscribe(const std::string& topic, int qos) { - m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] subscribing on topic " + + m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] subscribing on topic " + topic.c_str() + " with qos " + std::to_string(qos)); mosquitto_subscribe(m_mosq, &(m_messageId), topic.c_str(), qos); @@ -276,7 +270,7 @@ void MqttConnectionClient::subscribe(const std::string& topic, int qos) void MqttConnectionClient::publish(const std::string& topic, const std::string& payload, int qos) { - m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " + + m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " + topic.c_str() + " with qos " + std::to_string(qos) + " and payload-length " + std::to_string(payload.length())); mosquitto_publish(m_mosq, &(m_messageId), topic.c_str(), strlen(payload.c_str()), payload.c_str(), qos, 0); @@ -285,7 +279,7 @@ void MqttConnectionClient::publish(const std::string& topic, const std::string& void MqttConnectionClient::publish(const std::string& topic, char *payload, int size, int qos) { - m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " + + m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " + topic.c_str() + " with qos " + std::to_string(qos) + " and size " + std::to_string(size)); mosquitto_publish(m_mosq, &(m_messageId), topic.c_str(), size, payload, qos, false); @@ -294,7 +288,7 @@ void MqttConnectionClient::publish(const std::string& topic, char *payload, int void MqttConnectionClient::publish(const std::string& topic, char *payload, int size, int qos, bool retain) { - m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " + + m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " + topic.c_str() + " with qos " + std::to_string(qos) + " and size " + std::to_string(size) + " and retain " + std::to_string(retain)); mosquitto_publish(m_mosq, &(m_messageId), topic.c_str(), size, payload, qos, retain); @@ -305,3 +299,15 @@ bool MqttConnectionClient::isConnected() { return m_connected; } + +void MqttConnectionClient::setStaticSecret(std::string secret) +{ + std::lock_guard lock(mutexSecret); + globalSecret = secret; +} + +std::string MqttConnectionClient::getStaticSecret() +{ + std::lock_guard lock(mutexSecret); + return globalSecret; +}