Skip to content

Commit

Permalink
Added client subscribe application.
Browse files Browse the repository at this point in the history
  • Loading branch information
arobenko committed Aug 14, 2024
1 parent e93eac7 commit c07ad63
Show file tree
Hide file tree
Showing 14 changed files with 447 additions and 94 deletions.
2 changes: 1 addition & 1 deletion client/app/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ set (COMMON_APPS_LIB "cc_mqttsn_client_apps_lib")
add_subdirectory (common)
add_subdirectory (gw_discover)
add_subdirectory (pub)
#add_subdirectory (sub)
add_subdirectory (sub)
202 changes: 116 additions & 86 deletions client/app/common/AppClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,44 +27,34 @@ AppClient* asThis(void* data)
return reinterpret_cast<AppClient*>(data);
}

// std::string toString(CC_MqttsnQoS val)
// {
// static const std::string Map[] = {
// /* CC_MqttsnQoS_AtMostOnceDelivery */ "QoS0 - At Most Once Delivery",
// /* CC_MqttsnQoS_AtLeastOnceDelivery */ "QoS1 - At Least Once Delivery",
// /* CC_MqttsnQoS_ExactlyOnceDelivery */ "QoS2 - Exactly Once Delivery",
// };
// static constexpr std::size_t MapSize = std::extent<decltype(Map)>::value;
// static_assert(MapSize == CC_MqttsnQoS_ValuesLimit);

// auto idx = static_cast<unsigned>(val);
// if (MapSize <= idx) {
// assert(false); // Should not happen
// return std::to_string(val);
// }

// return Map[idx] + " (" + std::to_string(val) + ')';
// }

// void printQos(const char* prefix, CC_MqttsnQoS val)
// {
// std::cout << "\t" << prefix << ": " << toString(val) << '\n';
// }

// void printBool(const char* prefix, bool val)
// {
// std::cout << '\t' << prefix << ": " << std::boolalpha << val << '\n';
// }

// void printConnectReturnCode(CC_MqttsnConnectReturnCode val)
// {
// std::cout << "\tReturn Code: " << AppClient::toString(val) << '\n';
// }

// void printSubscribeReturnCode(CC_MqttsnSubscribeReturnCode val)
// {
// std::cout << "\tReturn Code: " << AppClient::toString(val) << '\n';
// }
std::string toString(CC_MqttsnQoS val)
{
static const std::string Map[] = {
/* CC_MqttsnQoS_AtMostOnceDelivery */ "QoS0 - At Most Once Delivery",
/* CC_MqttsnQoS_AtLeastOnceDelivery */ "QoS1 - At Least Once Delivery",
/* CC_MqttsnQoS_ExactlyOnceDelivery */ "QoS2 - Exactly Once Delivery",
};
static constexpr std::size_t MapSize = std::extent<decltype(Map)>::value;
static_assert(MapSize == CC_MqttsnQoS_ValuesLimit);

auto idx = static_cast<unsigned>(val);
if (MapSize <= idx) {
assert(false); // Should not happen
return std::to_string(val);
}

return Map[idx] + " (" + std::to_string(val) + ')';
}

void printQos(const char* prefix, CC_MqttsnQoS val)
{
std::cout << "\t" << prefix << ": " << toString(val) << '\n';
}

void printBool(const char* prefix, bool val)
{
std::cout << '\t' << prefix << ": " << std::boolalpha << val << '\n';
}

} // namespace

Expand Down Expand Up @@ -128,52 +118,52 @@ std::string AppClient::toString(CC_MqttsnErrorCode val)
return Map[idx] + " (" + std::to_string(val) + ')';
}

// std::string AppClient::toString(const std::uint8_t* data, unsigned dataLen, bool forceBinary)
// {
// bool binary = forceBinary;
// if (!binary) {
// binary =
// std::any_of(
// data, data + dataLen,
// [](auto byte)
// {
// if (std::isprint(static_cast<int>(byte)) == 0) {
// return true;
// }

// if (byte > 0x7e) {
// return true;
// }

// return false;
// });
// }

// if (!binary) {
// return std::string(reinterpret_cast<const char*>(data), dataLen);
// }

// std::stringstream stream;
// stream << std::hex;
// for (auto idx = 0U; idx < dataLen; ++idx) {
// stream << std::setw(2) << std::setfill('0') << static_cast<unsigned>(data[idx]) << ' ';
// }
// return stream.str();
// }

// void AppClient::print(const CC_MqttsnMessageInfo& info, bool printMessage)
// {
// std::cout << "[INFO]: Message Info:\n";
// if (printMessage) {
// std::cout <<
// "\tTopic: " << info.m_topic << '\n' <<
// "\tData: " << toString(info.m_data, info.m_dataLen, m_opts.subBinary()) << '\n';
// }

// printQos("QoS", info.m_qos);
// printBool("Retained", info.m_retained);
// std::cout << std::endl;
// }
std::string AppClient::toString(const std::uint8_t* data, unsigned dataLen, bool forceBinary)
{
bool binary = forceBinary;
if (!binary) {
binary =
std::any_of(
data, data + dataLen,
[](auto byte)
{
if (std::isprint(static_cast<int>(byte)) == 0) {
return true;
}

if (byte > 0x7e) {
return true;
}

return false;
});
}

if (!binary) {
return std::string(reinterpret_cast<const char*>(data), dataLen);
}

std::stringstream stream;
stream << std::hex;
for (auto idx = 0U; idx < dataLen; ++idx) {
stream << std::setw(2) << std::setfill('0') << static_cast<unsigned>(data[idx]) << ' ';
}
return stream.str();
}

void AppClient::print(const CC_MqttsnMessageInfo& info, bool printMessage)
{
std::cout << "[INFO]: Message Info:\n";
if (printMessage) {
std::cout <<
"\tTopic: " << info.m_topic << '\n' <<
"\tData: " << toString(info.m_data, info.m_dataLen, m_opts.subBinary()) << '\n';
}

printQos("QoS", info.m_qos);
printBool("Retained", info.m_retained);
std::cout << std::endl;
}

AppClient::AppClient(boost::asio::io_context& io, int& result) :
m_io(io),
Expand All @@ -191,12 +181,12 @@ AppClient::AppClient(boost::asio::io_context& io, int& result) :

std::ostream& AppClient::logError()
{
return std::cerr << "ERROR: ";
return std::cerr << "[ERROR] ";
}

std::ostream& AppClient::logInfo()
{
return std::cout << "INFO: ";
return std::cout << "[INFO] ";
}

void AppClient::doTerminate(int result)
Expand Down Expand Up @@ -254,6 +244,21 @@ bool AppClient::doConnect()
return true;
}

bool AppClient::doDisconnect()
{
if (m_opts.verbose()) {
logInfo() << "Attempting disconnection" << std::endl;
}

auto ec = cc_mqttsn_client_disconnect(m_client.get(), &AppClient::disconnectCompleteCb, this);
if (ec != CC_MqttsnErrorCode_Success) {
logError() << "Failed to initiate disconnection from the gateway" << std::endl;
return false;
}

return true;
}

bool AppClient::startImpl()
{
return true;
Expand All @@ -268,6 +273,11 @@ void AppClient::connectCompleteImpl()
{
}

void AppClient::disconnectCompleteImpl()
{
doComplete();
}

std::vector<std::uint8_t> AppClient::parseBinaryData(const std::string& val)
{
std::vector<std::uint8_t> result;
Expand Down Expand Up @@ -467,6 +477,21 @@ void AppClient::connectCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_
connectCompleteImpl();
}

void AppClient::disconnectCompleteInternal(CC_MqttsnAsyncOpStatus status)
{
if (status != CC_MqttsnAsyncOpStatus_Complete) {
logError() << "Failed to disconnect with status: " << toString(status) << std::endl;
doTerminate();
return;
}

if (m_opts.verbose()) {
logInfo() << "Disconnected" << std::endl;
}

disconnectCompleteImpl();
}

void AppClient::sendDataCb(void* data, const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius)
{
asThis(data)->sendDataInternal(buf, bufLen, broadcastRadius);
Expand Down Expand Up @@ -497,4 +522,9 @@ void AppClient::connectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status, con
return asThis(data)->connectCompleteInternal(status, info);
}

void AppClient::disconnectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status)
{
return asThis(data)->disconnectCompleteInternal(status);
}

} // namespace cc_mqttsn_client_app
7 changes: 6 additions & 1 deletion client/app/common/AppClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class AppClient

static std::string toString(CC_MqttsnErrorCode val);
//static std::string toString(CC_MqttsnAsyncOpStatus val);
//void print(const CC_MqttsnMessageInfo& info, bool printMessage = true);
static std::string toString(const std::uint8_t* data, unsigned dataLen, bool forceBinary = false);
void print(const CC_MqttsnMessageInfo& info, bool printMessage = true);

protected:
using Timer = boost::asio::steady_timer;
Expand Down Expand Up @@ -65,10 +66,12 @@ class AppClient
void doTerminate(int result = 1);
void doComplete();
bool doConnect();
bool doDisconnect();

virtual bool startImpl();
virtual void messageReceivedImpl(const CC_MqttsnMessageInfo* info);
virtual void connectCompleteImpl();
virtual void disconnectCompleteImpl();

static std::vector<std::uint8_t> parseBinaryData(const std::string& val);

Expand All @@ -90,13 +93,15 @@ class AppClient
void sendDataInternal(const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius);
bool createSession();
void connectCompleteInternal(CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info);
void disconnectCompleteInternal(CC_MqttsnAsyncOpStatus status);

static void sendDataCb(void* data, const unsigned char* buf, unsigned bufLen, unsigned broadcastRadius);
static void messageReceivedCb(void* data, const CC_MqttsnMessageInfo* info);
static void logMessageCb(void* data, const char* msg);
static void nextTickProgramCb(void* data, unsigned duration);
static unsigned cancelNextTickWaitCb(void* data);
static void connectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info);
static void disconnectCompleteCb(void* data, CC_MqttsnAsyncOpStatus status);

boost::asio::io_context& m_io;
int& m_result;
Expand Down
55 changes: 55 additions & 0 deletions client/app/common/ProgramOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ void ProgramOptions::addPublish()
("pub-message,m", po::value<std::string>()->default_value(std::string()), "Publish message data, use \"\\x\" prefix before hex value of each byte for binary string")
("pub-qos,q", po::value<unsigned>()->default_value(0U), "Publish QoS value")
("pub-retain", "Publish retained message")
("pub-no-disconnect", "Do not gracefuly disconnect when publish is complete")
;

m_desc.add(opts);
}

void ProgramOptions::addSubscribe()
{
po::options_description opts("Subscribe Options");
opts.add_options()
("sub-topic,t", po::value<std::vector<std::string>>(), "Subscribe to topic, can be used multiple times.")
("sub-topic-id,i", po::value<std::vector<std::uint16_t>>(), "Subscribe topic id, can be used multiple times")
("sub-qos,q", po::value<unsigned>()->default_value(2U), "Subscribe max QoS value")
("sub-no-retained", "Ignore retained messages")
("sub-binary", "Force binary output of the received message data")
;

m_desc.add(opts);
Expand Down Expand Up @@ -210,4 +225,44 @@ bool ProgramOptions::pubRetain() const
return m_vm.count("pub-retain") > 0U;
}

bool ProgramOptions::pubNoDisconnect() const
{
return m_vm.count("pub-no-disconnect") > 0U;
}

std::vector<std::string> ProgramOptions::subTopics() const
{
std::vector<std::string> result;
if (m_vm.count("sub-topic") > 0U) {
result = m_vm["sub-topic"].as<std::vector<std::string>>();
}

return result;
}

std::vector<std::uint16_t> ProgramOptions::subTopicIds() const
{
std::vector<std::uint16_t> result;
if (m_vm.count("sub-topic-id") > 0U) {
result = m_vm["sub-topic-id"].as<std::vector<std::uint16_t>>();
}

return result;
}

unsigned ProgramOptions::subQos() const
{
return m_vm["sub-qos"].as<unsigned>();
}

bool ProgramOptions::subNoRetained() const
{
return m_vm.count("sub-no-retained") > 0U;
}

bool ProgramOptions::subBinary() const
{
return m_vm.count("sub-binary") > 0U;
}

} // namespace cc_mqttsn_client_app
11 changes: 10 additions & 1 deletion client/app/common/ProgramOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ProgramOptions
void addConnect();
void addWill();
void addPublish();
void addSubscribe();

void printHelp();

Expand Down Expand Up @@ -68,7 +69,15 @@ class ProgramOptions
std::uint16_t pubTopicId() const;
std::string pubMessage() const;
unsigned pubQos() const;
bool pubRetain() const;
bool pubRetain() const;
bool pubNoDisconnect() const;

// Subscribe Options
std::vector<std::string> subTopics() const;
std::vector<std::uint16_t> subTopicIds() const;
unsigned subQos() const;
bool subNoRetained() const;
bool subBinary() const;

private:
boost::program_options::variables_map m_vm;
Expand Down
Loading

0 comments on commit c07ad63

Please sign in to comment.