- About this Project
- Installation
- How To Use
- Writes
- Queries
- Serializations
- InfluxDatabaseService for InfluxDB 1.x
- InfluxDatabaseService for InfluxDB 2.0
- Demo
- Contributing
- License
The current version of NiFi already includes the built-in data processor org.apache.nifi.processors.influxdb.PutInfluxDB (doc), which accepts flow files in InfluxDB’s Line Protocol format and stores the data in InfluxDB.
That processor is useful only for data generated by InfluxDB Telegraf or by other applications that produce InfluxDB’s Line Protocol directly. It cannot store structured data (json, avro, csv ...).
This motivated us to implement a new processor, org.influxdata.nifi.processors.PutInfluxDatabaseRecord, which is based on the NiFi Record Design. It writes any NiFi Record structured data into InfluxDB.
The processor works like NiFi's other built-in Record based Put*Record processors (PutDatabaseRecord, PutHBase, PutMongoRecord, ...).
We also support InfluxDB 2.0 with several new processors:
- Writes
- Queries
To install the InfluxDB Processors you will need to copy the appropriate nar file into the lib directory of your NiFi installation ($NIFI_HOME/lib) and restart NiFi.
The Nar compatibility matrix:
For example, to install the nar after downloading it to ~/Downloads:
$ cp ~/Downloads/nifi-influx-database-nar-1.1.nar $NIFI_HOME/lib
Uses a specified RecordReader to write the content of a FlowFile into an InfluxDB database.
- Input can be any built-in or custom NiFi RecordReader (json, avro, csv, InfluxLineProtocolReader ...)
- Configurable mapping between NiFi Records and InfluxDB measurement, fields and tags
- Configurable timestamp precision
- Reusable connection settings (InfluxDB url, password) for multiple processors via the InfluxDatabaseService controller
- Advanced InfluxDB client settings
  - Gzip compression
  - Batching, jitter, flush settings
The measurement is determined from a field in the Record Schema. If the field is not found in the schema, the value of the Measurement property is used instead. Any data type is converted to a String and used as the measurement name.
The name of a field in the Record Schema is used as the key of the Tag, and the value of the field is used as the value of the Tag. Any data type is converted to a String and used as the Tag value; see also handling complex types.
The timestamp is determined from a field in the Record Schema. If the field is not found in the schema, or the field has no defined value, no timestamp is specified for the Data Point. The precision of the supplied time value is determined by the Timestamp precision property.
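The mapping described above can be illustrated with a minimal Python sketch. It is not the processor's implementation: the function names, the choice to skip missing or null values, and the tag sorting are assumptions made for illustration, and only the escaping rules of InfluxDB Line Protocol are taken as given.

```python
def escape(text, chars):
    """Backslash-escape each character in `chars` (Line Protocol escaping)."""
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def format_field(value):
    """Render a field value: boolean, integer (i suffix), float, or quoted string."""
    if isinstance(value, bool):  # bool first, because bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    return '"' + escape(str(value), '\\"') + '"'


def to_line_protocol(record, measurement, tags, fields, timestamp_field=None):
    """Map one record (a dict) to a single Line Protocol line."""
    # Measurement: a record field named like the property wins, else the property value.
    name = escape(str(record.get(measurement, measurement)), ", ")
    # Tags: field name is the tag key, the stringified value is the tag value.
    # Assumption: missing or null tags are skipped; keys are sorted for stable output.
    tag_part = "".join(
        "," + escape(key, ", =") + "=" + escape(str(record[key]), ", =")
        for key in sorted(tags)
        if record.get(key) is not None
    )
    field_part = ",".join(
        escape(key, ", =") + "=" + format_field(record[key])
        for key in fields
        if record.get(key) is not None
    )
    if not field_part:
        raise ValueError("at least one field must be defined")
    line = f"{name}{tag_part} {field_part}"
    # Timestamp: omitted when the field is absent or has no value.
    if timestamp_field and record.get(timestamp_field) is not None:
        line += f" {int(record[timestamp_field])}"
    return line


record = {"lang": "en", "keyword": "truth", "retweet_count": 0,
          "screen_name": "TheeSeanH", "timestamp": 1550133996000}
line = to_line_protocol(record, "tweets", ["lang", "keyword"],
                        ["retweet_count", "screen_name"], "timestamp")
print(line)
# tweets,keyword=truth,lang=en retweet_count=0i,screen_name="TheeSeanH" 1550133996000
```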
Apache NiFi complex Record fields are handled by the following strategies:
- Map - keys are mapped as keys of Tags or Fields, values are mapped as values of Tags or Fields
- Choice - the compatible type from the Choice definition is used for the value
- Array - based on the Complex Field Behavior property
- Record - based on the Complex Field Behavior property
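As a rough illustration of the Map strategy, the following Python sketch expands map-valued record fields so that each map key becomes its own Tag or Field key. The function name `expand_map_fields` and the sample record are hypothetical; the other complex types depend on the Complex Field Behavior property and are not modelled here.

```python
def expand_map_fields(record, names):
    """Return the key/value pairs to write for the given record field names.

    A map-valued field contributes one entry per map key; any other
    non-null value is kept under its own field name.
    """
    expanded = {}
    for name in names:
        value = record.get(name)
        if isinstance(value, dict):
            for key, item in value.items():
                expanded[str(key)] = item
        elif value is not None:
            expanded[name] = value
    return expanded


sample = {"host": "a", "labels": {"dc": "eu", "rack": "r1"}}
tags = expand_map_fields(sample, ["host", "labels"])
print(tags)
```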
Enabling batching trades reliability for better performance. The PutInfluxDatabaseRecord processor uses the batching/buffering implemented in the influxdb-java client. The processor can route a flow file to the success relationship before the batch buffer is flushed into the database. The batch buffer is held in system memory, so in the case of a power failure or a killed process, the content of the buffer is not written into InfluxDB.
Batching is useful when the flow file contains a large number of records. Records are sent to InfluxDB in batches of a preconfigured size.
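The trade-off can be seen in a small Python sketch of a size-based batch buffer. This is a simplified model, not the influxdb-java implementation: the class name `BatchBuffer` and the `send` callback are hypothetical, and the time-based flush and jitter are left out.

```python
class BatchBuffer:
    """Collects points in memory and sends them once `batch_size` is reached."""

    def __init__(self, batch_size, send):
        self.batch_size = batch_size
        self.send = send          # callable that writes one batch to the database
        self.pending = []         # lives only in memory until flushed

    def add(self, point):
        self.pending.append(point)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.pending:
            self.send(list(self.pending))
            self.pending.clear()


written = []
buffer = BatchBuffer(batch_size=3, send=written.append)
for i in range(7):
    buffer.add(f"cpu value={i}i")

# Two full batches were sent; the seventh point is still only in memory
# and would be lost if the process were killed before the next flush.
print(len(written), buffer.pending)
```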
Property | Description |
---|---|
Record Reader | Specifies the Controller Service to use for parsing incoming data and determining the data's schema |
InfluxDB Controller Service | A controller service that provides connection to InfluxDB |
Database Name | InfluxDB database to connect to |
Enable gzip compression | Enable gzip compression for InfluxDB http request body |
Log Level | Controls the level of logging for the REST layer of InfluxDB client |
Consistency Level | InfluxDB consistency level |
Retention Policy | Retention policy for saving the records |
Enable InfluxDB batching | Enabling batching speeds up writes significantly, but at the cost of reliability. A flow file can be transferred to the success relationship before the batch buffer is flushed into the database. For additional information see the processor documentation. |
Batch flush duration | Flush at least every specified time |
Batch actions | The number of batch actions to collect |
Batch flush jitter | Jitters the batch flush interval by a random amount. |
Batch flush buffer limit | The client maintains a buffer for failed writes so that the writes will be retried later on. |
Measurement | The name of the measurement. If the Record contains a field named by the Measurement property value, the value of that Record field is used as the InfluxDB measurement |
Tags | A comma-separated list of record fields stored in InfluxDB as 'tag' |
Missing Tag Behavior | If the specified tag is not present in the document, this property specifies how to handle the situation. |
Fields | A comma-separated list of record fields stored in InfluxDB as 'field'. At least one field must be defined |
Missing Field Behavior | If the specified field is not present in the document, this property specifies how to handle the situation |
Timestamp field | The name of the record field that is used as the 'timestamp' |
Timestamp precision | The timestamp precision is ignored when the 'Timestamp field' value is a 'java.util.Date' |
Complex Field Behavior | Indicates how to handle complex fields, i.e. fields that do not have a primitive value |
Null Values Behavior | Indicates how to handle null fields, i.e. fields that do not have a defined value |
Max size of records | Maximum size of records allowed to be posted in one batch |
Property | Description |
---|---|
success | All FlowFiles that are written into InfluxDB are routed to this relationship |
retry | A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed. |
failure | All FlowFiles that cannot be written to InfluxDB are routed to this relationship |
Uses a specified RecordReader to write the content of a FlowFile into an InfluxDB 2.0 database.
- Input can be any built-in or custom NiFi RecordReader (json, avro, csv, InfluxLineProtocolReader ...)
- Configurable mapping between NiFi Records and InfluxDB measurement, fields and tags
- Configurable timestamp precision
- Reusable connection settings (InfluxDB url, password) for multiple processors via the InfluxDatabaseService_2 controller
- Advanced InfluxDB client settings
  - Gzip compression
The measurement is determined from a field in the Record Schema. If the field is not found in the schema, the value of the Measurement property is used instead. Any data type is converted to a String and used as the measurement name.
The name of a field in the Record Schema is used as the key of the Tag, and the value of the field is used as the value of the Tag. Any data type is converted to a String and used as the Tag value; see also handling complex types.
The timestamp is determined from a field in the Record Schema. If the field is not found in the schema, or the field has no defined value, no timestamp is specified for the Data Point. The precision of the supplied time value is determined by the Timestamp precision property.
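To make the role of the precision concrete, here is a hedged Python sketch that normalises a timestamp value to nanoseconds. The precision keys (`ns`, `us`, `ms`, `s`) and the function name `to_nanos` are assumptions for illustration and may not match the property's allowable values. A Python `datetime` stands in for `java.util.Date`, for which the precision is ignored.

```python
from datetime import datetime, timezone

# Hypothetical precision names mapped to nanoseconds per unit.
NANOS_PER_UNIT = {"ns": 1, "us": 1_000, "ms": 1_000_000, "s": 1_000_000_000}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_nanos(value, precision="ns"):
    """Convert a record timestamp to epoch nanoseconds, or None if undefined."""
    if value is None:
        return None  # no timestamp is specified for the data point
    if isinstance(value, datetime):
        # Date-like values carry their own unit, so `precision` is ignored.
        delta = value - EPOCH
        seconds = delta.days * 86_400 + delta.seconds
        return seconds * 1_000_000_000 + delta.microseconds * 1_000
    return int(value) * NANOS_PER_UNIT[precision]


print(to_nanos(1550133996000, "ms"))
# 1550133996000000000
```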
Apache NiFi complex Record fields are handled by the following strategies:
- Map - keys are mapped as keys of Tags or Fields, values are mapped as values of Tags or Fields
- Choice - the compatible type from the Choice definition is used for the value
- Array - based on the Complex Field Behavior property
- Record - based on the Complex Field Behavior property
Property | Description |
---|---|
Record Reader | Specifies the Controller Service to use for parsing incoming data and determining the data's schema |
InfluxDB Controller Service | A controller service that provides connection to InfluxDB |
Bucket | Specifies the destination bucket for writes |
Organization | Specifies the destination organization for writes |
Enable gzip compression | Enable gzip compression for InfluxDB http request body |
Log Level | Controls the level of logging for the REST layer of InfluxDB client |
Measurement | The name of the measurement. If the Record contains a field named by the Measurement property value, the value of that Record field is used as the InfluxDB measurement |
Tags | A comma-separated list of record fields stored in InfluxDB as 'tag' |
Missing Tag Behavior | If the specified tag is not present in the document, this property specifies how to handle the situation. |
Fields | A comma-separated list of record fields stored in InfluxDB as 'field'. At least one field must be defined |
Missing Field Behavior | If the specified field is not present in the document, this property specifies how to handle the situation |
Timestamp field | The name of the record field that is used as the 'timestamp' |
Timestamp precision | The timestamp precision is ignored when the 'Timestamp field' value is a 'java.util.Date' |
Complex Field Behavior | Indicates how to handle complex fields, i.e. fields that do not have a primitive value |
Null Values Behavior | Indicates how to handle null fields, i.e. fields that do not have a defined value |
Max size of records | Maximum size of records allowed to be posted in one batch |
Property | Description |
---|---|
success | All FlowFiles that are written into InfluxDB are routed to this relationship |
retry | A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed. |
failure | All FlowFiles that cannot be written to InfluxDB are routed to this relationship |
Parses InfluxDB Line Protocol into NiFi Records. This allows processing, filtering and partitioning in NiFi of data obtained from Telegraf agents, IoT devices, InfluxDB subscriptions and other sources of InfluxDB Line Protocol.
Property | Description |
---|---|
Character Set | The Character Encoding that is used to decode the Line Protocol data |
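What such a reader has to do can be sketched in Python. This is a simplified parser, not the InfluxLineProtocolReader itself: the layout of the resulting dictionary (`measurement`, `tags`, `fields`, `timestamp`) is an assumption, and the sketch expects a single space between sections and no double quotes inside tag values.

```python
import re


def split_unescaped(text, sep):
    """Split on `sep`, ignoring separators that are escaped or inside double quotes."""
    parts, current, in_quotes, i = [], [], False, 0
    while i < len(text):
        ch = text[i]
        if ch == "\\" and i + 1 < len(text):
            current.append(text[i:i + 2])  # keep the escape pair together
            i += 2
            continue
        if ch == '"':
            in_quotes = not in_quotes
        if ch == sep and not in_quotes:
            parts.append("".join(current))
            current = []
        else:
            current.append(ch)
        i += 1
    parts.append("".join(current))
    return parts


def unescape(text):
    """Remove the backslash in front of Line Protocol special characters."""
    return re.sub(r'\\([,= "\\])', r"\1", text)


def parse_field_value(raw):
    """Decode a field value: quoted string, boolean, integer (i suffix) or float."""
    if len(raw) >= 2 and raw.startswith('"') and raw.endswith('"'):
        return unescape(raw[1:-1])
    if raw in ("t", "T", "true", "True", "TRUE"):
        return True
    if raw in ("f", "F", "false", "False", "FALSE"):
        return False
    if raw.endswith("i"):
        return int(raw[:-1])
    return float(raw)


def parse_line(line):
    """Parse one Line Protocol line into a record-like dictionary."""
    head, field_part, *rest = split_unescaped(line.strip(), " ")
    name, *tag_pairs = split_unescaped(head, ",")
    tags = {}
    for pair in tag_pairs:
        key, value = split_unescaped(pair, "=")
        tags[unescape(key)] = unescape(value)
    fields = {}
    for pair in split_unescaped(field_part, ","):
        key, value = split_unescaped(pair, "=")
        fields[unescape(key)] = parse_field_value(value)
    return {
        "measurement": unescape(name),
        "tags": tags,
        "fields": fields,
        "timestamp": int(rest[0]) if rest else None,
    }


parsed = parse_line(
    'docker_container_status,container_name=nifi,host=0c79c2e451ca '
    'exitcode=0i,oomkilled=false,note="a b, c" 1550148042000000000'
)
print(parsed["tags"], parsed["fields"], parsed["timestamp"])
```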
Shares the connection configuration for InfluxDB 1.x among multiple NiFi processors. It also supports SSL connections.
Property | Description |
---|---|
SSL Context Service | The SSL Context Service used to provide client certificate information for TLS/SSL connections |
Client Auth | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. |
InfluxDB connection URL | InfluxDB URL to connect to. Eg: http://influxdb:8086 |
InfluxDB Max Connection Time Out | The maximum time for establishing connection to the InfluxDB |
Username | Username which is used to authorize against the InfluxDB |
Password | Password for the username which is used to authorize against the InfluxDB. If the authorization fails, the FlowFile will be penalized and routed to the 'retry' relationship. |
Shares the connection configuration for InfluxDB 2.0 among multiple NiFi processors. It also supports SSL connections.
Property | Description |
---|---|
SSL Context Service | The SSL Context Service used to provide client certificate information for TLS/SSL connections |
Client Auth | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. |
InfluxDB connection URL | InfluxDB URL to connect to. Eg: http://influxdb:8086 |
InfluxDB Max Connection Time Out | The maximum time for establishing connection to the InfluxDB |
InfluxDB Access Token | Access Token used for authenticating/authorizing the InfluxDB request sent by NiFi. |
Processor that writes the content of a FlowFile in 'line protocol' to InfluxDB. Please check the details of the 'line protocol' in the InfluxDB documentation (https://www.influxdb.com/). The flow file can contain a single measurement point or multiple measurement points separated by a line separator. The timestamp precision is defined by the Timestamp precisions property. If you do not specify a precision, InfluxDB assumes that timestamps are in nanoseconds.
Property | Description |
---|---|
Database Name | InfluxDB database to connect to |
InfluxDB connection URL | InfluxDB URL to connect to. Eg: http://influxdb:8086 |
InfluxDB Max Connection Time Out | The maximum time for establishing connection to the InfluxDB |
Username | Username for accessing InfluxDB |
Password | Password for user |
Character Set | Specifies the character set of the document data |
Consistency Level | InfluxDB consistency level |
Retention Policy | Retention policy for saving the records |
Timestamp precisions | The precision of the time stamps. InfluxDB assumes that timestamps are in nanoseconds if you do not specify precision |
Max size of records | Maximum size of records allowed to be posted in one batch |
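For orientation, the properties above correspond closely to the parameters of the InfluxDB 1.x HTTP `/write` endpoint (`db`, `rp`, `precision`, `consistency`). The Python sketch below only builds such a request without sending it. The helper name `build_write_request` is hypothetical, authentication is left out, and the processor itself goes through the InfluxDB Java client rather than code like this.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_write_request(base_url, database, lines, precision=None,
                        retention_policy=None, consistency=None,
                        charset="UTF-8"):
    """Build (but do not send) a POST to the InfluxDB 1.x /write endpoint."""
    params = {"db": database}
    if precision:
        params["precision"] = precision      # omitted: server assumes nanoseconds
    if retention_policy:
        params["rp"] = retention_policy
    if consistency:
        params["consistency"] = consistency
    url = f"{base_url.rstrip('/')}/write?{urlencode(params)}"
    # Multiple measurement points are separated by a newline.
    body = "\n".join(lines).encode(charset)
    return Request(url, data=body, method="POST")


request = build_write_request(
    "http://influxdb:8086", "twitter_demo",
    ["tweets,lang=en retweet_count=0i 1550133996000",
     "tweets,lang=nl retweet_count=0i 1550133998000"],
    precision="ms",
)
print(request.full_url)
# http://influxdb:8086/write?db=twitter_demo&precision=ms
```

Sending the request would be a matter of passing it to `urllib.request.urlopen`, which needs a reachable InfluxDB instance.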
Processor that writes the content of a FlowFile in 'line protocol' to InfluxDB 2.0. Please check the details of the 'line protocol' in the InfluxDB 2.0 documentation (https://www.influxdb.com/). The flow file can contain a single measurement point or multiple measurement points separated by a line separator. The timestamp precision is defined by the Timestamp precisions property.
Property | Description |
---|---|
InfluxDB Controller Service | A controller service that provides connection to InfluxDB |
Bucket | Specifies the destination bucket for writes |
Organization | Specifies the destination organization for writes |
Timestamp precisions | The precision of the time stamps |
Enable gzip compression | Enable gzip compression for InfluxDB http request body |
Log Level | Controls the level of logging for the REST layer of InfluxDB client |
Character Set | Specifies the character set of the document data |
Max size of records | Maximum size of records allowed to be posted in one batch |
Creates FlowFiles from records in InfluxDB 2.0 loaded by a user-specified Flux query.
Property | Description |
---|---|
InfluxDB Controller Service | A controller service that provides connection to InfluxDB |
Organization | Specifies the source organization |
Query | A valid Flux query to use to execute against InfluxDB |
Dialect Header | If true, the results will contain a header row |
Dialect Delimiter | Separator between cells; the default is "," |
Dialect Annotations | Describing properties about the columns of the table. More than one can be supplied if comma separated. Allowable values: "group", "datatype", "default". |
Dialect Comment Prefix | Character prefixed to comment strings. |
Dialect Date Time Format | Format of timestamps. |
Results Per FlowFile | How many records to put into a FlowFile at once. The whole body will be treated as a CSV file. |
Enable gzip compression | Enable gzip compression for InfluxDB http request body |
Log Level | Controls the level of logging for the REST layer of InfluxDB client |
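The Dialect properties mirror the `dialect` object of the InfluxDB 2.0 `/api/v2/query` endpoint. The following Python sketch shows how such a request could be assembled. The helper name `build_query_request`, the organization `my-org` and the token `my-token` are placeholders, and the request is only built, not sent.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request


def build_query_request(base_url, org, token, flux, header=True, delimiter=",",
                        annotations=(), comment_prefix="#",
                        date_time_format="RFC3339"):
    """Build (but do not send) a Flux query request with a CSV dialect."""
    body = {
        "query": flux,
        "type": "flux",
        "dialect": {
            "header": header,                    # Dialect Header
            "delimiter": delimiter,              # Dialect Delimiter
            "annotations": list(annotations),    # Dialect Annotations
            "commentPrefix": comment_prefix,     # Dialect Comment Prefix
            "dateTimeFormat": date_time_format,  # Dialect Date Time Format
        },
    }
    url = f"{base_url.rstrip('/')}/api/v2/query?{urlencode({'org': org})}"
    headers = {
        "Authorization": f"Token {token}",
        "Content-Type": "application/json",
        "Accept": "application/csv",
    }
    return Request(url, data=json.dumps(body).encode("utf-8"),
                   headers=headers, method="POST")


query_request = build_query_request(
    "http://influxdb:8086", "my-org", "my-token",
    'from(bucket: "my-bucket") |> range(start: 0) |> limit(n: 1)',
    annotations=("group", "datatype"),
)
print(query_request.full_url)
# http://influxdb:8086/api/v2/query?org=my-org
```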
A record-based version of GetInfluxDatabase_2 that uses the Record writers to write the Flux result set.
Property | Description |
---|---|
InfluxDB Controller Service | A controller service that provides connection to InfluxDB |
Record Writer | The record writer to use to write the result sets |
Organization | Specifies the source organization |
Query | A valid Flux query to use to execute against InfluxDB |
Dialect Date Time Format | Format of timestamps. |
Results Per FlowFile | How many records to put into a FlowFile at once. The whole body will be treated as a set of Records. |
Enable gzip compression | Enable gzip compression for InfluxDB http request body |
Log Level | Controls the level of logging for the REST layer of InfluxDB client |
Writes the contents of a RecordSet as Line Protocol. The configured writer builds the Line Protocol using Expression Language that references the fields available in a Record. Records in the RecordSet are separated by a single newline character. The Record Schema is read from the incoming FlowFile.
Property | Description |
---|---|
Measurement | The name of the measurement. If the Record contains a field named by the Measurement property value, the value of that Record field is used as the InfluxDB measurement |
Tags | A comma-separated list of record fields stored in InfluxDB as 'tag' |
Missing Tag Behavior | If the specified tag is not present in the document, this property specifies how to handle the situation |
Fields | A comma-separated list of record fields stored in InfluxDB as 'field'. At least one field must be defined |
Missing Field Behavior | If the specified field is not present in the document, this property specifies how to handle the situation |
Timestamp field | The name of the record field that is used as the 'timestamp' |
Timestamp precision | The timestamp precision is ignored when the 'Timestamp field' value is a 'java.util.Date' |
Complex Field Behavior | Indicates how to handle complex fields, i.e. fields that do not have a primitive value |
Null Values Behavior | Indicates how to handle null fields, i.e. fields that do not have a defined value |
Character Set | The Character Encoding that is used to encode/decode the Line Protocol |
The demo requires Docker Engine, GNU gzip and curl to be available on the PATH.
- Download and unpack sources: download ZIP
- Run start script from the source directory:
./scripts/nifi-restart.sh
- Open Apache NiFi flow in browser: http://localhost:8080/nifi/
- Open Telegraf Dashboards in browser: Twitter, NiFi Container or NiFi Logs
As a NiFi user, we want to put data (a complex JSON structure) into InfluxDB in order to work with time series.
The demo reads data from Twitter in a complex JSON format based on supplied keywords and writes it into InfluxDB.
Data from Twitter is streamed into NiFi using the built-in org.apache.nifi.processors.twitter.GetTwitter processor.
Select the Twitter Filter Endpoint API, set the auth keys and tokens, and fill in the keywords to search for in the Terms to Filter On field. To access the Twitter API you need authorization keys, which can be obtained from Twitter Apps.
Note that the credentials embedded in the demo may not work in a shared environment; it is better to generate new ones for testing.
First we need to configure a new controller service called TwitterJSONReader that maps Tweet JSON into NiFi Records.
The record schema is specified in the Schema Text field. In this demo we use the following Apache Avro schema:
{
"type": "record",
"name": "twitter_schema",
"namespace": "io.bonitoo.nifi",
"doc": "Avro schema for Tweets",
"fields": [
{ "name": "id", "type": "long" },
{ "name": "text", "type": "string" },
{ "name": "lang", "type": "string" },
{ "name": "keyword", "type": "string" },
{ "name": "retweet_count", "type": "int" },
{ "name": "tweet_id", "type": "string" },
{ "name": "followers_count", "type": "int" },
{ "name": "screen_name", "type": "string" },
{ "name": "friends_count", "type": "int" },
{ "name": "favourites_count", "type": "int" },
{ "name": "user_verified", "type": "boolean" },
{ "name": "timestamp", "type" :
{ "type" : "long", "logicalType" : "timestamp-millis" }
}
]
}
The mapping between NiFi Record fields and JSON is configured in dynamic properties.
screen_name -> $.user.screen_name
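The effect of such a dynamic property can be sketched in Python with a tiny resolver for simple dotted paths. This is an illustration only: NiFi evaluates full JSONPath expressions, whereas the hypothetical `resolve_path` below understands just `$.a.b` style paths. Apart from `screen_name`, the mapping entries and the sample tweet are invented for the example.

```python
import json


def resolve_path(document, path):
    """Resolve a simple '$.a.b' path; return None when a step is missing."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    node = document
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


# Record field name -> path into the tweet JSON.
mapping = {
    "screen_name": "$.user.screen_name",
    "followers_count": "$.user.followers_count",  # illustrative entry
    "text": "$.text",                             # illustrative entry
}

tweet = json.loads(
    '{"text": "hello", "user": {"screen_name": "TheeSeanH", "followers_count": 304}}'
)
tweet_record = {field: resolve_path(tweet, path) for field, path in mapping.items()}
print(tweet_record)
```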
Next we set the mapping between the NiFi Record and the InfluxDB measurement, tags, fields and timestamp.
- Measurement - tweets
- Fields - record field values: tweet_id, retweet_count, followers_count, friends_count, favourites_count, screen_name, text
- Tags - record field values: lang, keyword, user_verified
- Timestamp - record field value: timestamp
InfluxDB has a database twitter_demo with the measurement tweets and the following schema:
SHOW TAG KEYS ON twitter_demo FROM tweets
name: tweets
tagKey
------
keyword
lang
user_verified
SHOW FIELD KEYS ON twitter_demo
name: tweets
fieldKey fieldType
-------- ---------
favourites_count integer
followers_count integer
friends_count integer
retweet_count integer
screen_name string
text string
tweet_id string
select * from tweets
name: tweets
time favourites_count followers_count friends_count keyword lang retweet_count screen_name text
---- ---------------- --------------- ------------- ------- ---- ------------- --------------- ----
1550133996000000000 1651 304 699 truth en 0 TheeSeanH ...
1550133997000000000 10 12 66 en 0 black_vadik ...
1550133998000000000 0 22 41 BITMEX nl 0 100btcP ...
1550133998000000000 24078 1025 4894 en 0 SolarCoinNews ...
1550133999000000000 12406 474 761 en 0 Airdrop_BOMBER ...
...
This example shows how to process structured metrics from Telegraf in NiFi.
Telegraf sends metrics to NiFi using the SocketWriter output plugin.
Metrics are sent as InfluxDB’s Line Protocol.
NiFi parses the Line Protocol with org.influxdata.nifi.serialization.InfluxLineProtocolReader and lets the user process the data with Record processors (SplitRecord, UpdateRecord, ValidateRecord, ...).
Metrics from monitored Docker containers are filtered in NiFi: NiFi container metrics are stored in InfluxDB, and metrics from other containers are logged.
- ListenTelegraf - Listens for incoming TCP connections and transforms incoming Line Protocol into NiFi Records
- PartitionRecord - Groups incoming records by container name
- RouteOnAttribute - Routes incoming container metrics: NiFi container metrics are routed to PutInfluxDatabaseRecord, other metrics to LogAttribute
- PutInfluxDatabaseRecord - Writes NiFi container metrics to InfluxDB
- PutInfluxDatabaseRecord_2 - Writes NiFi container metrics to InfluxDB 2.0
- LogAttribute - Logs metrics that aren't written to InfluxDB
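The partition-then-route logic of this flow can be mimicked in a few lines of Python. It is a conceptual sketch, not NiFi code: the helper names, the `container_name` key and the sample records are assumptions chosen to match the demo's description.

```python
from collections import defaultdict


def partition_by(records, key):
    """Group records by the value of `key` (the role of PartitionRecord)."""
    groups = defaultdict(list)
    for record in records:
        groups[record.get(key)].append(record)
    return dict(groups)


def route(groups, target="nifi"):
    """Send the target container's records to InfluxDB and log the rest."""
    to_influx = groups.get(target, [])
    to_log = [record for name, items in groups.items()
              if name != target for record in items]
    return to_influx, to_log


metrics = [
    {"container_name": "nifi", "usage_percent": 12.5},
    {"container_name": "telegraf", "usage_percent": 0.4},
    {"container_name": "nifi", "usage_percent": 13.1},
]
to_influx, to_log = route(partition_by(metrics, "container_name"))
print(len(to_influx), len(to_log))
```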
The InfluxDB has a database telegraf_nifi_demo with measurements:
show measurements
name: measurements
name
----
docker_container_blkio
docker_container_cpu
docker_container_mem
docker_container_net
docker_container_status
For example, the docker_container_status measurement contains:
select * from docker_container_status
name: docker_container_status
time container_image container_name container_status container_version engine_host exitcode host maintainer oomkilled pid server_version site started_at
---- --------------- -------------- ---------------- ----------------- ----------- -------- ---- ---------- --------- --- -------------- ---- ----------
1550148042000000000 nifi nifi running unknown linuxkit-025000000001 0 0c79c2e451ca Apache NiFi <dev@nifi.apache.org> false 43685 18.09.1 https://nifi.apache.org 1550147980248481800
1550148052000000000 nifi nifi running unknown linuxkit-025000000001 0 0c79c2e451ca Apache NiFi <dev@nifi.apache.org> false 43685 18.09.1 https://nifi.apache.org 1550147980248481800
1550148062000000000 nifi nifi running unknown linuxkit-025000000001 0 0c79c2e451ca Apache NiFi <dev@nifi.apache.org> false 43685 18.09.1 https://nifi.apache.org 1550147980248481800
1550148072000000000 nifi nifi running unknown linuxkit-025000000001 0 0c79c2e451ca Apache NiFi <dev@nifi.apache.org> false 43685 18.09.1 https://nifi.apache.org 1550147980248481800
1550148082000000000 nifi nifi running unknown linuxkit-025000000001 0 0c79c2e451ca Apache NiFi <dev@nifi.apache.org> false 43685 18.09.1 https://nifi.apache.org 1550147980248481800
...
This example shows how to store NiFi Records as Line Protocol in multiple environments: InfluxDB, InfluxDB 2.0 and Kafka.
This example shows how to expose InfluxDB 2.0 data through NiFi.
The processor is configured to invoke a static Flux query:
from(bucket: "my-bucket")
|> range(start: 0)
|> filter(fn: (r) => r._measurement == "tweets")
|> drop(columns: ["keyword", "lang", "user_verified"])
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> limit(n:1)
|> keep(columns: ["tweet_id", "screen_name", "text"])
The result is mapped to CSV and returned as the response to the incoming HTTP request.
The processor invokes a Flux query that is passed as an HTTP query parameter:
curl -i -X GET -G http://localhost:8234 \
--data-urlencode 'accept=xml' \
--data-urlencode 'query=from(bucket: "my-bucket")
|> range(start: 0) |> filter(fn: (r) => r._measurement == "docker_container_status")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> limit(n:10, offset: 0)'
The result is mapped to the format specified in the request's accept parameter.
If you would like to contribute code, you can do so through GitHub by forking the repository and sending a pull request to the master branch.
InfluxDB Processors For Apache NiFi are released under the Apache License, Version 2.0.