Skip to content

Commit

Permalink
[Messaging] Message buffers not created by WPEProcess will no longer …
Browse files Browse the repository at this point in the history
…be missed by Thunder (#1474)

* Adding test which first establishes a messaging client and then a server to test validation of message buffers

* Adding Validate to MessageDispatcher and calling it in MessageClient::PopMessagesAndCall to avoid an issue when client is created before the server

* Formatting changes

* Updating the cyclicbuffer test by renaming Validate to Open
  • Loading branch information
VeithMetro authored Dec 20, 2023
1 parent 9097e77 commit 38b37d7
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 19 deletions.
12 changes: 10 additions & 2 deletions Source/core/CyclicBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,11 @@ namespace Core {
{
}

bool CyclicBuffer::Validate() {
bool CyclicBuffer::Open()
{
bool loaded = (_administration != nullptr);

if (loaded == false) {
if (loaded == false) {
loaded = _buffer.Load();
if (loaded == true) {
_realBuffer = (&(_buffer.Buffer()[sizeof(struct control)]));
Expand All @@ -167,6 +168,13 @@ namespace Core {
return (loaded);
}

void CyclicBuffer::Close()
{
_buffer.Destroy();
_realBuffer = nullptr;
_administration = nullptr;
}

void CyclicBuffer::AdminLock()
{
#ifdef __POSIX__
Expand Down
3 changes: 2 additions & 1 deletion Source/core/CyclicBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ namespace Core {
{
return (_administration->_size);
}
bool Validate();
bool Open();
void Close();

// THREAD SAFE
// If there are threads blocked in the Lock, they can be relinquised by
Expand Down
1 change: 1 addition & 0 deletions Source/messaging/MessageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ namespace Messaging {
_adminLock.Lock();

for (auto& client : _clients) {
client.second.Validate();
uint16_t size = sizeof(_readBuffer);

while (client.second.PopData(size, _readBuffer) != Core::ERROR_READ_ERROR) {
Expand Down
17 changes: 11 additions & 6 deletions Source/messaging/MessageDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,19 @@ namespace Messaging {
(initialize == true ? DATA_BUFFER_SIZE : 0), true)
// clang-format on
{
if (_dataBuffer.IsValid() == false) {
_dataBuffer.Validate();
}

if (_dataBuffer.IsValid() == true) {
if ( (initialize == false) && (_dataBuffer.Used() > 0) ) {
TRACE_L1("%d bytes already in the buffer instance %d", _dataBuffer.Used(), instanceId);
_dataBuffer.Ring();
}
}
else {
TRACE_L1("MessageDispatcher instance %d is not valid!", instanceId);
if (initialize == false) {
TRACE_L1("MessageDispatcher instance %d (client) is not valid, probably because the server has not created a file yet", instanceId);
}
else {
TRACE_L1("MessageDispatcher instance %d (server) is not valid, possible issues when creating a file", instanceId);
}
}
}
~MessageDataBufferType()
Expand Down Expand Up @@ -265,7 +266,11 @@ namespace Messaging {
bool IsValid() const {
return (_dataBuffer.IsValid());
}


void Validate() {
_dataBuffer.Open();
}

const string& MetadataName() const {
return (_filenames.metaData);
}
Expand Down
2 changes: 1 addition & 1 deletion Source/messaging/MessageUnit.h
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ namespace WPEFramework {

/**
* @brief Exchanges metadata with the server. Reader needs to register for notifications to recevie this message.
* Passed buffer will be filled with data from thr other side
* Passed buffer will be filled with data from the other side
*
* @param length length of the message
* @param value buffer
Expand Down
5 changes: 5 additions & 0 deletions Tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ option(HTTPSCLIENT_TEST "Example how to do https requests with Thunder." OFF)
option(WORKERPOOL_TEST "WorkerPool stress test" OFF)
option(FILE_UNLINK_TEST "File unlink test" OFF)
option(REDIRECT_TEST "Test stream redirection" OFF)
option(MESSAGEBUFFER_TEST "Test message buffer" OFF)

if(BUILD_TESTS)
add_subdirectory(unit)
Expand All @@ -27,3 +28,7 @@ endif()
if(WORKERPOOL_TEST)
add_subdirectory(workerpool-test)
endif()

if(MESSAGEBUFFER_TEST)
add_subdirectory(message-buffer)
endif()
34 changes: 34 additions & 0 deletions Tests/message-buffer/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# If not stated otherwise in this file or this component's license file the
# following copyright and licenses apply:
#
# Copyright 2020 Metrological
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(MessageBufferTest
Module.cpp
main.cpp)

target_link_libraries(MessageBufferTest
PRIVATE
${NAMESPACE}Core::${NAMESPACE}Core
${NAMESPACE}COM::${NAMESPACE}COM
${NAMESPACE}Messaging::${NAMESPACE}Messaging
)

set_target_properties(MessageBufferTest PROPERTIES
CXX_STANDARD 11
CXX_STANDARD_REQUIRED YES
)

install(TARGETS MessageBufferTest DESTINATION bin)
22 changes: 22 additions & 0 deletions Tests/message-buffer/Module.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* If not stated otherwise in this file or this component's LICENSE file the
* following copyright and licenses apply:
*
* Copyright 2021 Metrological
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "Module.h"

MODULE_NAME_DECLARATION(BUILD_REFERENCE)
31 changes: 31 additions & 0 deletions Tests/message-buffer/Module.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* If not stated otherwise in this file or this component's LICENSE file the
* following copyright and licenses apply:
*
* Copyright 2021 Metrological
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#ifndef MODULE_NAME
#define MODULE_NAME MessageBufferTest
#endif

#include <core/core.h>
#include <com/com.h>
#include <messaging/messaging.h>

#undef EXTERNAL
#define EXTERNAL
120 changes: 120 additions & 0 deletions Tests/message-buffer/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include "Module.h"

namespace WPEFramework {
namespace Test {

static Core::NodeId GetConnectionNode()
{
string nodeName;
Core::SystemInfo::GetEnvironment(string(_T("COMMUNICATOR_CONNECTOR")), nodeName);
return (Core::NodeId(nodeName.c_str()));
}

static class PluginHost {
private:
PluginHost(const PluginHost&) = delete;
PluginHost& operator=(const PluginHost&) = delete;

public:
PluginHost()
: _engine(WPEFramework::Core::ProxyType<WPEFramework::RPC::InvokeServerType<2, 0, 4>>::Create())
, _comClient(Core::ProxyType<RPC::CommunicatorClient>::Create(GetConnectionNode(), Core::ProxyType<Core::IIPCServer>(_engine)))
{
}
~PluginHost()
{
Deinitialize();
}

public:
void Initialize()
{
uint32_t result = _comClient->Open(RPC::CommunicationTimeOut);

if (result != Core::ERROR_NONE) {
TRACE(Trace::Error, (_T("Could not open connection to node %s. Error: %s"), _comClient->Source().RemoteId(), Core::NumberType<uint32_t>(result).Text()));
} else {
Messaging::MessageUnit::Instance().Open(_comClient->ConnectionId());
}
}

void Deinitialize()
{
if (_comClient.IsValid() == true) {
_comClient.Release();
}
if (_engine.IsValid() == true) {
_engine.Release();
}
Core::Singleton::Dispose();
}

void Trace()
{
TRACE(Trace::Information, (_T("test trace")));
}

void Syslog()
{
SYSLOG(Logging::Notification,(_T("test syslog")));
}

private:
Core::ProxyType<RPC::InvokeServerType<2, 0, 4> > _engine;
Core::ProxyType<RPC::CommunicatorClient> _comClient;
} _wpeFrameworkClient;

}
}

void help() {
printf ("I -> Initialize the connection\n");
printf ("D -> Deinitialize the connection\n");
printf ("T -> Trace\n");
printf ("S -> Syslog\n");
printf ("Q -> Quit\n>");
}

int main(int argc, char** argv)
{
{
int element;

help();
do {
element = toupper(getchar());

switch (element) {
case 'I': {
WPEFramework::Test::_wpeFrameworkClient.Initialize();
fprintf(stdout, "PluginHost initialized..\n");
fflush(stdout);
break;
}
case 'D': {
WPEFramework::Test::_wpeFrameworkClient.Deinitialize();
fprintf(stdout, "PluginHost deinitialized..\n");
fflush(stdout);
break;
}
case 'T': {
WPEFramework::Test::_wpeFrameworkClient.Trace();
fprintf(stdout, "Trace sent..\n");
fflush(stdout);
break;
}
case 'S': {
WPEFramework::Test::_wpeFrameworkClient.Syslog();
fprintf(stdout, "Syslog sent..\n");
fflush(stdout);
break;
}
case 'Q': break;
default: {
}
}
} while (element != 'Q');
}

return (0);
}
Loading

0 comments on commit 38b37d7

Please sign in to comment.