Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT - Fix Interrupt in connection on request files #13

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/AgrirouterClient/inc/Definitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#define MG_PARAMETER_CONNECTION_TYPE (MG_PARAMETER_BASE + 17)
#define MG_PARAMETER_POLLING_INTERVAL (MG_PARAMETER_BASE + 18)
#define MG_PARAMETER_POLLING_MAX_TIME (MG_PARAMETER_BASE + 19)
#define MG_PARAMETER_MQTT_KEEP_ALIVE_TIME (MG_PARAMETER_BASE + 20)

#define MG_EV_BASE 200
#define MG_EV_CAPABILITIES (MG_EV_BASE + CAPABILITIES)
Expand Down Expand Up @@ -93,6 +94,7 @@

// Other definitions
#define DEFAULT_CHUNK_SIZE 300000 // 0,3 MB
#define DEFAULT_KEEP_ALIVE_TIME 240 // 240 s

// Protobuf typedefs
typedef agrirouter::request::RequestEnvelope RequestEnvelope;
Expand Down
7 changes: 7 additions & 0 deletions lib/AgrirouterClient/inc/MqttConnectionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "Settings.h"
#include "third_party/mosquitto/mosquitto.h"

#include <mutex>

class MqttConnectionClient {

public:
Expand Down Expand Up @@ -53,6 +55,11 @@ class MqttConnectionClient {
static void subscribeCallback(struct mosquitto *mosq, void *obj, int messageId, int qosCount, const int *grantedQos);
static void unsubscribeCallback(struct mosquitto *mosq, void *obj, int messageId);
static void messageCallback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message);

static std::string globalSecret;
static std::mutex mutexSecret;
static std::string getStaticSecret();
static void setStaticSecret(std::string secret);
};

#endif // LIB_AGRIROUTERCLIENT_INC_MQTTCONNECTIONCLIENT_H_
3 changes: 3 additions & 0 deletions lib/AgrirouterClient/inc/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class Settings
int getPollingInterval();
void setPollingMaxTime(int pollingMaxTime);
int getPollingMaxTime();
void setMqttKeepAliveTime(int keepAliveTime);
int getMqttKeepAliveTime();

private:
onParameterChangeCallback m_onParameter;
Expand Down Expand Up @@ -128,6 +130,7 @@ class Settings
// For general purposes
int m_pollingInterval = 0;
int m_pollingMaxTime = 0;
int m_mqttKeepAliveTime = 0;
};

#endif // LIB_AGRIROUTERCLIENT_INC_SETTINGS_H_
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

#include "third_party/mosquitto/mosquitto_internal.h"

std::string MqttConnectionClient::globalSecret = "";
std::mutex MqttConnectionClient::mutexSecret;

MqttConnectionClient::MqttConnectionClient(const std::string& clientId, const std::string& host, int port, Settings *settings)
{
m_clientId = clientId;
m_port = port;
m_host = host;
m_settings = settings;
setStaticSecret(m_settings->getConnectionParameters().secret);
}

MqttConnectionClient::~MqttConnectionClient()
Expand All @@ -27,11 +31,6 @@ int MqttConnectionClient::init()

if (m_mosq != nullptr)
{
// set this to mosq for the pw_callback
// the function mosquitto_user_data_set(..) not working for this callback
// the direct set to struct, not the best solution but it works
m_mosq->userdata = this;

mosquitto_connect_callback_set(m_mosq, connectCallback);
mosquitto_disconnect_callback_set(m_mosq, disconnectCallback);
mosquitto_publish_callback_set(m_mosq, publishCallback);
Expand Down Expand Up @@ -83,8 +82,13 @@ int MqttConnectionClient::init()
(m_mqttErrorCallback) (MG_ERROR_MISSING_OR_EXPIRED_CERTIFICATE, "MqttConnectionClient: MQTT TLS Failed", errorJSON, m_member);
return EXIT_FAILURE;
}
int keepAliveTime = m_settings->getMqttKeepAliveTime();
if(keepAliveTime == 0)
{
keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
}

int connect = mosquitto_connect_async(m_mosq, m_host.c_str(), m_port, 20);
int connect = mosquitto_connect_async(m_mosq, m_host.c_str(), m_port, keepAliveTime);
if(connect == MOSQ_ERR_SUCCESS)
{
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: connect set successful - " + std::to_string(connect) + ": " + mosquitto_strerror(connect));
Expand All @@ -109,7 +113,7 @@ int MqttConnectionClient::init()
(m_mqttErrorCallback) (loop, errorMessage, "", m_member);
return EXIT_FAILURE;
}

if ((connect == MOSQ_ERR_SUCCESS) && (loop == MOSQ_ERR_SUCCESS))
{
return EXIT_SUCCESS;
Expand Down Expand Up @@ -148,11 +152,7 @@ void* MqttConnectionClient::getMember() { return m_member; }

int MqttConnectionClient::onPWCallback(char *buf, int size, int rwflag, void *userdata)
{
struct mosquitto *mosq = static_cast<struct mosquitto *>(userdata);
MqttConnectionClient *self = static_cast<MqttConnectionClient *>(mosq->userdata);
std::string secret = self->m_settings->getConnectionParameters().secret.c_str();

strncpy(buf, secret.c_str(), size);
strncpy(buf, getStaticSecret().c_str(), size);
buf[size-1] = '\0';
return strlen(buf);
}
Expand All @@ -173,7 +173,6 @@ void MqttConnectionClient::connectCallback(struct mosquitto *mosq, void *obj, in
self->m_settings->callOnLog(MG_LFL_ERR, errorMessage);
(self->m_mqttErrorCallback) (reasonCode, errorMessage, "", self->m_member);
}

}

void MqttConnectionClient::disconnectCallback(struct mosquitto *mosq, void *obj, int reasonCode)
Expand All @@ -182,13 +181,13 @@ void MqttConnectionClient::disconnectCallback(struct mosquitto *mosq, void *obj,
self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: disconnect callback with result: '" + std::to_string(reasonCode) + ":" + mosquitto_strerror(reasonCode) + "'");
self->m_connected = false;

// reasonCode 0 disconnect is called by client, so no reconnect on destruct mqtt client
// reasonCode 0 disconnect is called by client, so no reconnect on destruct mqtt client
if(reasonCode > 0)
{
std::string errorMessage = "MqttConnectionClient: disconnect unexpected " + std::to_string(reasonCode) + ": " + mosquitto_connack_string(reasonCode);
self->m_settings->callOnLog(MG_LFL_ERR, errorMessage);
(self->m_mqttErrorCallback) (reasonCode, errorMessage, "", self->m_member);


// try to reconnect
int reconn = mosquitto_reconnect(self->m_mosq);
Expand All @@ -214,7 +213,7 @@ void MqttConnectionClient::publishCallback(struct mosquitto *mosq, void *obj, in
void MqttConnectionClient::loggingCallback(struct mosquitto *mosq, void *obj, int level, const char *message)
{
MqttConnectionClient *self = static_cast<MqttConnectionClient *>(obj);
self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: logging callback with message: '" + std::string(message) +
self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: logging callback with message: '" + std::string(message) +
"' and Log level: '" + std::to_string(level) + "'");

if(std::string(message).find("tls_process_server_certificate:certificate verify failed") != std::string::npos)
Expand All @@ -238,7 +237,7 @@ void MqttConnectionClient::subscribeCallback(struct mosquitto *mosq, void *obj,
}
else
{
std::string errorMessage = "MqttConnectionClient: subscribe callback count " + std::to_string(i) + " failed with " + std::to_string(grantedQos[i]);
std::string errorMessage = "MqttConnectionClient: subscribe callback count " + std::to_string(i) + " failed with " + std::to_string(grantedQos[i]);
self->m_settings->callOnLog(MG_LFL_ERR, errorMessage);
(self->m_mqttErrorCallback) (grantedQos[i], errorMessage, "", self->m_member);
}
Expand All @@ -254,15 +253,15 @@ void MqttConnectionClient::unsubscribeCallback(struct mosquitto *mosq, void *obj
void MqttConnectionClient::messageCallback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
MqttConnectionClient *self = static_cast<MqttConnectionClient *>(obj);
self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(message->mid) + "] messageCallback on topic " +
self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(message->mid) + "] messageCallback on topic " +
message->topic + " with qos " + std::to_string(message->qos));

(self->m_mqttCallback)(message->topic, message->payload, message->payloadlen, self->m_member);
}

void MqttConnectionClient::subscribe(const std::string& topic, int qos)
{
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] subscribing on topic " +
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] subscribing on topic " +
topic.c_str() + " with qos " + std::to_string(qos));

mosquitto_subscribe(m_mosq, &(m_messageId), topic.c_str(), qos);
Expand All @@ -271,7 +270,7 @@ void MqttConnectionClient::subscribe(const std::string& topic, int qos)

void MqttConnectionClient::publish(const std::string& topic, const std::string& payload, int qos)
{
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
topic.c_str() + " with qos " + std::to_string(qos) + " and payload-length " + std::to_string(payload.length()));

mosquitto_publish(m_mosq, &(m_messageId), topic.c_str(), strlen(payload.c_str()), payload.c_str(), qos, 0);
Expand All @@ -280,7 +279,7 @@ void MqttConnectionClient::publish(const std::string& topic, const std::string&

void MqttConnectionClient::publish(const std::string& topic, char *payload, int size, int qos)
{
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
topic.c_str() + " with qos " + std::to_string(qos) + " and size " + std::to_string(size));

mosquitto_publish(m_mosq, &(m_messageId), topic.c_str(), size, payload, qos, false);
Expand All @@ -289,7 +288,7 @@ void MqttConnectionClient::publish(const std::string& topic, char *payload, int

void MqttConnectionClient::publish(const std::string& topic, char *payload, int size, int qos, bool retain)
{
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
topic.c_str() + " with qos " + std::to_string(qos) + " and size " + std::to_string(size) + " and retain " + std::to_string(retain));

mosquitto_publish(m_mosq, &(m_messageId), topic.c_str(), size, payload, qos, retain);
Expand All @@ -300,3 +299,15 @@ bool MqttConnectionClient::isConnected()
{
return m_connected;
}

void MqttConnectionClient::setStaticSecret(std::string secret)
{
std::lock_guard<std::mutex> lock(mutexSecret);
globalSecret = secret;
}

std::string MqttConnectionClient::getStaticSecret()
{
std::lock_guard<std::mutex> lock(mutexSecret);
return globalSecret;
}
10 changes: 10 additions & 0 deletions lib/AgrirouterClient/src/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,13 @@ void Settings::setPollingMaxTime(int pollingMaxTime)
}

int Settings::getPollingMaxTime() { return m_pollingMaxTime; }

void Settings::setMqttKeepAliveTime(int keepAliveTime)
{
m_mqttKeepAliveTime = keepAliveTime;
m_onParameter(MG_PARAMETER_MQTT_KEEP_ALIVE_TIME,
static_cast<void *>(&keepAliveTime),
m_callbackCallee);
}

int Settings::getMqttKeepAliveTime() { return m_mqttKeepAliveTime; }