Skip to content

Commit

Permalink
Added publish client application.
Browse files Browse the repository at this point in the history
  • Loading branch information
arobenko committed Aug 14, 2024
1 parent ec777dd commit e93eac7
Show file tree
Hide file tree
Showing 14 changed files with 460 additions and 31 deletions.
2 changes: 1 addition & 1 deletion client/app/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ set (COMMON_APPS_LIB "cc_mqttsn_client_apps_lib")

add_subdirectory (common)
add_subdirectory (gw_discover)
#add_subdirectory (pub)
add_subdirectory (pub)
#add_subdirectory (sub)
120 changes: 105 additions & 15 deletions client/app/common/AppClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ bool AppClient::start(int argc, const char* argv[])
return false;
}

if (m_opts.connectNoCleanSession()) {
auto ec = cc_mqttsn_client_set_verify_incoming_msg_subscribed(m_client.get(), false);
if (ec != CC_MqttsnErrorCode_Success) {
logError() << "Failed to disable incoming message subscribed verification" << std::endl;
}
}

return startImpl();
}

Expand Down Expand Up @@ -187,6 +194,11 @@ std::ostream& AppClient::logError()
return std::cerr << "ERROR: ";
}

std::ostream& AppClient::logInfo()
{
return std::cout << "INFO: ";
}

void AppClient::doTerminate(int result)
{
m_result = result;
Expand All @@ -195,15 +207,6 @@ void AppClient::doTerminate(int result)

void AppClient::doComplete()
{
// if (m_opts.willTopic().empty()) {
// auto ec = ::cc_mqttsn_client_disconnect(m_client.get());
// if (ec != CC_MqttsnErrorCode_Success) {
// logError() << "Failed to send disconnect with ec=" << toString(ec) << std::endl;
// doTerminate();
// return;
// }
// }

boost::asio::post(
m_io,
[this]()
Expand All @@ -212,6 +215,45 @@ void AppClient::doComplete()
});
}

bool AppClient::doConnect()
{
auto config = CC_MqttsnConnectConfig();
cc_mqttsn_client_connect_init_config(&config);

auto clientId = m_opts.connectClientId();
if (!clientId.empty()) {
config.m_clientId = clientId.c_str();
}

config.m_duration = m_opts.connectKeepAlive();
config.m_cleanSession = !m_opts.connectNoCleanSession();

auto willConfig = CC_MqttsnWillConfig();
CC_MqttsnWillConfig* willConfigPtr = nullptr;
auto willTopic = m_opts.willTopic();
auto willData = parseBinaryData(m_opts.willMessage());
if (!willTopic.empty()) {
cc_mqttsn_client_connect_init_config_will(&willConfig);
willConfig.m_topic = willTopic.c_str();
willConfig.m_data = willData.data();
willConfig.m_dataLen = static_cast<decltype(willConfig.m_dataLen)>(willData.size());
willConfig.m_qos = static_cast<decltype(willConfig.m_qos)>(m_opts.willQos());
willConfigPtr = &willConfig;
}

if (m_opts.verbose()) {
logInfo() << "Attempting connection" << std::endl;
}

auto ec = cc_mqttsn_client_connect(m_client.get(), &config, willConfigPtr, &AppClient::connectCompleteCb, this);
if (ec != CC_MqttsnErrorCode_Success) {
logError() << "Failed to initiate connection to the gateway" << std::endl;
return false;
}

return true;
}

bool AppClient::startImpl()
{
return true;
Expand All @@ -222,6 +264,10 @@ void AppClient::messageReceivedImpl([[maybe_unused]] const CC_MqttsnMessageInfo*
{
}

void AppClient::connectCompleteImpl()
{
}

std::vector<std::uint8_t> AppClient::parseBinaryData(const std::string& val)
{
std::vector<std::uint8_t> result;
Expand Down Expand Up @@ -308,6 +354,28 @@ std::string AppClient::toString(CC_MqttsnAsyncOpStatus val)
return Map[idx] + " (" + std::to_string(val) + ')';
}


std::string AppClient::toString(CC_MqttsnReturnCode val)
{
static const std::string Map[] = {
/* CC_MqttsnReturnCode_Accepted */ "Accepted",
/* CC_MqttsnReturnCode_Conjestion */ "Conjestion",
/* CC_MqttsnReturnCode_InvalidTopicId */ "Invalid Topic ID",
/* CC_MqttsnReturnCode_NotSupported */ "Not supported",
};

static constexpr std::size_t MapSize = std::extent<decltype(Map)>::value;
static_assert(MapSize == CC_MqttsnReturnCode_ValuesLimit);

auto idx = static_cast<unsigned>(val);
if (MapSize <= idx) {
assert(false); // Should not happen
return std::to_string(val);
}

return Map[idx] + " (" + std::to_string(val) + ')';
}

void AppClient::nextTickProgramInternal(unsigned duration)
{
m_lastWaitProgram = Clock::now();
Expand Down Expand Up @@ -354,15 +422,10 @@ bool AppClient::createSession()
}

m_session->setDataReportCb(
[this](const Addr& addr, const std::uint8_t* buf, std::size_t bufLen)
[this](const std::uint8_t* buf, std::size_t bufLen, const Addr& addr, CC_MqttsnDataOrigin origin)
{
assert(m_client);
m_lastAddr = addr;
auto origin = CC_MqttsnDataOrigin_Any;
if (addr == m_gwAddr) {
origin = CC_MqttsnDataOrigin_ConnectedGw;
}

::cc_mqttsn_client_process_data(m_client.get(), buf, static_cast<unsigned>(bufLen), origin);
});

Expand All @@ -382,6 +445,28 @@ bool AppClient::createSession()
return true;
}

void AppClient::connectCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info)
{
if (status != CC_MqttsnAsyncOpStatus_Complete) {
logError() << "Failed to connect with status: " << toString(status) << std::endl;
doTerminate();
return;
}

assert(info != nullptr);
if (info->m_returnCode != CC_MqttsnReturnCode_Accepted) {
logError() << "Connection rejected with return code: " << toString(info->m_returnCode) << std::endl;
doTerminate();
return;
}

if (m_opts.verbose()) {
logInfo() << "Connection established" << std::endl;
}

connectCompleteImpl();
}

void AppClient::sendDataCb(void* data, const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius)
{
asThis(data)->sendDataInternal(buf, bufLen, broadcastRadius);
Expand All @@ -407,4 +492,9 @@ unsigned AppClient::cancelNextTickWaitCb(void* data)
return asThis(data)->cancelNextTickWaitInternal();
}

void AppClient::connectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info)
{
return asThis(data)->connectCompleteInternal(status, info);
}

} // namespace cc_mqttsn_client_app
12 changes: 6 additions & 6 deletions client/app/common/AppClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ class AppClient
}

static std::ostream& logError();
static std::ostream& logInfo();

void doTerminate(int result = 1);
void doComplete();
bool doConnect();

virtual bool startImpl();
virtual void messageReceivedImpl(const CC_MqttsnMessageInfo* info);
virtual void connectCompleteImpl();

static std::vector<std::uint8_t> parseBinaryData(const std::string& val);

Expand All @@ -74,12 +77,8 @@ class AppClient
return m_lastAddr;
}

void setGwAddr(const Addr& addr)
{
m_gwAddr = addr;
}

static std::string toString(CC_MqttsnAsyncOpStatus val);
static std::string toString(CC_MqttsnReturnCode val);

private:
using ClientPtr = std::unique_ptr<CC_MqttsnClient, ClientDeleter>;
Expand All @@ -90,12 +89,14 @@ class AppClient
unsigned cancelNextTickWaitInternal();
void sendDataInternal(const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius);
bool createSession();
void connectCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info);

static void sendDataCb(void* data, const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius);
static void messageReceivedCb(void* data, const CC_MqttsnMessageInfo* info);
static void logMessageCb(void* data, const char* msg);
static void nextTickProgramCb(void* data, unsigned duration);
static unsigned cancelNextTickWaitCb(void* data);
static void connectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info);

boost::asio::io_context& m_io;
int& m_result;
Expand All @@ -105,7 +106,6 @@ class AppClient
ClientPtr m_client;
SessionPtr m_session;
Addr m_lastAddr;
Addr m_gwAddr;
};

} // namespace cc_mqttsn_client_app
96 changes: 95 additions & 1 deletion client/app/common/ProgramOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace
{

constexpr std::uint16_t DefaultPort = 1883U;
constexpr unsigned DefaultKeepAlive = 60;

} // namespace

Expand All @@ -39,7 +40,7 @@ void ProgramOptions::addNetwork()
opts.add_options()
("network-gateway,g", po::value<std::string>()->default_value("127.0.0.1"), "Gateway address to connect to")
("network-broadcast,b", po::value<std::string>()->default_value("255.255.255.255"), "Address to broadcast to")
("network-port,p", po::value<std::uint16_t>()->default_value(DefaultPort), "Network remove port")
("network-port,p", po::value<std::uint16_t>()->default_value(DefaultPort), "Network remote port")
("network-local-port,P", po::value<std::uint16_t>()->default_value(0), "Network local port")
;

Expand All @@ -57,6 +58,44 @@ void ProgramOptions::addDiscover()
m_desc.add(opts);
}

void ProgramOptions::addConnect()
{
po::options_description opts("Connect Options");
opts.add_options()
("connect-client-id,c", po::value<std::string>()->default_value(std::string()), "Client ID")
("connect-keep-alive,k", po::value<unsigned>()->default_value(DefaultKeepAlive), "Protocol \"keep alive\" configuration")
("connect-no-clean-session,l", "Do not force clean session upon connection")
;

m_desc.add(opts);
}

void ProgramOptions::addWill()
{
po::options_description opts("Will Options");
opts.add_options()
("will-topic", po::value<std::string>()->default_value(std::string()), "Will topic, when not provided means no will")
("will-message", po::value<std::string>()->default_value(std::string()), "Will message data, use \"\\x\" prefix before hex value of each byte for binary string")
("will-qos", po::value<unsigned>()->default_value(0U), "Will QoS value")
;

m_desc.add(opts);
}

void ProgramOptions::addPublish()
{
po::options_description opts("Publish Options");
opts.add_options()
("pub-topic,t", po::value<std::string>()->default_value(std::string()), "Publish topic, must be empty when topic ID is not 0")
("pub-topic-id,i", po::value<std::uint16_t>()->default_value(0U), "Publish topic id, must be 0 when topic is specified")
("pub-message,m", po::value<std::string>()->default_value(std::string()), "Publish message data, use \"\\x\" prefix before hex value of each byte for binary string")
("pub-qos,q", po::value<unsigned>()->default_value(0U), "Publish QoS value")
("pub-retain", "Publish retained message")
;

m_desc.add(opts);
}

void ProgramOptions::printHelp()
{
std::cout << m_desc << std::endl;
Expand Down Expand Up @@ -116,4 +155,59 @@ unsigned ProgramOptions::discoverTimeout() const
return m_vm["discover-timeout"].as<unsigned>();
}

std::string ProgramOptions::connectClientId() const
{
return m_vm["connect-client-id"].as<std::string>();
}

unsigned ProgramOptions::connectKeepAlive() const
{
return m_vm["connect-keep-alive"].as<unsigned>();
}

bool ProgramOptions::connectNoCleanSession() const
{
return m_vm.count("connect-no-clean-session") > 0U;
}

std::string ProgramOptions::willTopic() const
{
return m_vm["will-topic"].as<std::string>();
}

std::string ProgramOptions::willMessage() const
{
return m_vm["will-message"].as<std::string>();
}

unsigned ProgramOptions::willQos() const
{
return m_vm["will-qos"].as<unsigned>();
}

std::string ProgramOptions::pubTopic() const
{
return m_vm["pub-topic"].as<std::string>();
}

std::uint16_t ProgramOptions::pubTopicId() const
{
return m_vm["pub-topic-id"].as<std::uint16_t>();
}

std::string ProgramOptions::pubMessage() const
{
return m_vm["pub-message"].as<std::string>();
}

unsigned ProgramOptions::pubQos() const
{
return m_vm["pub-qos"].as<unsigned>();
}

bool ProgramOptions::pubRetain() const
{
return m_vm.count("pub-retain") > 0U;
}

} // namespace cc_mqttsn_client_app
Loading

0 comments on commit e93eac7

Please sign in to comment.