Skip to content

Commit

Permalink
Introducing lazy connection to the broker.
Browse files Browse the repository at this point in the history
  • Loading branch information
arobenko committed Aug 29, 2024
1 parent 00f1729 commit f08770c
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 48 deletions.
4 changes: 4 additions & 0 deletions gateway/app/gateway/GatewayApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ bool GatewayApp::start(int argc, const char* argv[])
session->setClientIdReportCb(
[this, sessionPtr = session.get()](const std::string& clientId)
{
if (clientId.empty()) {
return;
}

auto iter =
std::find_if(
m_sessions.begin(), m_sessions.end(),
Expand Down
95 changes: 58 additions & 37 deletions gateway/app/gateway/GatewayIoBrokerSocket_Tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,60 @@ GatewayIoBrokerSocket_Tcp::Ptr GatewayIoBrokerSocket_Tcp::create(boost::asio::io

bool GatewayIoBrokerSocket_Tcp::startImpl()
{
// Do lazy connect of the first send
boost::asio::post(
io(),
[this]()
{
reportConnected();
}
);

return true;
}

void GatewayIoBrokerSocket_Tcp::sendDataImpl(const std::uint8_t* buf, std::size_t bufSize)
{
bool sendRightAway = (m_state == State_Connected) && m_sentData.empty();
m_sentData.emplace_back(buf, buf + bufSize);

if (sendRightAway) {
sendPendingWrites();
return;
}

if (m_state == State_Disconnected) {
doConnect();
return;
}
}

void GatewayIoBrokerSocket_Tcp::doRead()
{
assert(m_state == State_Connected);
m_socket.async_read_some(
boost::asio::buffer(m_inData),
[this](const boost::system::error_code& ec, std::size_t bytesCount)
{
if (ec == boost::asio::error::operation_aborted) {
return;
}

if (ec) {
logger().error() << "TCP/IP read error: " << ec.message() << std::endl;
reportError();
return;
}

reportClientData(m_inData.data(), bytesCount);
doRead();
}
);
}

void GatewayIoBrokerSocket_Tcp::doConnect()
{
m_state = State_TryingToConnect;
m_resolver.async_resolve(
m_addr, std::to_string(m_port),
[this](const boost::system::error_code& ec, const auto& results)
Expand Down Expand Up @@ -63,46 +117,12 @@ bool GatewayIoBrokerSocket_Tcp::startImpl()
return;
}

m_connected = true;
m_state = State_Connected;
doRead();
sendPendingWrites();
reportConnected();
}
);

});
return true;
}

void GatewayIoBrokerSocket_Tcp::sendDataImpl(const std::uint8_t* buf, std::size_t bufSize)
{
bool sendRightAway = m_connected && m_sentData.empty();
m_sentData.emplace_back(buf, buf + bufSize);
if (sendRightAway) {
sendPendingWrites();
}
}

void GatewayIoBrokerSocket_Tcp::doRead()
{
m_socket.async_read_some(
boost::asio::buffer(m_inData),
[this](const boost::system::error_code& ec, std::size_t bytesCount)
{
if (ec == boost::asio::error::operation_aborted) {
return;
}

if (ec) {
logger().error() << "TCP/IP read error: " << ec.message() << std::endl;
reportError();
return;
}

reportClientData(m_inData.data(), bytesCount);
doRead();
}
);
);
});
}

void GatewayIoBrokerSocket_Tcp::sendPendingWrites()
Expand All @@ -111,6 +131,7 @@ void GatewayIoBrokerSocket_Tcp::sendPendingWrites()
return;
}

assert(m_state == State_Connected);
auto& buf = m_sentData.front();
m_socket.async_write_some(
boost::asio::buffer(buf),
Expand Down
10 changes: 9 additions & 1 deletion gateway/app/gateway/GatewayIoBrokerSocket_Tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,27 @@ class GatewayIoBrokerSocket_Tcp final : public GatewayIoBrokerSocket
virtual void sendDataImpl(const std::uint8_t* buf, std::size_t bufSize) override;

private:
enum State
{
State_Disconnected,
State_TryingToConnect,
State_Connected,
};

using Socket = boost::asio::ip::tcp::socket;
using Resolver = boost::asio::ip::tcp::resolver;
using DataBuf = std::vector<std::uint8_t>;
using DataBufsList = std::list<DataBuf>;

void doRead();
void doConnect();
void sendPendingWrites();

Socket m_socket;
Resolver m_resolver;
std::string m_addr;
std::uint16_t m_port = 0;
bool m_connected = false;
State m_state = State_Disconnected;
DataBufsList m_sentData;
std::array<std::uint8_t, 4096> m_inData;
};
Expand Down
42 changes: 32 additions & 10 deletions gateway/app/gateway/GatewaySession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ GatewaySession::GatewaySession(
m_sessionPtr(std::make_unique<cc_mqttsn_gateway::Session>()),
m_session(m_sessionPtr.get())
{

logInfo() << "New primary session" << std::endl;
}

GatewaySession::GatewaySession(
Expand All @@ -46,11 +48,12 @@ GatewaySession::GatewaySession(
m_reconnectTimer(io),
m_session(session)
{
logInfo() << "New enapsulated session" << std::endl;
}

GatewaySession::~GatewaySession()
{
m_logger.info() << "Terminating session for client: " << m_clientId << std::endl;
logInfo() << "Terminating session for client: " << m_clientId << std::endl;
m_destructing = true;
m_fwdEncSessions.clear();
}
Expand All @@ -71,7 +74,7 @@ bool GatewaySession::start()
});

if (!m_clientSocket->start()) {
m_logger.error() << "Failed to start client socket" << std::endl;
logError() << "Failed to start client socket" << std::endl;
return false;
}
}
Expand All @@ -95,11 +98,12 @@ void GatewaySession::doBrokerConnect()
{
if (m_brokerConnected) {
m_session->setBrokerConnected(false);
m_brokerConnected = false;
}

m_brokerSocket = GatewayIoBrokerSocket::create(m_io, m_logger, m_config);
if (!m_brokerSocket) {
m_logger.error() << "Failed to allocate broker socket " << std::endl;
logError() << "Failed to allocate broker socket " << std::endl;
doTerminate();
return;
}
Expand Down Expand Up @@ -133,8 +137,8 @@ void GatewaySession::doBrokerConnect()
m_brokerSocket->setConnectedReportCb(
[this]()
{
m_session->setBrokerConnected(true);
m_brokerConnected = true;
m_session->setBrokerConnected(true);
}
);

Expand All @@ -150,7 +154,7 @@ void GatewaySession::doBrokerConnect()
});

if (!m_brokerSocket->start()) {
m_logger.error() << "Failed to start TCP/IP socket" << std::endl;
logError() << "Failed to start TCP/IP socket" << std::endl;
doTerminate();
return;
}
Expand Down Expand Up @@ -210,10 +214,10 @@ bool GatewaySession::startSession()
}
);


m_session->setSendDataBrokerReqCb(
[this](const std::uint8_t* buf, std::size_t bufSize)
{
m_hadBrokerData = true;
assert(m_brokerSocket);
m_brokerSocket->sendData(buf, bufSize);
});
Expand All @@ -227,7 +231,7 @@ bool GatewaySession::startSession()
m_session->setClientConnectedReportCb(
[this](const std::string& clientId)
{
m_logger.info() << "Connected client: " << clientId << std::endl;
logInfo() << "Connected client: " << clientId << std::endl;

m_clientId = clientId;
assert(m_clientIdReportCb);
Expand Down Expand Up @@ -303,6 +307,10 @@ bool GatewaySession::startSession()
assert(m_clientIdReportCb);
m_clientIdReportCb(clientId);

if (m_clientId.empty()) {
return;
}

auto iter =
std::find_if(
m_fwdEncSessions.begin(), m_fwdEncSessions.end(),
Expand All @@ -325,7 +333,7 @@ bool GatewaySession::startSession()
});

if (!sessionPtr->start()) {
m_logger.error() << "Failed to start forwarder encapsulated session" << std::endl;
logError() << "Failed to start forwarder encapsulated session" << std::endl;
return false;
}

Expand Down Expand Up @@ -360,13 +368,17 @@ bool GatewaySession::startSession()
}

m_fwdEncSessions.erase(iter);

if (m_fwdEncSessions.empty() && (!m_hadBrokerData)) {
doTerminate();
}
});
});

m_session->setErrorReportCb(
[this](const char* msg)
{
m_logger.error() << msg << std::endl;
logError() << msg << std::endl;
});

if (!m_sessionPtr) {
Expand All @@ -390,7 +402,7 @@ bool GatewaySession::startSession()


if (!m_session->start()) {
m_logger.error() << "Failed to start client session" << std::endl;
logError() << "Failed to start client session" << std::endl;
return false;
}

Expand Down Expand Up @@ -465,4 +477,14 @@ GatewaySession::AuthInfo GatewaySession::getAuthInfoFor(const std::string& clien
return std::make_pair(iter->username, std::move(data));
}

std::ostream& GatewaySession::logError()
{
return (m_logger.error() << "(" << this << ") ");
}

std::ostream& GatewaySession::logInfo()
{
return (m_logger.info() << "(" << this << ") ");
}

} // namespace cc_mqttsn_gateway_app
4 changes: 4 additions & 0 deletions gateway/app/gateway/GatewaySession.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class GatewaySession
void doBrokerReconnect();
bool startSession();
AuthInfo getAuthInfoFor(const std::string& clientId);

std::ostream& logError();
std::ostream& logInfo();

boost::asio::io_context& m_io;
GatewayLogger& m_logger;
Expand All @@ -97,6 +100,7 @@ class GatewaySession
std::list<Ptr> m_fwdEncSessions;
bool m_brokerConnected = false;
bool m_destructing = false;
bool m_hadBrokerData = false;
};

using GatewaySessionPtr = GatewaySession::Ptr;
Expand Down

0 comments on commit f08770c

Please sign in to comment.