From 2b3a8cbc91154b907ccd5e19ef22d2b9c7a4dae1 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 8 Aug 2019 17:44:29 +0200 Subject: [PATCH] feat(server): Adding multithreading architecture with additional refactoring --- CMakeLists.txt | 14 +-- arch/posix/CMakeLists.txt | 2 +- arch/posix/ua_architecture.h | 30 ++++++ arch/win32/ua_architecture.h | 24 +++++ doc/building.rst | 4 +- include/open62541/architecture_definitions.h | 100 ++++++++++--------- include/open62541/config.h.in | 4 +- include/open62541/types.h | 2 +- plugins/ua_log_stdout.c | 6 +- plugins/ua_nodestore_default.c | 23 ++--- src/client/ua_client_worker.c | 2 +- src/server/ua_discovery_manager.h | 8 +- src/server/ua_server.c | 24 +++-- src/server/ua_server_binary.c | 8 +- src/server/ua_server_discovery_mdns.c | 8 +- src/server/ua_server_internal.h | 5 + src/server/ua_services_discovery.c | 22 ++-- src/server/ua_services_discovery_multicast.c | 8 +- src/ua_workqueue.c | 18 ++-- src/ua_workqueue.h | 24 ++--- tests/server/check_nodestore.c | 6 +- tools/travis/travis_linux_script.sh | 4 +- tools/travis/travis_osx_script.sh | 2 +- 23 files changed, 209 insertions(+), 139 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7afca5c1e4e..1b705f5f863 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -192,12 +192,12 @@ if(UA_ENABLE_DISCOVERY_MULTICAST AND NOT UA_ENABLE_DISCOVERY) endif() # Advanced options -option(UA_ENABLE_MULTITHREADING "Enable multithreading (EXPERIMENTAL)" OFF) -mark_as_advanced(UA_ENABLE_MULTITHREADING) - option(UA_ENABLE_IMMUTABLE_NODES "Nodes in the information model are not edited but copied and replaced" OFF) mark_as_advanced(UA_ENABLE_IMMUTABLE_NODES) -if(UA_ENABLE_MULTITHREADING) + +set(UA_MULTITHREADING 0 CACHE STRING "Level of multithreading") +mark_as_advanced(UA_MULTITHREADING) +if (UA_MULTITHREADING GREATER 100) set(UA_ENABLE_IMMUTABLE_NODES ON) endif() @@ -356,8 +356,8 @@ if(BUILD_SHARED_LIBS) endif() # Warn if experimental features are enabled -if(UA_ENABLE_MULTITHREADING) - MESSAGE(WARNING "UA_ENABLE_MULTITHREADING is enabled. The feature is under development and marked as EXPERIMENTAL") +if(UA_MULTITHREADING) + MESSAGE(WARNING "UA_MULTITHREADING is enabled. The feature is under development and marked as EXPERIMENTAL") endif() ######################## @@ -1154,7 +1154,7 @@ endif() if(UA_ENABLE_SUBSCRIPTIONS_EVENTS) list(APPEND open62541_enabled_components "Events") endif() -if(UA_ENABLE_MULTITHREADING) +if(UA_MULTITHREADING) list(APPEND open62541_enabled_components "Multithreading") endif() if(UA_ENABLE_DISCOVERY) diff --git a/arch/posix/CMakeLists.txt b/arch/posix/CMakeLists.txt index 1d21d21bc2b..3d020eb80ef 100644 --- a/arch/posix/CMakeLists.txt +++ b/arch/posix/CMakeLists.txt @@ -22,7 +22,7 @@ if (${_index} GREATER -1 OR "${UA_ARCHITECTURE}" STREQUAL "posix") ua_architecture_append_to_library(m) #TODO - Error on first make run if pthread is included conditional? ua_architecture_append_to_library(pthread) - if(UA_ENABLE_MULTITHREADING OR UA_BUILD_UNIT_TESTS) + if(UA_MULTITHREADING OR UA_BUILD_UNIT_TESTS) ua_architecture_append_to_library(pthread) endif() if(NOT APPLE AND (NOT ${CMAKE_SYSTEM_NAME} MATCHES "OpenBSD")) diff --git a/arch/posix/ua_architecture.h b/arch/posix/ua_architecture.h index 6236a001f96..3ab8c277999 100644 --- a/arch/posix/ua_architecture.h +++ b/arch/posix/ua_architecture.h @@ -147,6 +147,36 @@ extern void * (*UA_globalRealloc)(void *ptr, size_t size); LOG; \ } +#if UA_MULTITHREADING >= 100 +#include +#define UA_LOCK_TYPE_NAME pthread_mutex_t +#define UA_LOCK_TYPE(mutexName) pthread_mutex_t mutexName; \ + pthread_mutexattr_t mutexName##_attr; \ + int mutexName##Counter; +#define UA_LOCK_INIT(mutexName) pthread_mutexattr_init(&mutexName##_attr); \ + pthread_mutexattr_settype(&mutexName##_attr, PTHREAD_MUTEX_RECURSIVE); \ + pthread_mutex_init(&mutexName, &mutexName##_attr); \ + mutexName##Counter = 0; +#define UA_LOCK_RELEASE(mutexName) pthread_mutex_destroy(&mutexName); \ + pthread_mutexattr_destroy(&mutexName##_attr); + +#define UA_LOCK(mutexName) pthread_mutex_lock(&mutexName); \ + UA_assert(++(mutexName##Counter) == 1); \ + +#define UA_UNLOCK(mutexName) UA_assert(--(mutexName##Counter) == 0); \ + pthread_mutex_unlock(&mutexName); +#define UA_LOCK_SWITCH(currentMutex, newMutex) UA_UNLOCK(currentMutex) \ + UA_LOCK(newMutex) +#else +#define UA_LOCK_TYPE_NAME +#define UA_LOCK_TYPE(mutexName) +#define UA_LOCK_INIT(mutexName) +#define UA_LOCK_RELEASE(mutexName) +#define UA_LOCK(mutexName) +#define UA_UNLOCK(mutexName) +#define UA_LOCK_SWITCH(currentMutex, newMutex) +#endif + #include #if defined(__APPLE__) && defined(_SYS_QUEUE_H_) diff --git a/arch/win32/ua_architecture.h b/arch/win32/ua_architecture.h index 97f101c2b7d..71155ccc3fb 100644 --- a/arch/win32/ua_architecture.h +++ b/arch/win32/ua_architecture.h @@ -143,6 +143,30 @@ } #define UA_LOG_SOCKET_ERRNO_GAI_WRAP UA_LOG_SOCKET_ERRNO_WRAP +#if UA_MULTITHREADING >= 100 +#define UA_LOCK_TYPE_NAME CRITICAL_SECTION +#define UA_LOCK_TYPE(mutexName) CRITICAL_SECTION mutexName; \ + int mutexName##Counter; +#define UA_LOCK_INIT(mutexName) InitializeCriticalSection(&mutexName); \ + mutexName##Counter = 0;; +#define UA_LOCK_RELEASE(mutexName) DeleteCriticalSection(&mutexName); +#define UA_LOCK(mutexName) EnterCriticalSection(&mutexName); \ + UA_assert(++(mutexName##Counter) == 1); +#define UA_UNLOCK(mutexName) UA_assert(--(mutexName##Counter) == 0); \ + LeaveCriticalSection(&mutexName); +#define UA_LOCK_SWITCH(currentMutex, newMutex) UA_UNLOCK(currentMutex) \ + UA_LOCK(newMutex) +#else +#define UA_LOCK_TYPE_NAME +#define UA_LOCK_TYPE(mutexName) +#define UA_LOCK_TYPE_POINTER(mutexName) +#define UA_LOCK_INIT(mutexName) +#define UA_LOCK_RELEASE(mutexName) +#define UA_LOCK(mutexName) +#define UA_UNLOCK(mutexName) +#define UA_LOCK_SWITCH(currentMutex, newMutex) +#endif + #include /* Fix redefinition of SLIST_ENTRY on mingw winnt.h */ diff --git a/doc/building.rst b/doc/building.rst index 6c94dba3608..4d0d73b42fc 100644 --- a/doc/building.rst +++ b/doc/building.rst @@ -191,7 +191,7 @@ Detailed SDK Features **UA_ENABLE_AMALGAMATION** Compile a single-file release into the files :file:`open62541.c` and :file:`open62541.h`. Not receommended for installation. -**UA_ENABLE_MULTITHREADING (EXPERIMENTAL)** +**UA_MULTITHREADING (EXPERIMENTAL)** Enable multi-threading support. Work is distributed to a number of worker threads. This is a new feature and currently marked as EXPERIMENTAL. @@ -200,7 +200,7 @@ Detailed SDK Features replacement is done with atomic operations so that the information model is always consistent and can be accessed from an interrupt or parallel thread (depends on the node storage plugin implementation). This feature is a - prerequisite for ``UA_ENABLE_MULTITHREADING``. + prerequisite for ``UA_MULTITHREADING``. **UA_ENABLE_COVERAGE** Measure the coverage of unit tests diff --git a/include/open62541/architecture_definitions.h b/include/open62541/architecture_definitions.h index 0d153bf8baf..b57e6f9ad7e 100644 --- a/include/open62541/architecture_definitions.h +++ b/include/open62541/architecture_definitions.h @@ -10,6 +10,8 @@ #ifndef ARCH_UA_ARCHITECTURE_DEFINITIONS_H_ #define ARCH_UA_ARCHITECTURE_DEFINITIONS_H_ +#include + /** * C99 Definitions * --------------- */ @@ -273,101 +275,101 @@ UA_STATIC_ASSERT(sizeof(bool) == 1, cannot_overlay_integers_with_large_bool); * Atomic operations that synchronize across processor cores (for * multithreading). Only the inline-functions defined next are used. Replace * with architecture-specific operations if necessary. */ -#ifndef UA_ENABLE_MULTITHREADING -# define UA_atomic_sync() +#if UA_MULTITHREADING >= 200 + #ifdef _MSC_VER /* Visual Studio */ + #define UA_atomic_sync() _ReadWriteBarrier() + #else /* GCC/Clang */ + #define UA_atomic_sync() __sync_synchronize() + #endif #else -# ifdef _MSC_VER /* Visual Studio */ -# define UA_atomic_sync() _ReadWriteBarrier() -# else /* GCC/Clang */ -# define UA_atomic_sync() __sync_synchronize() -# endif + #define UA_atomic_sync() #endif static UA_INLINE void * UA_atomic_xchg(void * volatile * addr, void *newptr) { -#ifndef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 +#ifdef _MSC_VER /* Visual Studio */ + return _InterlockedExchangePointer(addr, newptr); +#else /* GCC/Clang */ + return __sync_lock_test_and_set(addr, newptr); +#endif +#else void *old = *addr; *addr = newptr; return old; -#else - # ifdef _MSC_VER /* Visual Studio */ - return _InterlockedExchangePointer(addr, newptr); -# else /* GCC/Clang */ - return __sync_lock_test_and_set(addr, newptr); -# endif #endif } static UA_INLINE void * UA_atomic_cmpxchg(void * volatile * addr, void *expected, void *newptr) { -#ifndef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 +#ifdef _MSC_VER /* Visual Studio */ + return _InterlockedCompareExchangePointer(addr, expected, newptr); +#else /* GCC/Clang */ + return __sync_val_compare_and_swap(addr, expected, newptr); +#endif +#else void *old = *addr; if(old == expected) { *addr = newptr; } return old; -#else - # ifdef _MSC_VER /* Visual Studio */ - return _InterlockedCompareExchangePointer(addr, expected, newptr); -# else /* GCC/Clang */ - return __sync_val_compare_and_swap(addr, expected, newptr); -# endif #endif } static UA_INLINE uint32_t UA_atomic_addUInt32(volatile uint32_t *addr, uint32_t increase) { -#ifndef UA_ENABLE_MULTITHREADING - *addr += increase; - return *addr; -#else - # ifdef _MSC_VER /* Visual Studio */ +#if UA_MULTITHREADING >= 200 +#ifdef _MSC_VER /* Visual Studio */ return _InterlockedExchangeAdd(addr, increase) + increase; -# else /* GCC/Clang */ +#else /* GCC/Clang */ return __sync_add_and_fetch(addr, increase); -# endif +#endif +#else + *addr += increase; + return *addr; #endif } static UA_INLINE size_t UA_atomic_addSize(volatile size_t *addr, size_t increase) { -#ifndef UA_ENABLE_MULTITHREADING - *addr += increase; - return *addr; -#else - # ifdef _MSC_VER /* Visual Studio */ +#if UA_MULTITHREADING >= 200 +#ifdef _MSC_VER /* Visual Studio */ return _InterlockedExchangeAdd(addr, increase) + increase; -# else /* GCC/Clang */ +#else /* GCC/Clang */ return __sync_add_and_fetch(addr, increase); -# endif +#endif +#else + *addr += increase; + return *addr; #endif } static UA_INLINE uint32_t UA_atomic_subUInt32(volatile uint32_t *addr, uint32_t decrease) { -#ifndef UA_ENABLE_MULTITHREADING - *addr -= decrease; - return *addr; -#else - # ifdef _MSC_VER /* Visual Studio */ +#if UA_MULTITHREADING >= 200 +#ifdef _MSC_VER /* Visual Studio */ return _InterlockedExchangeSub(addr, decrease) - decrease; -# else /* GCC/Clang */ +#else /* GCC/Clang */ return __sync_sub_and_fetch(addr, decrease); -# endif +#endif +#else + *addr -= decrease; + return *addr; #endif } static UA_INLINE size_t UA_atomic_subSize(volatile size_t *addr, size_t decrease) { -#ifndef UA_ENABLE_MULTITHREADING - *addr -= decrease; - return *addr; -#else - # ifdef _MSC_VER /* Visual Studio */ +#if UA_MULTITHREADING >= 200 +#ifdef _MSC_VER /* Visual Studio */ return _InterlockedExchangeSub(addr, decrease) - decrease; -# else /* GCC/Clang */ +#else /* GCC/Clang */ return __sync_sub_and_fetch(addr, decrease); -# endif +#endif +#else + *addr -= decrease; + return *addr; #endif } diff --git a/include/open62541/config.h.in b/include/open62541/config.h.in index 4018924bf34..d603fe09a2c 100644 --- a/include/open62541/config.h.in +++ b/include/open62541/config.h.in @@ -40,9 +40,9 @@ #cmakedefine UA_ENABLE_JSON_ENCODING /* Multithreading */ -#cmakedefine UA_ENABLE_MULTITHREADING #cmakedefine UA_ENABLE_IMMUTABLE_NODES -#if defined(UA_ENABLE_MULTITHREADING) && !defined(UA_ENABLE_IMMUTABLE_NODES) +#define UA_MULTITHREADING ${UA_MULTITHREADING} +#if UA_MULTITHREADING >= 200 && !defined(UA_ENABLE_IMMUTABLE_NODES) #error "The multithreading feature requires nodes to be immutable" #endif diff --git a/include/open62541/types.h b/include/open62541/types.h index e16c95f62f2..1d40f8640d5 100644 --- a/include/open62541/types.h +++ b/include/open62541/types.h @@ -974,7 +974,7 @@ void UA_EXPORT UA_Array_delete(void *p, size_t size, const UA_DataType *type); /** * Random Number Generator * ----------------------- - * If UA_ENABLE_MULTITHREADING is defined, then the seed is stored in thread + * If UA_MULTITHREADING is defined, then the seed is stored in thread * local storage. The seed is initialized for every thread in the * server/client. */ void UA_EXPORT UA_random_seed(UA_UInt64 seed); diff --git a/plugins/ua_log_stdout.c b/plugins/ua_log_stdout.c index 1522a64e43b..ab064eaeee1 100644 --- a/plugins/ua_log_stdout.c +++ b/plugins/ua_log_stdout.c @@ -10,7 +10,7 @@ #include -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 #include static pthread_mutex_t printf_mutex = PTHREAD_MUTEX_INITIALIZER; #endif @@ -53,7 +53,7 @@ UA_Log_Stdout_log(void *_, UA_LogLevel level, UA_LogCategory category, UA_Int64 tOffset = UA_DateTime_localTimeUtcOffset(); UA_DateTimeStruct dts = UA_DateTime_toStruct(UA_DateTime_now() + tOffset); -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 pthread_mutex_lock(&printf_mutex); #endif @@ -64,7 +64,7 @@ UA_Log_Stdout_log(void *_, UA_LogLevel level, UA_LogCategory category, printf("\n"); fflush(stdout); -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 pthread_mutex_unlock(&printf_mutex); #endif } diff --git a/plugins/ua_nodestore_default.c b/plugins/ua_nodestore_default.c index 19bedff4f5a..e9720397729 100644 --- a/plugins/ua_nodestore_default.c +++ b/plugins/ua_nodestore_default.c @@ -11,10 +11,9 @@ #ifndef UA_ENABLE_CUSTOM_NODESTORE -#ifdef UA_ENABLE_MULTITHREADING -#include -#define BEGIN_CRITSECT(NODEMAP) pthread_mutex_lock(&(NODEMAP)->mutex) -#define END_CRITSECT(NODEMAP) pthread_mutex_unlock(&(NODEMAP)->mutex) +#if UA_MULTITHREADING >= 100 +#define BEGIN_CRITSECT(NODEMAP) UA_LOCK(NODEMAP->lock) +#define END_CRITSECT(NODEMAP) UA_UNLOCK(NODEMAP->lock) #else #define BEGIN_CRITSECT(NODEMAP) do {} while(0) #define END_CRITSECT(NODEMAP) do {} while(0) @@ -59,8 +58,8 @@ typedef struct NodeTree NodeTree; typedef struct { NodeTree root; -#ifdef UA_ENABLE_MULTITHREADING - pthread_mutex_t mutex; /* Protect access */ +#if UA_MULTITHREADING >= 100 + UA_LOCK_TYPE(lock) /* Protect access */ #endif } NodeMap; @@ -158,8 +157,8 @@ void UA_Nodestore_releaseNode(void *nsCtx, const UA_Node *node) { if(!node) return; -#ifdef UA_ENABLE_MULTITHREADING - NodeMap *ns = (NodeMap*)nsCtx; +#if UA_MULTITHREADING >= 100 + NodeMap *ns = (NodeMap*)nsCtx; #endif BEGIN_CRITSECT(ns); NodeEntry *entry = container_of(node, NodeEntry, nodeId); @@ -331,8 +330,8 @@ UA_Nodestore_new(void **nsCtx) { NodeMap *nodemap = (NodeMap*)UA_malloc(sizeof(NodeMap)); if(!nodemap) return UA_STATUSCODE_BADOUTOFMEMORY; -#ifdef UA_ENABLE_MULTITHREADING - pthread_mutex_init(&nodemap->mutex, NULL); +#if UA_MULTITHREADING >= 100 + UA_LOCK_INIT(nodemap->lock) #endif ZIP_INIT(&nodemap->root); @@ -348,8 +347,8 @@ UA_Nodestore_delete(void *nsCtx) { return; NodeMap *ns = (NodeMap*)nsCtx; -#ifdef UA_ENABLE_MULTITHREADING - pthread_mutex_destroy(&ns->mutex); +#if UA_MULTITHREADING >= 100 + UA_LOCK_RELEASE(ns->lock); #endif ZIP_ITER(NodeTree, &ns->root, deleteNodeVisitor, NULL); UA_free(ns); diff --git a/src/client/ua_client_worker.c b/src/client/ua_client_worker.c index dfae2be01e4..eedc38804f2 100644 --- a/src/client/ua_client_worker.c +++ b/src/client/ua_client_worker.c @@ -138,7 +138,7 @@ UA_StatusCode UA_Client_run_iterate(UA_Client *client, UA_UInt16 timeout) { #endif asyncServiceTimeoutCheck(client); -#ifndef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING < 200 /* Process delayed callbacks when all callbacks and network events are * done */ UA_WorkQueue_manuallyProcessDelayed(&client->workQueue); diff --git a/src/server/ua_discovery_manager.h b/src/server/ua_discovery_manager.h index 3d1ae299edd..0399f998956 100644 --- a/src/server/ua_discovery_manager.h +++ b/src/server/ua_discovery_manager.h @@ -23,7 +23,7 @@ _UA_BEGIN_DECLS #ifdef UA_ENABLE_DISCOVERY typedef struct registeredServer_list_entry { -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 UA_DelayedCallback delayedCleanup; #endif LIST_ENTRY(registeredServer_list_entry) pointers; @@ -41,7 +41,7 @@ struct PeriodicServerRegisterCallback { }; typedef struct periodicServerRegisterCallback_entry { -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 UA_DelayedCallback delayedCleanup; #endif LIST_ENTRY(periodicServerRegisterCallback_entry) pointers; @@ -62,7 +62,7 @@ typedef struct periodicServerRegisterCallback_entry { */ typedef struct serverOnNetwork_list_entry { -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 UA_DelayedCallback delayedCleanup; #endif LIST_ENTRY(serverOnNetwork_list_entry) pointers; @@ -106,7 +106,7 @@ typedef struct { UA_Server_serverOnNetworkCallback serverOnNetworkCallback; void* serverOnNetworkCallbackData; -# ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 pthread_t mdnsThread; UA_Boolean mdnsRunning; # endif diff --git a/src/server/ua_server.c b/src/server/ua_server.c index d4c15924453..8e91f071173 100644 --- a/src/server/ua_server.c +++ b/src/server/ua_server.c @@ -175,6 +175,11 @@ void UA_Server_delete(UA_Server *server) { UA_DiscoveryManager_deleteMembers(&server->discoveryManager, server); #endif +#if UA_MULTITHREADING >= 100 + UA_LOCK_RELEASE(server->networkMutex) + UA_LOCK_RELEASE(server->serviceMutex) +#endif + /* Clean up the Admin Session */ UA_Session_deleteMembersCleanup(&server->adminSession, server); @@ -220,6 +225,11 @@ UA_Server_init(UA_Server *server) { UA_random_seed((UA_UInt64)UA_DateTime_now()); #endif +#if UA_MULTITHREADING >= 100 + UA_LOCK_INIT(server->networkMutex) + UA_LOCK_INIT(server->serviceMutex) +#endif + /* Initialize the handling of repeated callbacks */ UA_Timer_init(&server->timer); @@ -491,7 +501,7 @@ UA_Server_run_startup(UA_Server *server) { } /* Spin up the worker threads */ -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Spinning up %u worker thread(s)", server->config.nThreads); UA_WorkQueue_start(&server->workQueue, server->config.nThreads); @@ -511,10 +521,10 @@ UA_Server_run_startup(UA_Server *server) { static void serverExecuteRepeatedCallback(UA_Server *server, UA_ApplicationCallback cb, void *callbackApplication, void *data) { -#ifndef UA_ENABLE_MULTITHREADING - cb(callbackApplication, data); -#else +#if UA_MULTITHREADING >= 200 UA_WorkQueue_enqueue(&server->workQueue, cb, callbackApplication, data); +#else + cb(callbackApplication, data); #endif } @@ -541,7 +551,7 @@ UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) { nl->listen(nl, server, timeout); } -#if defined(UA_ENABLE_DISCOVERY_MULTICAST) && !defined(UA_ENABLE_MULTITHREADING) +#if defined(UA_ENABLE_DISCOVERY_MULTICAST) && (UA_MULTITHREADING < 200) if(server->config.discovery.mdnsEnable) { // TODO multicastNextRepeat does not consider new input data (requests) // on the socket. It will be handled on the next call. if needed, we @@ -555,7 +565,7 @@ UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) { } #endif -#ifndef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING < 200 UA_WorkQueue_manuallyProcessDelayed(&server->workQueue); #endif @@ -574,7 +584,7 @@ UA_Server_run_shutdown(UA_Server *server) { nl->stop(nl, server); } -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 /* Shut down the workers */ UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Shutting down %u worker thread(s)", diff --git a/src/server/ua_server_binary.c b/src/server/ua_server_binary.c index c2134c5e9c9..b076e1477ad 100644 --- a/src/server/ua_server_binary.c +++ b/src/server/ua_server_binary.c @@ -810,7 +810,7 @@ UA_Server_processBinaryMessage(UA_Server *server, UA_Connection *connection, UA_SecureChannel_persistIncompleteMessages(connection->channel); } -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 static void deleteConnection(UA_Server *server, UA_Connection *connection) { connection->free(connection); @@ -820,9 +820,7 @@ deleteConnection(UA_Server *server, UA_Connection *connection) { void UA_Server_removeConnection(UA_Server *server, UA_Connection *connection) { UA_Connection_detachSecureChannel(connection); -#ifndef UA_ENABLE_MULTITHREADING - connection->free(connection); -#else +#if UA_MULTITHREADING >= 200 UA_DelayedCallback *dc = (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback)); if(!dc) return; /* Malloc cannot fail on OS's that support multithreading. They @@ -831,5 +829,7 @@ UA_Server_removeConnection(UA_Server *server, UA_Connection *connection) { dc->application = server; dc->data = connection; UA_WorkQueue_enqueueDelayed(&server->workQueue, dc); +#else + connection->free(connection); #endif } diff --git a/src/server/ua_server_discovery_mdns.c b/src/server/ua_server_discovery_mdns.c index fd9c796cc7b..74a262008d0 100644 --- a/src/server/ua_server_discovery_mdns.c +++ b/src/server/ua_server_discovery_mdns.c @@ -331,13 +331,13 @@ mdns_record_remove(UA_Server *server, const char *record, if(entry->pathTmp) UA_free(entry->pathTmp); -#ifndef UA_ENABLE_MULTITHREADING - dm->serverOnNetworkSize--; - UA_free(entry); -#else +#if UA_MULTITHREADING >= 200 UA_atomic_subSize(&dm->serverOnNetworkSize, 1); entry->delayedCleanup.callback = NULL; /* Only free the structure */ UA_WorkQueue_enqueueDelayed(&server->workQueue, &entry->delayedCleanup); +#else + dm->serverOnNetworkSize--; + UA_free(entry); #endif } diff --git a/src/server/ua_server_internal.h b/src/server/ua_server_internal.h index 6f5bc6361b7..51bdf449cdc 100644 --- a/src/server/ua_server_internal.h +++ b/src/server/ua_server_internal.h @@ -108,6 +108,11 @@ struct UA_Server { #ifdef UA_ENABLE_PUBSUB UA_PubSubManager pubSubManager; #endif + +#if UA_MULTITHREADING >= 100 + UA_LOCK_TYPE(networkMutex) + UA_LOCK_TYPE(serviceMutex) +#endif }; /*****************/ diff --git a/src/server/ua_services_discovery.c b/src/server/ua_services_discovery.c index b6940611500..163b6f6db9f 100644 --- a/src/server/ua_services_discovery.c +++ b/src/server/ua_services_discovery.c @@ -410,13 +410,13 @@ process_RegisterServer(UA_Server *server, UA_Session *session, // server found, remove from list LIST_REMOVE(registeredServer_entry, pointers); UA_RegisteredServer_deleteMembers(®isteredServer_entry->registeredServer); -#ifndef UA_ENABLE_MULTITHREADING - UA_free(registeredServer_entry); - server->discoveryManager.registeredServersSize--; -#else +#if UA_MULTITHREADING >= 200 UA_atomic_subSize(&server->discoveryManager.registeredServersSize, 1); registeredServer_entry->delayedCleanup.callback = NULL; /* only free the structure */ UA_WorkQueue_enqueueDelayed(&server->workQueue, ®isteredServer_entry->delayedCleanup); +#else + UA_free(registeredServer_entry); + server->discoveryManager.registeredServersSize--; #endif responseHeader->serviceResult = UA_STATUSCODE_GOOD; return; @@ -436,10 +436,10 @@ process_RegisterServer(UA_Server *server, UA_Session *session, } LIST_INSERT_HEAD(&server->discoveryManager.registeredServers, registeredServer_entry, pointers); -#ifndef UA_ENABLE_MULTITHREADING - server->discoveryManager.registeredServersSize++; -#else +#if UA_MULTITHREADING >= 200 UA_atomic_addSize(&server->discoveryManager.registeredServersSize, 1); +#else + server->discoveryManager.registeredServersSize++; #endif } else { UA_RegisteredServer_deleteMembers(®isteredServer_entry->registeredServer); @@ -533,13 +533,13 @@ void UA_Discovery_cleanupTimedOut(UA_Server *server, UA_DateTime nowMonotonic) { } LIST_REMOVE(current, pointers); UA_RegisteredServer_deleteMembers(¤t->registeredServer); -#ifndef UA_ENABLE_MULTITHREADING - UA_free(current); - server->discoveryManager.registeredServersSize--; -#else +#if UA_MULTITHREADING >= 200 UA_atomic_subSize(&server->discoveryManager.registeredServersSize, 1); current->delayedCleanup.callback = NULL; /* Only free the structure */ UA_WorkQueue_enqueueDelayed(&server->workQueue, ¤t->delayedCleanup); +#else + UA_free(current); + server->discoveryManager.registeredServersSize--; #endif } } diff --git a/src/server/ua_services_discovery_multicast.c b/src/server/ua_services_discovery_multicast.c index 0744dc64d05..d0f59342ed1 100644 --- a/src/server/ua_services_discovery_multicast.c +++ b/src/server/ua_services_discovery_multicast.c @@ -12,7 +12,7 @@ #if defined(UA_ENABLE_DISCOVERY) && defined(UA_ENABLE_DISCOVERY_MULTICAST) -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 static void * multicastWorkerLoop(UA_Server *server) { @@ -72,7 +72,7 @@ multicastListenStop(UA_Server* server) { return UA_STATUSCODE_BADNOTIMPLEMENTED; } -# endif /* UA_ENABLE_MULTITHREADING */ +# endif /* UA_MULTITHREADING */ static UA_StatusCode addMdnsRecordForNetworkLayer(UA_Server *server, const UA_String *appName, @@ -110,7 +110,7 @@ void startMulticastDiscoveryServer(UA_Server *server) { /* find any other server on the net */ UA_Discovery_multicastQuery(server); -# ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 multicastListenStart(server); # endif } @@ -130,7 +130,7 @@ stopMulticastDiscoveryServer(UA_Server *server) { "Could not get hostname for multicast discovery."); } -# ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 multicastListenStop(server); # else // send out last package with TTL = 0 diff --git a/src/ua_workqueue.c b/src/ua_workqueue.c index d14595767da..061119940bd 100644 --- a/src/ua_workqueue.c +++ b/src/ua_workqueue.c @@ -20,7 +20,7 @@ void UA_WorkQueue_init(UA_WorkQueue *wq) { /* Initialized the linked list for delayed callbacks */ SIMPLEQ_INIT(&wq->delayedCallbacks); -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 wq->delayedCallbacks_checkpoint = NULL; pthread_mutex_init(&wq->delayedCallbacks_accessMutex, NULL); @@ -32,13 +32,13 @@ void UA_WorkQueue_init(UA_WorkQueue *wq) { #endif } -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 /* Forward declaration */ static void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq); #endif void UA_WorkQueue_cleanup(UA_WorkQueue *wq) { -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 /* Shut down workers */ UA_WorkQueue_stop(wq); @@ -60,7 +60,7 @@ void UA_WorkQueue_cleanup(UA_WorkQueue *wq) { /* All workers are shut down. Execute remaining delayed work here. */ UA_WorkQueue_manuallyProcessDelayed(wq); -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 wq->delayedCallbacks_checkpoint = NULL; pthread_mutex_destroy(&wq->dispatchQueue_accessMutex); pthread_cond_destroy(&wq->dispatchQueue_condition); @@ -73,7 +73,7 @@ void UA_WorkQueue_cleanup(UA_WorkQueue *wq) { /* Workers */ /***********/ -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 static void * workerLoop(UA_Worker *worker) { @@ -183,7 +183,7 @@ void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb, /* Delayed Callbacks */ /*********************/ -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 /* Delayed Callbacks are called only when all callbacks that were dispatched * prior are finished. After every UA_MAX_DELAYED_SAMPLE delayed Callbacks that @@ -229,13 +229,13 @@ dispatchDelayedCallbacks(UA_WorkQueue *wq, UA_DelayedCallback *cb) { void UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb) { -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 pthread_mutex_lock(&wq->dispatchQueue_accessMutex); #endif SIMPLEQ_INSERT_HEAD(&wq->delayedCallbacks, cb, next); -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 wq->delayedCallbacks_sinceDispatch++; if(wq->delayedCallbacks_sinceDispatch > UA_MAX_DELAYED_SAMPLE) { dispatchDelayedCallbacks(wq, cb); @@ -254,7 +254,7 @@ void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq) { dc->callback(dc->application, dc->data); UA_free(dc); } -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 wq->delayedCallbacks_checkpoint = NULL; #endif } diff --git a/src/ua_workqueue.h b/src/ua_workqueue.h index 6384230a1cc..af6be064121 100644 --- a/src/ua_workqueue.h +++ b/src/ua_workqueue.h @@ -20,7 +20,7 @@ #include "ua_util_internal.h" #include "open62541_queue.h" -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 #include #endif @@ -43,7 +43,7 @@ typedef struct UA_DelayedCallback { struct UA_WorkQueue; typedef struct UA_WorkQueue UA_WorkQueue; -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 /* Workers take out callbacks from the work queue and execute them. * @@ -68,7 +68,7 @@ typedef struct { struct UA_WorkQueue { /* Worker threads and work queue. Without multithreading, work is executed immediately. */ -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 UA_Worker *workers; size_t workersSize; @@ -82,7 +82,7 @@ struct UA_WorkQueue { /* Delayed callbacks * To be executed after all curretly dispatched works has finished */ SIMPLEQ_HEAD(, UA_DelayedCallback) delayedCallbacks; -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 pthread_mutex_t delayedCallbacks_accessMutex; UA_DelayedCallback *delayedCallbacks_checkpoint; size_t delayedCallbacks_sinceDispatch; /* How many have been added since we @@ -104,14 +104,7 @@ void UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb); * mutexes etc. */ void UA_WorkQueue_cleanup(UA_WorkQueue *wq); -#ifndef UA_ENABLE_MULTITHREADING - -/* Process all enqueued delayed work. This is not needed when workers are - * running for the multithreading case. (UA_WorkQueue_cleanup still calls this - * method during cleanup when the workers are shut down.) */ -void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq); - -#else +#if UA_MULTITHREADING >= 200 /* Spin up a number of worker threads that listen on the work queue */ UA_StatusCode UA_WorkQueue_start(UA_WorkQueue *wq, size_t workersCount); @@ -122,6 +115,13 @@ void UA_WorkQueue_stop(UA_WorkQueue *wq); void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb, void *application, void *data); +#else + +/* Process all enqueued delayed work. This is not needed when workers are + * running for the multithreading case. (UA_WorkQueue_cleanup still calls this + * method during cleanup when the workers are shut down.) */ +void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq); + #endif _UA_END_DECLS diff --git a/tests/server/check_nodestore.c b/tests/server/check_nodestore.c index ca5750d4ef7..ac4b27a2aed 100644 --- a/tests/server/check_nodestore.c +++ b/tests/server/check_nodestore.c @@ -17,7 +17,7 @@ #define container_of(ptr, type, member) \ (type *)((uintptr_t)ptr - offsetof(type,member)) -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 #include #endif @@ -212,7 +212,7 @@ END_TEST /* Performance Profiling Test Cases */ /************************************/ -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 struct UA_NodeStoreProfileTest { UA_Int32 min_val; UA_Int32 max_val; @@ -246,7 +246,7 @@ START_TEST(profileGetDelete) { UA_Nodestore_insertNode(nsCtx, n, NULL); } -#ifdef UA_ENABLE_MULTITHREADING +#if UA_MULTITHREADING >= 200 #define THREADS 4 pthread_t t[THREADS]; struct UA_NodeStoreProfileTest p[THREADS]; diff --git a/tools/travis/travis_linux_script.sh b/tools/travis/travis_linux_script.sh index 21589388d22..b17d74868e1 100644 --- a/tools/travis/travis_linux_script.sh +++ b/tools/travis/travis_linux_script.sh @@ -354,7 +354,7 @@ if [ "$CC" != "tcc" ]; then cmake \ -DPYTHON_EXECUTABLE:FILEPATH=/usr/bin/$PYTHON \ -DUA_BUILD_EXAMPLES=ON \ - -DUA_ENABLE_MULTITHREADING=ON .. + -DUA_MULTITHREADING=200 .. make -j if [ $? -ne 0 ] ; then exit 1 ; fi cd .. && rm build -rf @@ -392,7 +392,7 @@ if [ "$CC" != "tcc" ]; then -DUA_BUILD_EXAMPLES=ON \ -DUA_ENABLE_DISCOVERY=ON \ -DUA_ENABLE_DISCOVERY_MULTICAST=ON \ - -DUA_ENABLE_MULTITHREADING=ON .. + -DUA_MULTITHREADING=200 .. make -j if [ $? -ne 0 ] ; then exit 1 ; fi cd .. && rm build -rf diff --git a/tools/travis/travis_osx_script.sh b/tools/travis/travis_osx_script.sh index bfeca749466..0e31181bfe2 100644 --- a/tools/travis/travis_osx_script.sh +++ b/tools/travis/travis_osx_script.sh @@ -48,7 +48,7 @@ echo "Compile multithreaded version" && echo -en 'travis_fold:start:script.build mkdir -p build && cd build cmake \ -DUA_BUILD_EXAMPLES=ON \ - -DUA_ENABLE_MULTITHREADING=ON .. + -DUA_MULTITHREADING=200 .. make -j if [ $? -ne 0 ] ; then exit 1 ; fi cd .. && rm -rf build