Skip to content

Commit

Permalink
mqtt: add the topic name to $MQTT_TOPIC
Browse files Browse the repository at this point in the history
Signed-off-by: Balazs Scheidler <[email protected]>
  • Loading branch information
bazsi committed Feb 8, 2024
1 parent 4cb39e1 commit 92cd98f
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions modules/mqtt/source/mqtt-source.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#define RECEIVE_TIMEOUT 1000

static NVHandle handle_mqtt_topic = 0;

void
mqtt_sd_set_topic(LogDriver *s, const gchar *topic)
{
Expand Down Expand Up @@ -60,15 +62,6 @@ _format_stats_key(LogThreadedSourceDriver *s, StatsClusterKeyBuilder *kb)
stats_cluster_key_builder_add_legacy_label(kb, stats_cluster_label("topic", self->topic));
}

static LogMessage *
_create_message(MQTTSourceDriver *self, const gchar *message, gint length)
{
LogMessage *msg = log_msg_new_empty();
log_msg_set_value(msg, LM_V_MESSAGE, message, length);
log_msg_set_value_to_string(msg, LM_V_TRANSPORT, "mqtt");
return msg;
}

static gboolean
_client_init(MQTTSourceDriver *self)
{
Expand Down Expand Up @@ -197,7 +190,11 @@ _fetch(LogThreadedFetcherDriver *s)

if (result == THREADED_FETCH_SUCCESS)
{
msg = _create_message(self, (gchar *)message->payload, message->payloadlen);
LogMessage *msg = log_msg_new_empty();
log_msg_set_value(msg, LM_V_MESSAGE, message, length);
log_msg_set_value(msg, handle_mqtt_topic, topicName, topicLen);
log_msg_set_value_to_string(msg, LM_V_TRANSPORT, "mqtt");

MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
}
Expand All @@ -221,6 +218,7 @@ _init(LogPipe *s)
{
MQTTSourceDriver *self = (MQTTSourceDriver *)s;

handle_mqtt_topic = log_msg_get_value_handle("MQTT_TOPIC");
if (!self->topic)
{
msg_error("mqtt: the topic() argument is required for mqtt source",
Expand Down

0 comments on commit 92cd98f

Please sign in to comment.