diff --git a/cypress/e2e/cloud/deepLinks.test.ts b/cypress/e2e/cloud/deepLinks.test.ts index a7191d0878..f17db59f95 100644 --- a/cypress/e2e/cloud/deepLinks.test.ts +++ b/cypress/e2e/cloud/deepLinks.test.ts @@ -132,6 +132,12 @@ describe('Deep linking', () => { cy.visit('/me/tasks') cy.location('pathname').should('eq', `/orgs/${org.id}/tasks`) + cy.visit('/me/telegraf-mqtt') + cy.location('pathname').should( + 'eq', + `/orgs/${org.id}/load-data/telegraf-plugins/mqtt_consumer` + ) + cy.visit('/me/telegrafs') cy.location('pathname').should( 'eq', diff --git a/src/shared/constants/fluxFunctions.ts b/src/shared/constants/fluxFunctions.ts index 112571ce56..c80a363cba 100644 --- a/src/shared/constants/fluxFunctions.ts +++ b/src/shared/constants/fluxFunctions.ts @@ -4390,6 +4390,77 @@ export const FLUX_FUNCTIONS: FluxToolbarFunction[] = [ category: 'Transformations', link: `https://docs.influxdata.com/influxdb/${DOCS_URL_VERSION}/reference/flux/stdlib/built-in/transformations/movingaverage/`, }, + { + name: 'mqtt.to', + args: [ + { + name: 'broker', + desc: 'The MQTT broker connection string.', + type: 'String', + }, + { + name: 'topic', + desc: 'The MQTT topic to send data to.', + type: 'String', + }, + { + name: 'message', + desc: 'The message or payload to send to the MQTT broker. The default payload is an output table.', + type: 'String', + }, + { + name: 'qos', + desc: 'The MQTT Quality of Service (QoS) level. Values range from 0-2. Default is 0.', + type: 'Integer', + }, + { + name: 'clientid', + desc: 'The MQTT client ID.', + type: 'String', + }, + { + name: 'username', + desc: 'The username to send to the MQTT broker.', + type: 'String', + }, + { + name: 'password', + desc: 'The password to send to the MQTT broker.', + type: 'String', + }, + { + name: 'name', + desc: 'The name for the MQTT message.', + type: 'String', + }, + { + name: 'timeout', + desc: 'The MQTT connection timeout. Default is 1s.', + type: 'Duration', + }, + { + name: 'timeColumn', + desc: 'The column to use as time values in the output line protocol. Default is `"_time"`.', + type: 'String', + }, + { + name: 'tagColumns', + desc: 'The columns to use as tag sets in the output line protocol. Default is `[]`.', + type: 'Array of Strings', + }, + { + name: 'valueColumns', + desc: 'The columns to use as field values in the output line protocol. Default is `["_value"]`.', + type: 'Array of Strings', + }, + ], + package: 'experimental/mqtt', + desc: 'Outputs data to an MQTT broker using MQTT protocol.', + example: + 'mqtt.to(broker: "tcp://localhost:8883", topic: "example-topic", clientid: "exampleID", tagColumns: ["exampleTagKey"], valueColumns: ["_value"])', + category: 'Outputs', + link: `https://docs.influxdata.com/influxdb/${DOCS_URL_VERSION}/reference/flux/stdlib/experimental/mqtt/to/`, + }, { name: 'now', args: [], diff --git a/src/utils/deepLinks.ts b/src/utils/deepLinks.ts index 7983a46fd8..efc87cb3ce 100644 --- a/src/utils/deepLinks.ts +++ b/src/utils/deepLinks.ts @@ -29,6 +29,7 @@ export const buildDeepLinkingMap = (orgId: string) => ({ '/me/setup-nodejs': `/orgs/${orgId}/new-user-setup/nodejs`, '/me/setup-python': `/orgs/${orgId}/new-user-setup/python`, '/me/tasks': `/orgs/${orgId}/tasks`, + '/me/telegraf-mqtt': `/orgs/${orgId}/load-data/telegraf-plugins/mqtt_consumer`, '/me/telegrafs': `/orgs/${orgId}/load-data/telegrafs`, '/me/templates': `/orgs/${orgId}/settings/templates`, '/me/tokens': `/orgs/${orgId}/load-data/tokens`, diff --git a/src/writeData/components/telegrafInputPluginsConfigurationText/mqtt_consumer.conf b/src/writeData/components/telegrafInputPluginsConfigurationText/mqtt_consumer.conf new file mode 100644 index 0000000000..65b78d623e --- /dev/null +++ b/src/writeData/components/telegrafInputPluginsConfigurationText/mqtt_consumer.conf @@ -0,0 +1,79 @@ +# Read metrics from MQTT topic(s) +[[inputs.mqtt_consumer]] + ## Broker URLs for the MQTT server or cluster. To connect to multiple + ## clusters or standalone servers, use a separate plugin instance. + ## example: servers = ["tcp://localhost:1883"] + ## servers = ["ssl://localhost:1883"] + ## servers = ["ws://localhost:1883"] + servers = ["tcp://127.0.0.1:1883"] + + ## Topics that will be subscribed to. + topics = [ + "telegraf/host01/cpu", + "telegraf/+/mem", + "sensors/#", + ] + + ## The message topic will be stored in a tag specified by this value. If set + ## to the empty string no topic tag will be created. + # topic_tag = "topic" + + ## QoS policy for messages + ## 0 = at most once + ## 1 = at least once + ## 2 = exactly once + ## + ## When using a QoS of 1 or 2, you should enable persistent_session to allow + ## resuming unacknowledged messages. + # qos = 0 + + ## Connection timeout for initial connection in seconds + # connection_timeout = "30s" + + ## Maximum messages to read from the broker that have not been written by an + ## output. For best throughput set based on the number of metrics within + ## each message and the size of the output's metric_batch_size. + ## + ## For example, if each message from the queue contains 10 metrics and the + ## output metric_batch_size is 1000, setting this to 100 will ensure that a + ## full batch is collected and the write is triggered immediately without + ## waiting until the next flush_interval. + # max_undelivered_messages = 1000 + + ## Persistent session disables clearing of the client session on connection. + ## In order for this option to work you must also set client_id to identify + ## the client. To receive messages that arrived while the client is offline, + ## also set the qos option to 1 or 2 and don't forget to also set the QoS when + ## publishing. + # persistent_session = false + + ## If unset, a random client ID will be generated. + # client_id = "" + + ## Username and password to connect MQTT server. + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" + + ## Enable extracting tag values from MQTT topics + ## _ denotes an ignored entry in the topic path + # [[inputs.mqtt_consumer.topic_parsing]] + # topic = "" + # measurement = "" + # tags = "" + # fields = "" + ## Value supported is int, float, unit + # [[inputs.mqtt_consumer.topic.types]] + # key = type diff --git a/src/writeData/components/telegrafPlugins/mqtt_consumer.md b/src/writeData/components/telegrafPlugins/mqtt_consumer.md new file mode 100644 index 0000000000..d4633e8541 --- /dev/null +++ b/src/writeData/components/telegrafPlugins/mqtt_consumer.md @@ -0,0 +1,208 @@ +# MQTT Consumer Input Plugin + +The [MQTT][mqtt] consumer plugin reads from the specified MQTT topics +and creates metrics using one of the supported [input data formats][]. + +## Configuration + +```toml @sample.conf +# Read metrics from MQTT topic(s) +[[inputs.mqtt_consumer]] + ## Broker URLs for the MQTT server or cluster. To connect to multiple + ## clusters or standalone servers, use a separate plugin instance. + ## example: servers = ["tcp://localhost:1883"] + ## servers = ["ssl://localhost:1883"] + ## servers = ["ws://localhost:1883"] + servers = ["tcp://127.0.0.1:1883"] + + ## Topics that will be subscribed to. + topics = [ + "telegraf/host01/cpu", + "telegraf/+/mem", + "sensors/#", + ] + + ## The message topic will be stored in a tag specified by this value. If set + ## to the empty string no topic tag will be created. + # topic_tag = "topic" + + ## QoS policy for messages + ## 0 = at most once + ## 1 = at least once + ## 2 = exactly once + ## + ## When using a QoS of 1 or 2, you should enable persistent_session to allow + ## resuming unacknowledged messages. + # qos = 0 + + ## Connection timeout for initial connection in seconds + # connection_timeout = "30s" + + ## Maximum messages to read from the broker that have not been written by an + ## output. For best throughput set based on the number of metrics within + ## each message and the size of the output's metric_batch_size. + ## + ## For example, if each message from the queue contains 10 metrics and the + ## output metric_batch_size is 1000, setting this to 100 will ensure that a + ## full batch is collected and the write is triggered immediately without + ## waiting until the next flush_interval. + # max_undelivered_messages = 1000 + + ## Persistent session disables clearing of the client session on connection. + ## In order for this option to work you must also set client_id to identify + ## the client. To receive messages that arrived while the client is offline, + ## also set the qos option to 1 or 2 and don't forget to also set the QoS when + ## publishing. + # persistent_session = false + + ## If unset, a random client ID will be generated. + # client_id = "" + + ## Username and password to connect MQTT server. + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" + + ## Enable extracting tag values from MQTT topics + ## _ denotes an ignored entry in the topic path + # [[inputs.mqtt_consumer.topic_parsing]] + # topic = "" + # measurement = "" + # tags = "" + # fields = "" + ## Value supported is int, float, unit + # [[inputs.mqtt_consumer.topic.types]] + # key = type +``` + +## Example Output + +```text +mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=45i 1653579140440951943 +mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=100i 1653579153147395661 +``` + +## About Topic Parsing + +The MQTT topic as a whole is stored as a tag, but this can be far too coarse to +be easily used when utilizing the data further down the line. This change allows +tag values to be extracted from the MQTT topic letting you store the information +provided in the topic in a meaningful way. An `_` denotes an ignored entry in +the topic path. Please see the following example. + +### Topic Parsing Example + +```toml +[[inputs.mqtt_consumer]] + ## Broker URLs for the MQTT server or cluster. To connect to multiple + ## clusters or standalone servers, use a separate plugin instance. + ## example: servers = ["tcp://localhost:1883"] + ## servers = ["ssl://localhost:1883"] + ## servers = ["ws://localhost:1883"] + servers = ["tcp://127.0.0.1:1883"] + + ## Topics that will be subscribed to. + topics = [ + "telegraf/+/cpu/23", + ] + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "value" + data_type = "float" + + [[inputs.mqtt_consumer.topic_parsing]] + topic = "telegraf/one/cpu/23" + measurement = "_/_/measurement/_" + tags = "tag/_/_/_" + fields = "_/_/_/test" + [inputs.mqtt_consumer.topic_parsing.types] + test = "int" +``` + +Will result in the following metric: + +```text +cpu,host=pop-os,tag=telegraf,topic=telegraf/one/cpu/23 value=45,test=23i 1637014942460689291 +``` + +## Field Pivoting Example + +You can use the pivot processor to rotate single +valued metrics into a multi field metric. +For more info check out the pivot processors +[here][1]. + +For this example these are the topics: + +```text +/sensors/CLE/v1/device5/temp +/sensors/CLE/v1/device5/rpm +/sensors/CLE/v1/device5/ph +/sensors/CLE/v1/device5/spin +``` + +And these are the metrics: + +```text +sensors,site=CLE,version=v1,device_name=device5,field=temp value=390 +sensors,site=CLE,version=v1,device_name=device5,field=rpm value=45.0 +sensors,site=CLE,version=v1,device_name=device5,field=ph value=1.45 +``` + +Using pivot in the config will rotate the metrics into a multi field metric. +The config: + +```toml +[[inputs.mqtt_consumer]] + .... + topics = "/sensors/#" + [[inputs.mqtt_consumer.topic_parsing]] + measurement = "/measurement/_/_/_/_" + tags = "/_/site/version/device_name/field" +[[processors.pivot]] + tag_key = "field" + value_key = "value" +``` + +Will result in the following metric: + +```text +sensors,site=CLE,version=v1,device_name=device5 temp=390,rpm=45.0,ph=1.45 +``` + +[1]: "Pivot Processor" + +## Metrics + +- All measurements are tagged with the incoming topic, ie +`topic=telegraf/host01/cpu` + +- example when [[inputs.mqtt_consumer.topic_parsing]] is set + +- when [[inputs.internal]] is set: + - payload_size (int): get the cumulative size in bytes that have been received from incoming messages + - messages_received (int): count of the number of messages that have been received from mqtt + +This will result in the following metric: + +```text +internal_mqtt_consumer host=pop-os version=1.24.0 messages_received=622i payload_size=37942i 1657282270000000000 +``` + +[mqtt]: https://mqtt.org +[input data formats]: /docs/DATA_FORMATS_INPUT.md diff --git a/src/writeData/constants/contentTelegrafPlugins.ts b/src/writeData/constants/contentTelegrafPlugins.ts index 819f703b9b..d42b2146e7 100644 --- a/src/writeData/constants/contentTelegrafPlugins.ts +++ b/src/writeData/constants/contentTelegrafPlugins.ts @@ -116,6 +116,7 @@ import mockMarkdown from 'src/writeData/components/telegrafPlugins/mock.md' import modbusMarkdown from 'src/writeData/components/telegrafPlugins/modbus.md' import mongodbMarkdown from 'src/writeData/components/telegrafPlugins/mongodb.md' import monitMarkdown from 'src/writeData/components/telegrafPlugins/monit.md' +import mqtt_consumerMarkdown from 'src/writeData/components/telegrafPlugins/mqtt_consumer.md' import multifileMarkdown from 'src/writeData/components/telegrafPlugins/multifile.md' import mysqlMarkdown from 'src/writeData/components/telegrafPlugins/mysql.md' import natsMarkdown from 'src/writeData/components/telegrafPlugins/nats.md' @@ -299,6 +300,7 @@ import modbusLogo from 'src/writeData/graphics/modbus.svg' import mongodbLogo from 'src/writeData/graphics/mongodb.svg' import monitLogo from 'src/writeData/graphics/monit.svg' import monitor_with_workerLogo from 'src/writeData/graphics/monitor_with_worker.svg' +import mqtt_consumerLogo from 'src/writeData/graphics/mqtt_consumer.svg' import multifileLogo from 'src/writeData/graphics/multifile.svg' import mysqlLogo from 'src/writeData/graphics/mysql.svg' import natsLogo from 'src/writeData/graphics/nats.svg' @@ -1075,6 +1077,12 @@ export const WRITE_DATA_TELEGRAF_PLUGINS: TelegrafPluginAssets[] = [ markdown: monitMarkdown, image: monitLogo, }, + { + id: 'mqtt_consumer', + name: 'MQTT Consumer', + markdown: mqtt_consumerMarkdown, + image: mqtt_consumerLogo, + }, { id: 'multifile', name: 'Multifile', diff --git a/src/writeData/utils/updateTelegrafPlugins.mjs b/src/writeData/utils/updateTelegrafPlugins.mjs index 09c3598453..7b15a3105d 100644 --- a/src/writeData/utils/updateTelegrafPlugins.mjs +++ b/src/writeData/utils/updateTelegrafPlugins.mjs @@ -126,6 +126,7 @@ const inputPluginsList = [ 'modbus', 'mongodb', 'monit', + 'mqtt_consumer', 'multifile', 'mysql', 'nats',