diff --git a/gateway/app/gateway/GatewayApp.cpp b/gateway/app/gateway/GatewayApp.cpp index 0be3e5e..1b7ba8b 100644 --- a/gateway/app/gateway/GatewayApp.cpp +++ b/gateway/app/gateway/GatewayApp.cpp @@ -90,6 +90,10 @@ bool GatewayApp::start(int argc, const char* argv[]) session->setClientIdReportCb( [this, sessionPtr = session.get()](const std::string& clientId) { + if (clientId.empty()) { + return; + } + auto iter = std::find_if( m_sessions.begin(), m_sessions.end(), diff --git a/gateway/app/gateway/GatewayIoBrokerSocket_Tcp.cpp b/gateway/app/gateway/GatewayIoBrokerSocket_Tcp.cpp index 24713df..2c400bf 100644 --- a/gateway/app/gateway/GatewayIoBrokerSocket_Tcp.cpp +++ b/gateway/app/gateway/GatewayIoBrokerSocket_Tcp.cpp @@ -28,6 +28,60 @@ GatewayIoBrokerSocket_Tcp::Ptr GatewayIoBrokerSocket_Tcp::create(boost::asio::io bool GatewayIoBrokerSocket_Tcp::startImpl() { + // Do lazy connect of the first send + boost::asio::post( + io(), + [this]() + { + reportConnected(); + } + ); + + return true; +} + +void GatewayIoBrokerSocket_Tcp::sendDataImpl(const std::uint8_t* buf, std::size_t bufSize) +{ + bool sendRightAway = (m_state == State_Connected) && m_sentData.empty(); + m_sentData.emplace_back(buf, buf + bufSize); + + if (sendRightAway) { + sendPendingWrites(); + return; + } + + if (m_state == State_Disconnected) { + doConnect(); + return; + } +} + +void GatewayIoBrokerSocket_Tcp::doRead() +{ + assert(m_state == State_Connected); + m_socket.async_read_some( + boost::asio::buffer(m_inData), + [this](const boost::system::error_code& ec, std::size_t bytesCount) + { + if (ec == boost::asio::error::operation_aborted) { + return; + } + + if (ec) { + logger().error() << "TCP/IP read error: " << ec.message() << std::endl; + reportError(); + return; + } + + reportClientData(m_inData.data(), bytesCount); + doRead(); + } + ); +} + +void GatewayIoBrokerSocket_Tcp::doConnect() +{ + m_state = State_TryingToConnect; m_resolver.async_resolve( m_addr, std::to_string(m_port), [this](const boost::system::error_code& ec, const auto& results) @@ -63,46 +117,12 @@ bool GatewayIoBrokerSocket_Tcp::startImpl() return; } - m_connected = true; + m_state = State_Connected; doRead(); sendPendingWrites(); - reportConnected(); } - ); - - }); - return true; -} - -void GatewayIoBrokerSocket_Tcp::sendDataImpl(const std::uint8_t* buf, std::size_t bufSize) -{ - bool sendRightAway = m_connected && m_sentData.empty(); - m_sentData.emplace_back(buf, buf + bufSize); - if (sendRightAway) { - sendPendingWrites(); - } -} - -void GatewayIoBrokerSocket_Tcp::doRead() -{ - m_socket.async_read_some( - boost::asio::buffer(m_inData), - [this](const boost::system::error_code& ec, std::size_t bytesCount) - { - if (ec == boost::asio::error::operation_aborted) { - return; - } - - if (ec) { - logger().error() << "TCP/IP read error: " << ec.message() << std::endl; - reportError(); - return; - } - - reportClientData(m_inData.data(), bytesCount); - doRead(); - } - ); + ); + }); } void GatewayIoBrokerSocket_Tcp::sendPendingWrites() @@ -111,6 +131,7 @@ void GatewayIoBrokerSocket_Tcp::sendPendingWrites() return; } + assert(m_state == State_Connected); auto& buf = m_sentData.front(); m_socket.async_write_some( boost::asio::buffer(buf), diff --git a/gateway/app/gateway/GatewayIoBrokerSocket_Tcp.h b/gateway/app/gateway/GatewayIoBrokerSocket_Tcp.h index 9c89489..ac42e44 100644 --- a/gateway/app/gateway/GatewayIoBrokerSocket_Tcp.h +++ b/gateway/app/gateway/GatewayIoBrokerSocket_Tcp.h @@ -31,19 +31,27 @@ class GatewayIoBrokerSocket_Tcp final : public GatewayIoBrokerSocket virtual void sendDataImpl(const std::uint8_t* buf, std::size_t bufSize) override; private: + enum State + { + State_Disconnected, + State_TryingToConnect, + State_Connected, + }; + using Socket = boost::asio::ip::tcp::socket; using Resolver = boost::asio::ip::tcp::resolver; using DataBuf = std::vector; using DataBufsList = std::list; void doRead(); + void doConnect(); void sendPendingWrites(); Socket m_socket; Resolver m_resolver; std::string m_addr; std::uint16_t m_port = 0; - bool m_connected = false; + State m_state = State_Disconnected; DataBufsList m_sentData; std::array m_inData; }; diff --git a/gateway/app/gateway/GatewaySession.cpp b/gateway/app/gateway/GatewaySession.cpp index 98ecc64..4458760 100644 --- a/gateway/app/gateway/GatewaySession.cpp +++ b/gateway/app/gateway/GatewaySession.cpp @@ -32,6 +32,8 @@ GatewaySession::GatewaySession( m_sessionPtr(std::make_unique()), m_session(m_sessionPtr.get()) { + + logInfo() << "New primary session" << std::endl; } GatewaySession::GatewaySession( @@ -46,11 +48,12 @@ GatewaySession::GatewaySession( m_reconnectTimer(io), m_session(session) { + logInfo() << "New enapsulated session" << std::endl; } GatewaySession::~GatewaySession() { - m_logger.info() << "Terminating session for client: " << m_clientId << std::endl; + logInfo() << "Terminating session for client: " << m_clientId << std::endl; m_destructing = true; m_fwdEncSessions.clear(); } @@ -71,7 +74,7 @@ bool GatewaySession::start() }); if (!m_clientSocket->start()) { - m_logger.error() << "Failed to start client socket" << std::endl; + logError() << "Failed to start client socket" << std::endl; return false; } } @@ -95,11 +98,12 @@ void GatewaySession::doBrokerConnect() { if (m_brokerConnected) { m_session->setBrokerConnected(false); + m_brokerConnected = false; } m_brokerSocket = GatewayIoBrokerSocket::create(m_io, m_logger, m_config); if (!m_brokerSocket) { - m_logger.error() << "Failed to allocate broker socket " << std::endl; + logError() << "Failed to allocate broker socket " << std::endl; doTerminate(); return; } @@ -133,8 +137,8 @@ void GatewaySession::doBrokerConnect() m_brokerSocket->setConnectedReportCb( [this]() { - m_session->setBrokerConnected(true); m_brokerConnected = true; + m_session->setBrokerConnected(true); } ); @@ -150,7 +154,7 @@ void GatewaySession::doBrokerConnect() }); if (!m_brokerSocket->start()) { - m_logger.error() << "Failed to start TCP/IP socket" << std::endl; + logError() << "Failed to start TCP/IP socket" << std::endl; doTerminate(); return; } @@ -210,10 +214,10 @@ bool GatewaySession::startSession() } ); - m_session->setSendDataBrokerReqCb( [this](const std::uint8_t* buf, std::size_t bufSize) { + m_hadBrokerData = true; assert(m_brokerSocket); m_brokerSocket->sendData(buf, bufSize); }); @@ -227,7 +231,7 @@ bool GatewaySession::startSession() m_session->setClientConnectedReportCb( [this](const std::string& clientId) { - m_logger.info() << "Connected client: " << clientId << std::endl; + logInfo() << "Connected client: " << clientId << std::endl; m_clientId = clientId; assert(m_clientIdReportCb); @@ -303,6 +307,10 @@ bool GatewaySession::startSession() assert(m_clientIdReportCb); m_clientIdReportCb(clientId); + if (m_clientId.empty()) { + return; + } + auto iter = std::find_if( m_fwdEncSessions.begin(), m_fwdEncSessions.end(), @@ -325,7 +333,7 @@ bool GatewaySession::startSession() }); if (!sessionPtr->start()) { - m_logger.error() << "Failed to start forwarder encapsulated session" << std::endl; + logError() << "Failed to start forwarder encapsulated session" << std::endl; return false; } @@ -360,13 +368,17 @@ bool GatewaySession::startSession() } m_fwdEncSessions.erase(iter); + + if (m_fwdEncSessions.empty() && (!m_hadBrokerData)) { + doTerminate(); + } }); }); m_session->setErrorReportCb( [this](const char* msg) { - m_logger.error() << msg << std::endl; + logError() << msg << std::endl; }); if (!m_sessionPtr) { @@ -390,7 +402,7 @@ bool GatewaySession::startSession() if (!m_session->start()) { - m_logger.error() << "Failed to start client session" << std::endl; + logError() << "Failed to start client session" << std::endl; return false; } @@ -465,4 +477,14 @@ GatewaySession::AuthInfo GatewaySession::getAuthInfoFor(const std::string& clien return std::make_pair(iter->username, std::move(data)); } +std::ostream& GatewaySession::logError() +{ + return (m_logger.error() << "(" << this << ") "); +} + +std::ostream& GatewaySession::logInfo() +{ + return (m_logger.info() << "(" << this << ") "); +} + } // namespace cc_mqttsn_gateway_app diff --git a/gateway/app/gateway/GatewaySession.h b/gateway/app/gateway/GatewaySession.h index 6142385..14b61cc 100644 --- a/gateway/app/gateway/GatewaySession.h +++ b/gateway/app/gateway/GatewaySession.h @@ -79,6 +79,9 @@ class GatewaySession void doBrokerReconnect(); bool startSession(); AuthInfo getAuthInfoFor(const std::string& clientId); + + std::ostream& logError(); + std::ostream& logInfo(); boost::asio::io_context& m_io; GatewayLogger& m_logger; @@ -97,6 +100,7 @@ class GatewaySession std::list m_fwdEncSessions; bool m_brokerConnected = false; bool m_destructing = false; + bool m_hadBrokerData = false; }; using GatewaySessionPtr = GatewaySession::Ptr;