Skip to content

Commit

Permalink
Initial support for client unsubscribe.
Browse files Browse the repository at this point in the history
  • Loading branch information
arobenko committed Jul 14, 2024
1 parent b8067ba commit f421c86
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 31 deletions.
1 change: 0 additions & 1 deletion client/lib/include/cc_mqttsn_client/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ typedef struct
typedef struct
{
CC_MqttsnReturnCode m_returnCode; ///< Return code reported by the @b SUBACK message
CC_MqttsnTopicId m_topicId; ///< Granted topic ID (if applicable).
CC_MqttsnQoS m_qos; ///< Granted max QoS value
} CC_MqttsnSubscribeInfo;

Expand Down
12 changes: 9 additions & 3 deletions client/lib/src/TopicFilterDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ namespace cc_mqttsn_client
{

using TopicNameStr = SubscribeMsg::Field_topicName::Field::ValueType;
using SubFiltersMap = ObjListType<TopicNameStr, Config::SubFiltersLimit, Config::HasSubTopicVerification>;

struct RegTopicInfo
{
TopicNameStr m_topic;
CC_MqttsnTopicId m_topicId = 0U; // key
CC_MqttsnTopicId m_topicId = 0U;

template <typename T>
RegTopicInfo(T&& topic, CC_MqttsnTopicId topicId) : m_topic(std::forward<T>(topic)), m_topicId(topicId) {}

RegTopicInfo(const char* topic) : m_topic(topic) {}
RegTopicInfo(CC_MqttsnTopicId topicId) : m_topicId(topicId) {}
};

using InRegTopicsMap = ObjListType<RegTopicInfo, Config::InRegTopicsLimit>;
using SubFiltersMap = ObjListType<RegTopicInfo, Config::SubFiltersLimit, Config::HasSubTopicVerification>; // key is m_topic
using InRegTopicsMap = ObjListType<RegTopicInfo, Config::InRegTopicsLimit>; // key is m_topicId;

} // namespace cc_mqttsn_client
53 changes: 37 additions & 16 deletions client/lib/src/op/SubscribeOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,29 +150,50 @@ void SubscribeOp::handle(SubackMsg& msg)
auto info = CC_MqttsnSubscribeInfo();
info.m_returnCode = static_cast<decltype(info.m_returnCode)>(msg.field_returnCode().value());
info.m_qos = static_cast<decltype(info.m_qos)>(msg.field_flags().field_qos().value());
info.m_topicId = msg.field_topicId().value();

auto topicId = static_cast<CC_MqttsnTopicId>(msg.field_topicId().value());
auto& topicStr = m_subscribeMsg.field_topicName().field().value();
auto* topicPtr = topicStr.c_str();
if (topicStr.empty()) {
topicPtr = nullptr;
}

if (topicId != 0U) {
storeInRegTopic(topicPtr, topicId);
}

COMMS_ASSERT((topicId != 0U) || (topicPtr != nullptr));
auto& filtersMap = client().reuseState().m_subFilters;
do {
if (info.m_topicId == 0U) {
break;
}
if (topicPtr != nullptr) {
auto iter =
std::lower_bound(
filtersMap.begin(), filtersMap.end(), topicPtr,
[](auto& elem, const char* topicParam)
{
return elem.m_topic < topicParam;
});

if ((iter == filtersMap.end()) || (iter->m_topic != topicPtr)) {
filtersMap.emplace(iter, topicPtr);
}

auto& topicStr = m_subscribeMsg.field_topicName().field().value();
if (!topicStr.empty()) {
storeInRegTopic(topicStr.c_str(), info.m_topicId);
break;
}

if constexpr (Config::HasSubTopicVerification) {
storeInRegTopic(nullptr, info.m_topicId);
break;
}

} while (false);
COMMS_ASSERT(m_subscribeMsg.field_topicId().doesExist());
COMMS_ASSERT(m_subscribeMsg.field_topicId().field().value() != 0U);

if ((info.m_topicId != 0U) && (!m_subscribeMsg.field_topicName().field().value().empty())) {
storeInRegTopic(m_subscribeMsg.field_topicName().field().value().c_str(), info.m_topicId);
}
auto iter =
std::find_if(
filtersMap.begin(), filtersMap.end(),
[](auto& elem)
{
return !elem.m_topic.empty();
});

filtersMap.emplace(iter, static_cast<CC_MqttsnTopicId>(m_subscribeMsg.field_topicId().field().value()));
} while (false);

completeOpInternal(CC_MqttsnAsyncOpStatus_Complete, &info);
}
Expand Down
97 changes: 95 additions & 2 deletions client/lib/src/op/UnsubscribeOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,47 @@ CC_MqttsnErrorCode UnsubscribeOp::config(const CC_MqttsnUnsubscribeConfig* confi
if ((!emptyTopic) && (!verifySubFilter(config->m_topic))) {
errorLog("Bad topic filter format in unsubscribe.");
return CC_MqttsnErrorCode_BadParam;
}
}

if constexpr (Config::HasSubTopicVerification) {
do {
if (!client().configState().m_verifySubFilter) {
break;
}

auto& filtersMap = client().reuseState().m_subFilters;
if (!emptyTopic) {
auto iter =
std::lower_bound(
filtersMap.begin(), filtersMap.end(), config->m_topic,
[](auto& elem, const char* topicParam)
{
return elem.m_topic < topicParam;
});

if ((iter == filtersMap.end()) || (iter->m_topic != config->m_topic)) {
errorLog("Requested unsubscribe topic hasn't been used for subscription before");
return CC_MqttsnErrorCode_BadParam;
}

break;
}

COMMS_ASSERT(isValidTopicId(config->m_topicId));
auto iter =
std::find_if(
filtersMap.begin(), filtersMap.end(),
[config](auto& elem)
{
return config->m_topicId == elem.m_topicId;
});

if (iter == filtersMap.end()) {
errorLog("Requested unsubscribe topic ID hasn't been used for subscription before");
return CC_MqttsnErrorCode_BadParam;
}
} while (false);
}

m_unsubscribeMsg.field_flags().field_qos().setValue(config->m_qos);

Expand Down Expand Up @@ -124,7 +164,60 @@ CC_MqttsnErrorCode UnsubscribeOp::send(CC_MqttsnUnsubscribeCompleteCb cb, void*
return ec;
}

// TODO: remove the record
// Remove record on first send rather than acknowledgement allowing message
// to get lost.
do {
if (m_recordRemoved) {
break;
}

m_recordRemoved = true;

auto& topicStr = m_unsubscribeMsg.field_topicName().field().value();
auto* topicPtr = topicStr.c_str();
if (m_unsubscribeMsg.field_topicName().isMissing()) {
topicPtr = nullptr;
}

auto topicId = m_unsubscribeMsg.field_topicId().field().value();
COMMS_ASSERT(m_unsubscribeMsg.field_topicId().doesExist() || (topicId == 0U));
COMMS_ASSERT((topicPtr == nullptr) || (topicId == 0U));
COMMS_ASSERT((topicPtr != nullptr) || (topicId != 0U));

removeInRegTopic(topicPtr, topicId);

if constexpr (Config::HasSubTopicVerification) {
auto& filtersMap = client().reuseState().m_subFilters;
if (topicPtr != nullptr) {
auto iter =
std::lower_bound(
filtersMap.begin(), filtersMap.end(), topicPtr,
[](auto& elem, const char* topicParam)
{
return elem.m_topic < topicParam;
});

if ((iter != filtersMap.end()) && (iter->m_topic != topicPtr)) {
filtersMap.erase(iter);
}

break;
}

COMMS_ASSERT(topicId != 0U);

auto iter =
std::find_if(
filtersMap.begin(), filtersMap.end(),
[topicId](auto& elem) {
return elem.m_topicId == topicId;
});

if (iter != filtersMap.end()) {
filtersMap.erase(iter);
}
}
} while (false);

completeOnError.release();
return CC_MqttsnErrorCode_Success;
Expand Down
1 change: 1 addition & 0 deletions client/lib/src/op/UnsubscribeOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class UnsubscribeOp final : public Op
CC_MqttsnUnsubscribeCompleteCb m_cb = nullptr;
void* m_cbData = nullptr;
bool m_suspended = false;
bool m_recordRemoved = false;

static_assert(ExtConfig::UnsubscribeOpTimers == 1U);
};
Expand Down
1 change: 0 additions & 1 deletion client/lib/test/UnitTestCommonBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ UnitTestCommonBase::UnitTestConnectCompleteReport::UnitTestConnectCompleteReport
UnitTestCommonBase::UnitTestSubscribeInfo& UnitTestCommonBase::UnitTestSubscribeInfo::operator=(const CC_MqttsnSubscribeInfo& info)
{
m_returnCode = info.m_returnCode;
m_topicId = info.m_topicId;
m_qos = info.m_qos;
return *this;
}
Expand Down
1 change: 0 additions & 1 deletion client/lib/test/UnitTestCommonBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ class UnitTestCommonBase
struct UnitTestSubscribeInfo
{
CC_MqttsnReturnCode m_returnCode = CC_MqttsnReturnCode_ValuesLimit;
CC_MqttsnTopicId m_topicId;
CC_MqttsnQoS m_qos;
UnitTestSubscribeInfo() = default;
UnitTestSubscribeInfo(const UnitTestSubscribeInfo&) = default;
Expand Down
7 changes: 0 additions & 7 deletions client/lib/test/UnitTestSubscribe.th
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ void UnitTestSubscribe::test1()
auto subscribeReport = unitTestSubscribeCompleteReport();
TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, 0U);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos);

TS_ASSERT(unitTestHasTickReq());
Expand Down Expand Up @@ -166,7 +165,6 @@ void UnitTestSubscribe::test2()
auto subscribeReport = unitTestSubscribeCompleteReport();
TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, AckTopicId);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos);

TS_ASSERT(unitTestHasTickReq());
Expand Down Expand Up @@ -236,7 +234,6 @@ void UnitTestSubscribe::test3()
auto subscribeReport = unitTestSubscribeCompleteReport();
TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, TopicId);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos);

TS_ASSERT(unitTestHasTickReq());
Expand Down Expand Up @@ -322,7 +319,6 @@ void UnitTestSubscribe::test4()
auto subscribeReport = unitTestSubscribeCompleteReport();
TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, 0U);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos);

TS_ASSERT(unitTestHasTickReq());
Expand Down Expand Up @@ -514,7 +510,6 @@ void UnitTestSubscribe::test6()
auto subscribeReport = unitTestSubscribeCompleteReport();
TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, 0U);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos);
}

Expand Down Expand Up @@ -552,7 +547,6 @@ void UnitTestSubscribe::test6()
auto subscribeReport = unitTestSubscribeCompleteReport();
TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, 0U);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos);
}

Expand Down Expand Up @@ -591,7 +585,6 @@ void UnitTestSubscribe::test6()
auto subscribeReport = unitTestSubscribeCompleteReport();
TS_ASSERT_EQUALS(subscribeReport->m_status, CC_MqttsnAsyncOpStatus_Complete);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_returnCode, CC_MqttsnReturnCode_Accepted);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_topicId, 0U);
TS_ASSERT_EQUALS(subscribeReport->m_info.m_qos, AckQos);
}

Expand Down

0 comments on commit f421c86

Please sign in to comment.