DeviceHive Kinesis Streaming plugin written in Node.js
This plugin allows you to stream your time series data to Kinesis Data Streams and Data Firehose as it goes through DeviceHive.
-
Start DeviceHive. Detailed instructions could be found here.
-
Set up and configure your Kinesis streams.
-
Create .env file with following configs and specify type of the destination stream: direct_put or kinesis_stream. Plugin authentication process and options are described here. To start plugin you also have to specify plugin topic, DeviceHive REST or Websocket API endpoints, AWS region, access and secret keys and kinesis stream names.
ENVSEPARATOR=_ plugin_plugin_access_token=plugin.JWT.accessToken plugin_plugin_refresh_token=plugin.JWT.refreshToken plugin_user_access_token=your.JWT.accessToken plugin_user_refresh_token=your.JWT.refreshToken plugin_user_login=username plugin_user_password=password plugin_plugin_topic=plugin topic plugin_device_hive_plugin_ws_endpoint=ws://localhost:3001 plugin_device_hive_auth_service_api_url=http://localhost:8090/dh/rest kinesis_dataDestination=kinesis_stream kinesis_aws_region=region kinesis_aws_accessKeyId=access_key_id kinesis_aws_secretAccessKey=secret_access_key kinesis_custom_commandStreams=command_streams kinesis_custom_notificationStreams=notification_streams kinesis_custom_commandUpdatesStreams=command_updates_streams
-
Run
docker-compose up
. -
Issue notification through DeviceHive.
-
Observe data in your destination source.
- Start DeviceHive.
- Set up and configure your Kinesis streams.
- Configure your plugin and Kinesis connections inside plugin/plugin-config.json and kinesisConfig/config.json (see Configuration).
- Execute:
npm start
to run plugin.
plugin/plugin-config.json
— is the main config file for plugin part.
kinesisConfig/config.json
— is the main config file for AWS part.
Plugin part of configuration you can find here.
You can configure Kinesis connection in two ways:
- Share
./kinesisConfig
directory as volume with docker container; - Set configs through environment variables;
Config file example:
{
"dataDestination": "kinesis_stream",
"aws": {
"region": "",
"accessKeyId": "",
"secretAccessKey": ""
},
"custom": {
"buffering": true,
"bufferSize": 10,
"bufferTimeout": 1000,
"commandStreams": "stream-1, stream-2",
"notificationStreams": "stream-3",
"commandUpdatesStreams": "stream-4"
}
}
dataDestination
— String, kinesis_stream by default; can be direct_put for direct put to Kinesis Firehose Delivery Streams or kinesis_stream for Kinesis Data Streams;aws
— Your AWS configuration, more details can be found here or here;custom
— Object of custom configs:buffering
— Boolean, true by default; if true will buffer messages instead of immediate put. Messages will be sent as single batch by reaching max buffer size or buffer timeout;bufferSize
— Number, 10 by default; max amount of messages for one stream buffer must reach to trigger put to stream;bufferTimeout
— Number, 1000 by default; timeout in ms which must occur to trigger put to stream;commandStreams
— Comma separated strings, streams that will be used for putting commands;notificationStreams
— Array or comma separated strings, streams that will be used for putting notifications;commandUpdatesStreams
— Array or comma separated strings, streams that will be used for putting command updates.
Example configuration via environment variables:
ENVSEPARATOR=_
DEBUG=kinesismodule
plugin_plugin_refresh_token=plugin.JWT.refreshToken
plugin_plugin_topic=plugin_topic_a28fcdee-02a1-4535-a97a-f37468461872
plugin_device_hive_plugin_ws_endpoint=ws://localhost:3001
plugin_subscription_group=kinesis_plugin
kinesis_aws_region=us-east-2
kinesis_aws_accessKeyId=myAccessKey
kinesis_aws_secretAccessKey=mySecretAccessKey
kinesis_custom_buffering=true
kinesis_custom_commandStreams=stream-1, stream-2
kinesis_custom_notificationStreams=stream-3
kinesis_custom_commandUpdatesStreams=stream-1
kinesis_custom_bufferTimeout=10000
kinesis_custom_bufferSize=5
kinesis_dataDestination=kinesis_stream
In order to configure Kinesis property using environment variables please use kinesis as a prefix and defined ENVSEPARATOR for nesting.