Skip to content

Commit

Permalink
First version of mqtt client support.
Browse files Browse the repository at this point in the history
Adds
- ENABLE_MQTT cmake option. Defaults to On but turns off automatically
  if libmosquitto not found.
- basic mqtt client support using libmosquitto
  - topics generated are:
    vzlogger/<channel-id>/uuid
    vzlogger/<channel-id>/raw
    vzlogger/<channel-id>/agg
- added config options for
  - host
  - port (currently no tls/sll support!)
  - user
  - password
  - topic (prefix used instead of vzlogger in above example)
  - (some more, see etc/vzlogger.conf)
- agg values get's preferred instead of raw. Using config option
  rawAndAgg this can be changed.
  • Loading branch information
mbehr1 committed Aug 4, 2018
1 parent 6b6daaa commit a343483
Show file tree
Hide file tree
Showing 12 changed files with 519 additions and 12 deletions.
8 changes: 4 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ dist: trusty

language: cpp

compiler:
- clang
compiler:
- clang
- gcc

env:
Expand Down Expand Up @@ -36,11 +36,11 @@ before_install:
- sudo apt-get install -y libcurl4-openssl-dev openssl libmicrohttpd-dev uuid-dev uuid-runtime libunistring-dev
# -- get libsml --
- git clone https://github.com/volkszaehler/libsml.git # or github.com/TheCount/libsml.git # or https://github.com/dailab/libsml.git
- cd libsml
- cd libsml
- # git checkout develop # only dev branch seems to work
- make
# -- install libsml --
- sudo cp sml/lib/libsml.* /usr/lib/.
- sudo cp sml/lib/libsml.* /usr/lib/.
- sudo cp -R sml/include/* /usr/include/.
- sudo cp sml.pc /usr/lib/pkgconfig/.
- cd ..
Expand Down
14 changes: 14 additions & 0 deletions etc/vzlogger.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@
}
],

// mqtt client support (if ENABLE_MQTT set at cmake generation)
"mqtt": {
"enabled": false, // enable mqtt client. needs host and port as well
"host": "test.mosquitto.org", // mqtt server addr
"port": 1883, // 1883 for unencrypted, 8883 enc, 8884 enc cert needed,
"keepalive": 30, // optional keepalive in seconds.
"topic": "vzlogger/data", // optional topic dont use $ at start and no / at end
"user": "", // optional user name for the mqtt server
"pass": "", // optional password for the mqtt server
"retain": false, // optional use retain message flag
"rawAndAgg": false, // optional publish raw values even if agg mode is used
},


// Meter configuration
"meters": [
{
Expand Down
1 change: 1 addition & 0 deletions include/Buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Buffer {
inline void have_newValues() { _newValues = true; }

inline void set_aggmode(Buffer::aggmode m) {_aggmode=m;}
inline aggmode get_aggmode() const { return _aggmode; }

private:
Buffer(const Buffer &); // don't allow copy constructor
Expand Down
2 changes: 1 addition & 1 deletion include/Channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class Channel {

bool running() const { return _thread_running; }

const char* name() { return _name.c_str(); }
const char* name() const { return _name.c_str(); }
std::list<Option> &options() { return _options; }

ReadingIdentifier::Ptr identifier() {
Expand Down
65 changes: 65 additions & 0 deletions include/mqtt.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Author: Matthias Behr, mbehr (a) mcbehr dot de
* (c) 2018
* */

#ifndef __mqtt_hpp_
#define __mqtt_hpp_

#include <string>
#include <unordered_map>
#include "Channel.hpp"
#include "Reading.hpp"

struct mosquitto; // forward decl. to avoid pulling the header here

class MqttClient
{
public:
MqttClient(struct json_object *option);
MqttClient() = delete; // no default constr.
MqttClient(const MqttClient &) = delete; // no copy constr.
~MqttClient();
bool isConfigured() const;

void publish(Channel::Ptr ch, Reading &rds, bool aggregate = false); // thread safe, non blocking
protected:
friend void *mqtt_client_thread(void *);
void connect_callback(struct mosquitto *mosq, int result);
void disconnect_callback(struct mosquitto *mosq, int result);
void message_callback(struct mosquitto *mosq, const struct mosquitto_message *msg);

bool _enabled;
std::string _host;
int _port = 0;
int _keepalive = 10;
std::string _user;
std::string _pwd;
bool _retain = false;
bool _rawAndAgg = false;
std::string _topic;

bool _isConnected = false;

struct mosquitto *_mcs = nullptr; // mosquitto client session data

struct ChannelEntry
{
bool _announced = false;
bool _sendRaw = true;
bool _sendAgg = true;
std::string _fullTopicRaw;
std::string _fullTopicAgg;
std::string _announceName;
std::string _announceValue;
void generateNames(const std::string &prefix, Channel &ch);
};
std::unordered_map<std::string, ChannelEntry> _chMap;
};

extern MqttClient *mqttClient;

void *mqtt_client_thread(void *arg);
void end_mqtt_client_thread(); // notifies the thread to stop. does not wait for the thread

#endif
19 changes: 15 additions & 4 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@ else(LOCAL_SUPPORT)
set(local_srcs "")
endif(LOCAL_SUPPORT)

if(ENABLE_MQTT)
set(mqtt_srcs mqtt.cpp)
else(ENABLE_MQTT)
set(mqtt_srcs "")
endif(ENABLE_MQTT)

configure_file("${CMAKE_SOURCE_DIR}/src/gitSha1.cpp.in" "${CMAKE_BINARY_DIR}/gitSha1.cpp" @ONLY)

set(vzlogger_srcs
vzlogger.cpp
vzlogger.cpp
ltqnorm.cpp
Meter.cpp
${CMAKE_BINARY_DIR}/gitSha1.cpp
CurlSessionProvider.cpp
PushData.cpp ../include/PushData.hpp
)

set(libvz_srcs
set(libvz_srcs
Channel.cpp
Config_Options.cpp
threads.cpp
Expand All @@ -37,6 +43,7 @@ set(libvz_srcs
Reading.cpp
exception.cpp
${local_srcs}
${mqtt_srcs}
MeterMap.cpp
)

Expand All @@ -58,6 +65,10 @@ if(LOCAL_SUPPORT)
target_link_libraries(vzlogger ${MICROHTTPD_LIBRARY})
endif(LOCAL_SUPPORT)

if(ENABLE_MQTT)
target_link_libraries(vzlogger ${MQTT_LIBRARY})
endif(ENABLE_MQTT)

target_link_libraries(vzlogger ${LIBGCRYPT})
target_link_libraries(vzlogger pthread m ${LIBUUID})
target_link_libraries(vzlogger dl)
Expand All @@ -72,8 +83,8 @@ if( TARGET )
endif( TARGET )
target_link_libraries(vzlogger ${CURL_STATIC_LIBRARIES} ${CURL_LIBRARIES} unistring ${GNUTLS_LIBRARIES} ${OPENSSL_LIBRARIES} )

# add programs to the install target
INSTALL(PROGRAMS
# add programs to the install target
INSTALL(PROGRAMS
${CMAKE_CURRENT_BINARY_DIR}/vzlogger
DESTINATION bin)
install(TARGETS vz
Expand Down
21 changes: 20 additions & 1 deletion src/Config_Options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
#include <ctype.h>
#include <regex>

#include "config.hpp"
#include <Config_Options.hpp>
#include "Channel.hpp"
#include <VZException.hpp>

#ifdef ENABLE_MQTT
#include "mqtt.hpp"
#endif

static const char *option_type_str[] = { "null", "boolean", "double", "int", "object", "array", "string" };

Expand Down Expand Up @@ -184,6 +187,22 @@ void Config_Options::config_parse(
} else
print(log_error, "Ignoring push entry due to empty array or duplicate section", "push");
}
#ifdef ENABLE_MQTT
else if ((strcmp(key, "mqtt") == 0) && type == json_type_object ) {
if (!mqttClient)
{
mqttClient = new MqttClient(value);
if (!mqttClient->isConfigured())
{
delete mqttClient;
mqttClient = 0;
print(log_debug, "mqtt client not configured. stopped.", "mqtt");
}
}
else
print(log_error, "Ignoring mqtt entry due to empty array or duplicate section", "mqtt");
}
#endif
else {
print(log_alert, "Ignoring invalid field or type: %s=%s (%s)",
NULL, key, json_object_get_string(value), option_type_str[type]);
Expand Down
Loading

0 comments on commit a343483

Please sign in to comment.