Skip to content

Commit

Permalink
Supporing PINGREQ from gateway to client when no data from client for…
Browse files Browse the repository at this point in the history
… too long.
  • Loading branch information
arobenko committed Jun 16, 2024
1 parent 18e5c8c commit d1c483b
Show file tree
Hide file tree
Showing 21 changed files with 303 additions and 16 deletions.
1 change: 1 addition & 0 deletions gateway/lib/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ function (lib_mqttsn_gateway)
session_op/AsleepMonitor.cpp
session_op/Encapsulate.cpp
session_op/Forward.cpp
session_op/Ping.cpp
session_op/PubRecv.cpp
session_op/PubSend.cpp
session_op/WillUpdate.cpp
Expand Down
9 changes: 9 additions & 0 deletions gateway/lib/src/SessionImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "session_op/Asleep.h"
#include "session_op/AsleepMonitor.h"
#include "session_op/Encapsulate.h"
#include "session_op/Ping.h"
#include "session_op/PubRecv.h"
#include "session_op/PubSend.h"
#include "session_op/Forward.h"
Expand Down Expand Up @@ -52,6 +53,7 @@ SessionImpl::SessionImpl()
m_ops.emplace_back(new session_op::PubSend(*this));
m_ops.emplace_back(new session_op::Forward(*this));
m_ops.emplace_back(new session_op::WillUpdate(*this));
m_ops.emplace_back(new session_op::Ping(*this));
m_ops.emplace_back(new session_op::Encapsulate(*this));
m_encapsulateOp = static_cast<decltype(m_encapsulateOp)>(m_ops.back().get());

Expand Down Expand Up @@ -266,6 +268,13 @@ void SessionImpl::clientConnectedReport(const std::string& clientId)
}
}

void SessionImpl::connStatusUpdated()
{
for (auto& op : m_ops) {
op->connStatusUpdated();
}
}

SessionImpl::AuthInfo SessionImpl::authInfoRequest(const std::string& clientId)
{
if (!m_authInfoReqCb) {
Expand Down
2 changes: 2 additions & 0 deletions gateway/lib/src/SessionImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "MsgHandler.h"
#include "SessionOp.h"
#include "session_op/Encapsulate.h"
#include "session_op/Ping.h"
#include "common.h"
#include "comms/util/ScopeGuard.h"

Expand Down Expand Up @@ -216,6 +217,7 @@ class SessionImpl : public MsgHandler
void termRequest();
void brokerReconnectRequest();
void clientConnectedReport(const std::string& clientId);
void connStatusUpdated();
AuthInfo authInfoRequest(const std::string& clientId);
void reportError(const char* str);

Expand Down
4 changes: 4 additions & 0 deletions gateway/lib/src/SessionOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,8 @@ void SessionOp::brokerConnectionUpdatedImpl()
{
}

void SessionOp::connStatusUpdatedImpl()
{
}

} // namespace cc_mqttsn_gateway
6 changes: 6 additions & 0 deletions gateway/lib/src/SessionOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class SessionOp : public MsgHandler
brokerConnectionUpdatedImpl();
}

void connStatusUpdated()
{
connStatusUpdatedImpl();
}

protected:
SessionOp(SessionImpl& session)
: m_session(session)
Expand Down Expand Up @@ -75,6 +80,7 @@ class SessionOp : public MsgHandler
virtual void tickImpl();
virtual void startImpl();
virtual void brokerConnectionUpdatedImpl();
virtual void connStatusUpdatedImpl();

private:
SessionImpl& m_session;
Expand Down
3 changes: 3 additions & 0 deletions gateway/lib/src/session_op/Asleep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include "Asleep.h"

#include "SessionImpl.h"

#include <cassert>
#include <algorithm>

Expand Down Expand Up @@ -52,6 +54,7 @@ void Asleep::handle(DisconnectMsg_SN& msg)

sendDisconnectToClient();
state().m_connStatus = ConnectionStatus::Asleep;
session().connStatusUpdated();
m_attempt = 0;
doPing();
}
Expand Down
2 changes: 1 addition & 1 deletion gateway/lib/src/session_op/Asleep.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace cc_mqttsn_gateway
namespace session_op
{

class Asleep : public SessionOp
class Asleep final : public SessionOp
{
typedef SessionOp Base;

Expand Down
6 changes: 3 additions & 3 deletions gateway/lib/src/session_op/AsleepMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ void AsleepMonitor::handle(DisconnectMsg_SN& msg)

m_lastPing = state().m_timestamp;
m_duration = ((static_cast<unsigned>(msg.field_duration().field().value()) * 3000) / 2);
reqNextTick();
reqNextTickInternal();
}

void AsleepMonitor::handle([[maybe_unused]] PingreqMsg_SN& msg)
{
m_lastPing = state().m_timestamp;
cancelTick();
if (state().m_connStatus == ConnectionStatus::Asleep) {
reqNextTick();
reqNextTickInternal();
}
}

Expand All @@ -75,7 +75,7 @@ void AsleepMonitor::checkTickRequired()
}
}

void AsleepMonitor::reqNextTick()
void AsleepMonitor::reqNextTickInternal()
{
assert(0 < m_lastPing);

Expand Down
4 changes: 2 additions & 2 deletions gateway/lib/src/session_op/AsleepMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace cc_mqttsn_gateway
namespace session_op
{

class AsleepMonitor : public SessionOp
class AsleepMonitor final : public SessionOp
{
typedef SessionOp Base;

Expand All @@ -35,7 +35,7 @@ class AsleepMonitor : public SessionOp
virtual void handle(MqttMessage& msg) override;

void checkTickRequired();
void reqNextTick();
void reqNextTickInternal();

Timestamp m_lastPing = 0;
unsigned m_duration = 0U;
Expand Down
4 changes: 4 additions & 0 deletions gateway/lib/src/session_op/Connect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ void Connect::handle(ConnectMsg_SN& msg)
session().reportError("Different client id reconnection in the same session");
sendDisconnectToClient();
state().m_connStatus = ConnectionStatus::Disconnected;
session().connStatusUpdated();
termRequest();
return;
}
Expand Down Expand Up @@ -191,6 +192,7 @@ void Connect::doNextStep()
if (state().m_retryCount <= m_internalState.m_attempt) {
m_clientId.clear();
state().m_connStatus = ConnectionStatus::Disconnected;
session().connStatusUpdated();
return;
}

Expand Down Expand Up @@ -372,6 +374,7 @@ void Connect::processAck(ConnackMsg::Field_returnCode::ValueType respCode)
sessionState.m_username = std::move(m_authInfo.first);
sessionState.m_password = std::move(m_authInfo.second);
clearInternalState();
session().connStatusUpdated();

if (m_clean) {
sessionState.m_regMgr.clearRegistrations();
Expand All @@ -389,6 +392,7 @@ void Connect::clearConnectionInfo(bool clearClientId)
sessionState.m_connStatus = ConnectionStatus::Disconnected;
sessionState.m_keepAlive = 0;
sessionState.m_will = WillInfo();
session().connStatusUpdated();
}

void Connect::clearInternalState()
Expand Down
2 changes: 1 addition & 1 deletion gateway/lib/src/session_op/Connect.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace cc_mqttsn_gateway
namespace session_op
{

class Connect : public SessionOp
class Connect final : public SessionOp
{
using Base = SessionOp;

Expand Down
3 changes: 3 additions & 0 deletions gateway/lib/src/session_op/Disconnect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include "Disconnect.h"

#include "SessionImpl.h"

#include <cassert>

namespace cc_mqttsn_gateway
Expand Down Expand Up @@ -57,6 +59,7 @@ void Disconnect::sendDisconnectSn()
{
Base::sendDisconnectToClient();
state().m_connStatus = ConnectionStatus::Disconnected;
session().connStatusUpdated();
}

} // namespace session_op
Expand Down
2 changes: 1 addition & 1 deletion gateway/lib/src/session_op/Disconnect.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace cc_mqttsn_gateway
namespace session_op
{

class Disconnect : public SessionOp
class Disconnect final : public SessionOp
{
typedef SessionOp Base;

Expand Down
2 changes: 1 addition & 1 deletion gateway/lib/src/session_op/Encapsulate.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace cc_mqttsn_gateway
namespace session_op
{

class Encapsulate : public SessionOp
class Encapsulate final : public SessionOp
{
using Base = SessionOp;

Expand Down
2 changes: 1 addition & 1 deletion gateway/lib/src/session_op/Forward.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace cc_mqttsn_gateway
namespace session_op
{

class Forward : public SessionOp
class Forward final : public SessionOp
{
typedef SessionOp Base;

Expand Down
70 changes: 70 additions & 0 deletions gateway/lib/src/session_op/Ping.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//
// Copyright 2016 - 2024 (C). Alex Robenko. All rights reserved.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

#include "Ping.h"

#include <cassert>

namespace cc_mqttsn_gateway
{

namespace session_op
{

Ping::Ping(SessionImpl& session) :
Base(session)
{
}

Ping::~Ping() = default;

void Ping::tickImpl()
{
sendPing();
}

void Ping::connStatusUpdatedImpl()
{
restartPingTimer();
}

void Ping::handle([[maybe_unused]] MqttsnMessage& msg)
{
restartPingTimer();
}

void Ping::sendPing()
{
auto& st = state();

if (st.m_retryCount <= m_attempt) {
termRequest();
return;
}

++m_attempt;
sendToClient(PingreqMsg_SN());
nextTickReq(st.m_retryPeriod);
}

void Ping::restartPingTimer()
{
auto& st = state();
if (st.m_connStatus != ConnectionStatus::Connected) {
cancelTick();
return;
}

m_attempt = 0U;
assert(0U < st.m_keepAlive);
auto nextPingTick = (st.m_keepAlive * 1000);
nextTickReq(nextPingTick);
}

} // namespace session_op

} // namespace cc_mqttsn_gateway
47 changes: 47 additions & 0 deletions gateway/lib/src/session_op/Ping.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//
// Copyright 2016 - 2024 (C). Alex Robenko. All rights reserved.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

#pragma once

#include "SessionOp.h"
#include "common.h"

namespace cc_mqttsn_gateway
{

namespace session_op
{

class Ping final : public SessionOp
{
typedef SessionOp Base;

public:
explicit Ping(SessionImpl& session);
~Ping();

void clientConnected();

protected:
virtual void tickImpl() override;
virtual void connStatusUpdatedImpl() override;

private:
using Base::handle;
virtual void handle(MqttsnMessage& msg) override;

void sendPing();
void restartPingTimer();

unsigned m_attempt = 0;
};

} // namespace session_op

} // namespace cc_mqttsn_gateway


2 changes: 1 addition & 1 deletion gateway/lib/src/session_op/PubRecv.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace cc_mqttsn_gateway
namespace session_op
{

class PubRecv : public SessionOp
class PubRecv final : public SessionOp
{
typedef SessionOp Base;

Expand Down
2 changes: 1 addition & 1 deletion gateway/lib/src/session_op/PubSend.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace cc_mqttsn_gateway
namespace session_op
{

class PubSend : public SessionOp
class PubSend final : public SessionOp
{
typedef SessionOp Base;

Expand Down
2 changes: 1 addition & 1 deletion gateway/lib/src/session_op/WillUpdate.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace cc_mqttsn_gateway
namespace session_op
{

class WillUpdate : public SessionOp
class WillUpdate final : public SessionOp
{
typedef SessionOp Base;

Expand Down
Loading

0 comments on commit d1c483b

Please sign in to comment.