Skip to content

Commit

Permalink
chore: revert changes made to mqtt telegraf plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
blegesse-w committed Nov 1, 2024
1 parent 9e59f51 commit 113bff0
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cypress/e2e/cloud/deepLinks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ describe('Deep linking', () => {

cy.visit('/me/tasks')
cy.location('pathname').should('eq', `/orgs/${org.id}/tasks`)

cy.visit('/me/telegraf-mqtt')
cy.location('pathname').should(
'eq',
`/orgs/${org.id}/load-data/telegraf-plugins/mqtt_consumer`
)

cy.visit('/me/telegrafs')
cy.location('pathname').should(
Expand Down
71 changes: 71 additions & 0 deletions src/shared/constants/fluxFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4390,6 +4390,77 @@ export const FLUX_FUNCTIONS: FluxToolbarFunction[] = [
category: 'Transformations',
link: `https://docs.influxdata.com/influxdb/${DOCS_URL_VERSION}/reference/flux/stdlib/built-in/transformations/movingaverage/`,
},
{
name: 'mqtt.to',
args: [
{
name: 'broker',
desc: 'The MQTT broker connection string.',
type: 'String',
},
{
name: 'topic',
desc: 'The MQTT topic to send data to.',
type: 'String',
},
{
name: 'message',
desc: 'The message or payload to send to the MQTT broker. The default payload is an output table.',
type: 'String',
},
{
name: 'qos',
desc: 'The MQTT Quality of Service (QoS) level. Values range from 0-2. Default is 0.',
type: 'Integer',
},
{
name: 'clientid',
desc: 'The MQTT client ID.',
type: 'String',
},
{
name: 'username',
desc: 'The username to send to the MQTT broker.',
type: 'String',
},
{
name: 'password',
desc: 'The password to send to the MQTT broker.',
type: 'String',
},
{
name: 'name',
desc: 'The name for the MQTT message.',
type: 'String',
},
{
name: 'timeout',
desc: 'The MQTT connection timeout. Default is 1s.',
type: 'Duration',
},
{
name: 'timeColumn',
desc: 'The column to use as time values in the output line protocol. Default is `"_time"`.',
type: 'String',
},
{
name: 'tagColumns',
desc: 'The columns to use as tag sets in the output line protocol. Default is `[]`.',
type: 'Array of Strings',
},
{
name: 'valueColumns',
desc: 'The columns to use as field values in the output line protocol. Default is `["_value"]`.',
type: 'Array of Strings',
},
],
package: 'experimental/mqtt',
desc: 'Outputs data to an MQTT broker using MQTT protocol.',
example:
'mqtt.to(broker: "tcp://localhost:8883", topic: "example-topic", clientid: "exampleID", tagColumns: ["exampleTagKey"], valueColumns: ["_value"])',
category: 'Outputs',
link: `https://docs.influxdata.com/influxdb/${DOCS_URL_VERSION}/reference/flux/stdlib/experimental/mqtt/to/`,
},
{
name: 'now',
args: [],
Expand Down
1 change: 1 addition & 0 deletions src/utils/deepLinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const buildDeepLinkingMap = (orgId: string) => ({
'/me/setup-nodejs': `/orgs/${orgId}/new-user-setup/nodejs`,
'/me/setup-python': `/orgs/${orgId}/new-user-setup/python`,
'/me/tasks': `/orgs/${orgId}/tasks`,
'/me/telegraf-mqtt': `/orgs/${orgId}/load-data/telegraf-plugins/mqtt_consumer`,
'/me/telegrafs': `/orgs/${orgId}/load-data/telegrafs`,
'/me/templates': `/orgs/${orgId}/settings/templates`,
'/me/tokens': `/orgs/${orgId}/load-data/tokens`,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
## example: servers = ["tcp://localhost:1883"]
## servers = ["ssl://localhost:1883"]
## servers = ["ws://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]

## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]

## The message topic will be stored in a tag specified by this value. If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"

## QoS policy for messages
## 0 = at most once
## 1 = at least once
## 2 = exactly once
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
# qos = 0

## Connection timeout for initial connection in seconds
# connection_timeout = "30s"

## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000

## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identify
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false

## If unset, a random client ID will be generated.
# client_id = ""

## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""
# tags = ""
# fields = ""
## Value supported is int, float, unit
# [[inputs.mqtt_consumer.topic.types]]
# key = type

208 changes: 208 additions & 0 deletions src/writeData/components/telegrafPlugins/mqtt_consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
# MQTT Consumer Input Plugin

The [MQTT][mqtt] consumer plugin reads from the specified MQTT topics
and creates metrics using one of the supported [input data formats][].

## Configuration

```toml @sample.conf
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
## example: servers = ["tcp://localhost:1883"]
## servers = ["ssl://localhost:1883"]
## servers = ["ws://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]

## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]

## The message topic will be stored in a tag specified by this value. If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"

## QoS policy for messages
## 0 = at most once
## 1 = at least once
## 2 = exactly once
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
# qos = 0

## Connection timeout for initial connection in seconds
# connection_timeout = "30s"

## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000

## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identify
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false

## If unset, a random client ID will be generated.
# client_id = ""

## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""
# tags = ""
# fields = ""
## Value supported is int, float, unit
# [[inputs.mqtt_consumer.topic.types]]
# key = type
```

## Example Output

```text
mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=45i 1653579140440951943
mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=100i 1653579153147395661
```

## About Topic Parsing

The MQTT topic as a whole is stored as a tag, but this can be far too coarse to
be easily used when utilizing the data further down the line. This change allows
tag values to be extracted from the MQTT topic letting you store the information
provided in the topic in a meaningful way. An `_` denotes an ignored entry in
the topic path. Please see the following example.

### Topic Parsing Example

```toml
[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
## example: servers = ["tcp://localhost:1883"]
## servers = ["ssl://localhost:1883"]
## servers = ["ws://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]

## Topics that will be subscribed to.
topics = [
"telegraf/+/cpu/23",
]

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "value"
data_type = "float"

[[inputs.mqtt_consumer.topic_parsing]]
topic = "telegraf/one/cpu/23"
measurement = "_/_/measurement/_"
tags = "tag/_/_/_"
fields = "_/_/_/test"
[inputs.mqtt_consumer.topic_parsing.types]
test = "int"
```

Will result in the following metric:

```text
cpu,host=pop-os,tag=telegraf,topic=telegraf/one/cpu/23 value=45,test=23i 1637014942460689291
```

## Field Pivoting Example

You can use the pivot processor to rotate single
valued metrics into a multi field metric.
For more info check out the pivot processors
[here][1].

For this example these are the topics:

```text
/sensors/CLE/v1/device5/temp
/sensors/CLE/v1/device5/rpm
/sensors/CLE/v1/device5/ph
/sensors/CLE/v1/device5/spin
```

And these are the metrics:

```text
sensors,site=CLE,version=v1,device_name=device5,field=temp value=390
sensors,site=CLE,version=v1,device_name=device5,field=rpm value=45.0
sensors,site=CLE,version=v1,device_name=device5,field=ph value=1.45
```

Using pivot in the config will rotate the metrics into a multi field metric.
The config:

```toml
[[inputs.mqtt_consumer]]
....
topics = "/sensors/#"
[[inputs.mqtt_consumer.topic_parsing]]
measurement = "/measurement/_/_/_/_"
tags = "/_/site/version/device_name/field"
[[processors.pivot]]
tag_key = "field"
value_key = "value"
```

Will result in the following metric:

```text
sensors,site=CLE,version=v1,device_name=device5 temp=390,rpm=45.0,ph=1.45
```

[1]: <https://github.com/influxdata/telegraf/tree/master/plugins/processors/pivot> "Pivot Processor"

## Metrics

- All measurements are tagged with the incoming topic, ie
`topic=telegraf/host01/cpu`

- example when [[inputs.mqtt_consumer.topic_parsing]] is set

- when [[inputs.internal]] is set:
- payload_size (int): get the cumulative size in bytes that have been received from incoming messages
- messages_received (int): count of the number of messages that have been received from mqtt

This will result in the following metric:

```text
internal_mqtt_consumer host=pop-os version=1.24.0 messages_received=622i payload_size=37942i 1657282270000000000
```

[mqtt]: https://mqtt.org
[input data formats]: /docs/DATA_FORMATS_INPUT.md
Loading

0 comments on commit 113bff0

Please sign in to comment.