From 92cd98f61852ac64652cdd787dd99422d3f0ed2d Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Thu, 8 Feb 2024 23:42:08 +0100 Subject: [PATCH] mqtt: add the topic name to $MQTT_TOPIC Signed-off-by: Balazs Scheidler --- modules/mqtt/source/mqtt-source.c | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/modules/mqtt/source/mqtt-source.c b/modules/mqtt/source/mqtt-source.c index 6956f4aa9d3..0b864837deb 100644 --- a/modules/mqtt/source/mqtt-source.c +++ b/modules/mqtt/source/mqtt-source.c @@ -26,6 +26,8 @@ #define RECEIVE_TIMEOUT 1000 +static NVHandle handle_mqtt_topic = 0; + void mqtt_sd_set_topic(LogDriver *s, const gchar *topic) { @@ -60,15 +62,6 @@ _format_stats_key(LogThreadedSourceDriver *s, StatsClusterKeyBuilder *kb) stats_cluster_key_builder_add_legacy_label(kb, stats_cluster_label("topic", self->topic)); } -static LogMessage * -_create_message(MQTTSourceDriver *self, const gchar *message, gint length) -{ - LogMessage *msg = log_msg_new_empty(); - log_msg_set_value(msg, LM_V_MESSAGE, message, length); - log_msg_set_value_to_string(msg, LM_V_TRANSPORT, "mqtt"); - return msg; -} - static gboolean _client_init(MQTTSourceDriver *self) { @@ -197,7 +190,11 @@ _fetch(LogThreadedFetcherDriver *s) if (result == THREADED_FETCH_SUCCESS) { - msg = _create_message(self, (gchar *)message->payload, message->payloadlen); + LogMessage *msg = log_msg_new_empty(); + log_msg_set_value(msg, LM_V_MESSAGE, message, length); + log_msg_set_value(msg, handle_mqtt_topic, topicName, topicLen); + log_msg_set_value_to_string(msg, LM_V_TRANSPORT, "mqtt"); + MQTTClient_freeMessage(&message); MQTTClient_free(topicName); } @@ -221,6 +218,7 @@ _init(LogPipe *s) { MQTTSourceDriver *self = (MQTTSourceDriver *)s; + handle_mqtt_topic = log_msg_get_value_handle("MQTT_TOPIC"); if (!self->topic) { msg_error("mqtt: the topic() argument is required for mqtt source",