Skip to content

Commit

Permalink
feat(server): Adding multithreading architecture with additional refa…
Browse files Browse the repository at this point in the history
…ctoring
  • Loading branch information
ckmk14 authored and jpfr committed Aug 19, 2019
1 parent 7f3271d commit 2b3a8cb
Show file tree
Hide file tree
Showing 23 changed files with 209 additions and 139 deletions.
14 changes: 7 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ if(UA_ENABLE_DISCOVERY_MULTICAST AND NOT UA_ENABLE_DISCOVERY)
endif()

# Advanced options
option(UA_ENABLE_MULTITHREADING "Enable multithreading (EXPERIMENTAL)" OFF)
mark_as_advanced(UA_ENABLE_MULTITHREADING)

option(UA_ENABLE_IMMUTABLE_NODES "Nodes in the information model are not edited but copied and replaced" OFF)
mark_as_advanced(UA_ENABLE_IMMUTABLE_NODES)
if(UA_ENABLE_MULTITHREADING)

set(UA_MULTITHREADING 0 CACHE STRING "Level of multithreading")
mark_as_advanced(UA_MULTITHREADING)
if (UA_MULTITHREADING GREATER 100)
set(UA_ENABLE_IMMUTABLE_NODES ON)
endif()

Expand Down Expand Up @@ -356,8 +356,8 @@ if(BUILD_SHARED_LIBS)
endif()

# Warn if experimental features are enabled
if(UA_ENABLE_MULTITHREADING)
MESSAGE(WARNING "UA_ENABLE_MULTITHREADING is enabled. The feature is under development and marked as EXPERIMENTAL")
if(UA_MULTITHREADING)
MESSAGE(WARNING "UA_MULTITHREADING is enabled. The feature is under development and marked as EXPERIMENTAL")
endif()

########################
Expand Down Expand Up @@ -1154,7 +1154,7 @@ endif()
if(UA_ENABLE_SUBSCRIPTIONS_EVENTS)
list(APPEND open62541_enabled_components "Events")
endif()
if(UA_ENABLE_MULTITHREADING)
if(UA_MULTITHREADING)
list(APPEND open62541_enabled_components "Multithreading")
endif()
if(UA_ENABLE_DISCOVERY)
Expand Down
2 changes: 1 addition & 1 deletion arch/posix/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ if (${_index} GREATER -1 OR "${UA_ARCHITECTURE}" STREQUAL "posix")
ua_architecture_append_to_library(m)
#TODO - Error on first make run if pthread is included conditional?
ua_architecture_append_to_library(pthread)
if(UA_ENABLE_MULTITHREADING OR UA_BUILD_UNIT_TESTS)
if(UA_MULTITHREADING OR UA_BUILD_UNIT_TESTS)
ua_architecture_append_to_library(pthread)
endif()
if(NOT APPLE AND (NOT ${CMAKE_SYSTEM_NAME} MATCHES "OpenBSD"))
Expand Down
30 changes: 30 additions & 0 deletions arch/posix/ua_architecture.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,36 @@ extern void * (*UA_globalRealloc)(void *ptr, size_t size);
LOG; \
}

#if UA_MULTITHREADING >= 100
#include <pthread.h>
#define UA_LOCK_TYPE_NAME pthread_mutex_t
#define UA_LOCK_TYPE(mutexName) pthread_mutex_t mutexName; \
pthread_mutexattr_t mutexName##_attr; \
int mutexName##Counter;
#define UA_LOCK_INIT(mutexName) pthread_mutexattr_init(&mutexName##_attr); \
pthread_mutexattr_settype(&mutexName##_attr, PTHREAD_MUTEX_RECURSIVE); \
pthread_mutex_init(&mutexName, &mutexName##_attr); \
mutexName##Counter = 0;
#define UA_LOCK_RELEASE(mutexName) pthread_mutex_destroy(&mutexName); \
pthread_mutexattr_destroy(&mutexName##_attr);

#define UA_LOCK(mutexName) pthread_mutex_lock(&mutexName); \
UA_assert(++(mutexName##Counter) == 1); \

#define UA_UNLOCK(mutexName) UA_assert(--(mutexName##Counter) == 0); \
pthread_mutex_unlock(&mutexName);
#define UA_LOCK_SWITCH(currentMutex, newMutex) UA_UNLOCK(currentMutex) \
UA_LOCK(newMutex)
#else
#define UA_LOCK_TYPE_NAME
#define UA_LOCK_TYPE(mutexName)
#define UA_LOCK_INIT(mutexName)
#define UA_LOCK_RELEASE(mutexName)
#define UA_LOCK(mutexName)
#define UA_UNLOCK(mutexName)
#define UA_LOCK_SWITCH(currentMutex, newMutex)
#endif

#include <open62541/architecture_functions.h>

#if defined(__APPLE__) && defined(_SYS_QUEUE_H_)
Expand Down
24 changes: 24 additions & 0 deletions arch/win32/ua_architecture.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,30 @@
}
#define UA_LOG_SOCKET_ERRNO_GAI_WRAP UA_LOG_SOCKET_ERRNO_WRAP

#if UA_MULTITHREADING >= 100
#define UA_LOCK_TYPE_NAME CRITICAL_SECTION
#define UA_LOCK_TYPE(mutexName) CRITICAL_SECTION mutexName; \
int mutexName##Counter;
#define UA_LOCK_INIT(mutexName) InitializeCriticalSection(&mutexName); \
mutexName##Counter = 0;;
#define UA_LOCK_RELEASE(mutexName) DeleteCriticalSection(&mutexName);
#define UA_LOCK(mutexName) EnterCriticalSection(&mutexName); \
UA_assert(++(mutexName##Counter) == 1);
#define UA_UNLOCK(mutexName) UA_assert(--(mutexName##Counter) == 0); \
LeaveCriticalSection(&mutexName);
#define UA_LOCK_SWITCH(currentMutex, newMutex) UA_UNLOCK(currentMutex) \
UA_LOCK(newMutex)
#else
#define UA_LOCK_TYPE_NAME
#define UA_LOCK_TYPE(mutexName)
#define UA_LOCK_TYPE_POINTER(mutexName)
#define UA_LOCK_INIT(mutexName)
#define UA_LOCK_RELEASE(mutexName)
#define UA_LOCK(mutexName)
#define UA_UNLOCK(mutexName)
#define UA_LOCK_SWITCH(currentMutex, newMutex)
#endif

#include <open62541/architecture_functions.h>

/* Fix redefinition of SLIST_ENTRY on mingw winnt.h */
Expand Down
4 changes: 2 additions & 2 deletions doc/building.rst
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ Detailed SDK Features
**UA_ENABLE_AMALGAMATION**
Compile a single-file release into the files :file:`open62541.c` and :file:`open62541.h`. Not receommended for installation.

**UA_ENABLE_MULTITHREADING (EXPERIMENTAL)**
**UA_MULTITHREADING (EXPERIMENTAL)**
Enable multi-threading support. Work is distributed to a number of worker threads.
This is a new feature and currently marked as EXPERIMENTAL.

Expand All @@ -200,7 +200,7 @@ Detailed SDK Features
replacement is done with atomic operations so that the information model is
always consistent and can be accessed from an interrupt or parallel thread
(depends on the node storage plugin implementation). This feature is a
prerequisite for ``UA_ENABLE_MULTITHREADING``.
prerequisite for ``UA_MULTITHREADING``.

**UA_ENABLE_COVERAGE**
Measure the coverage of unit tests
Expand Down
100 changes: 51 additions & 49 deletions include/open62541/architecture_definitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#ifndef ARCH_UA_ARCHITECTURE_DEFINITIONS_H_
#define ARCH_UA_ARCHITECTURE_DEFINITIONS_H_

#include <open62541/config.h>

/**
* C99 Definitions
* --------------- */
Expand Down Expand Up @@ -273,101 +275,101 @@ UA_STATIC_ASSERT(sizeof(bool) == 1, cannot_overlay_integers_with_large_bool);
* Atomic operations that synchronize across processor cores (for
* multithreading). Only the inline-functions defined next are used. Replace
* with architecture-specific operations if necessary. */
#ifndef UA_ENABLE_MULTITHREADING
# define UA_atomic_sync()
#if UA_MULTITHREADING >= 200
#ifdef _MSC_VER /* Visual Studio */
#define UA_atomic_sync() _ReadWriteBarrier()
#else /* GCC/Clang */
#define UA_atomic_sync() __sync_synchronize()
#endif
#else
# ifdef _MSC_VER /* Visual Studio */
# define UA_atomic_sync() _ReadWriteBarrier()
# else /* GCC/Clang */
# define UA_atomic_sync() __sync_synchronize()
# endif
#define UA_atomic_sync()
#endif

static UA_INLINE void *
UA_atomic_xchg(void * volatile * addr, void *newptr) {
#ifndef UA_ENABLE_MULTITHREADING
#if UA_MULTITHREADING >= 200
#ifdef _MSC_VER /* Visual Studio */
return _InterlockedExchangePointer(addr, newptr);
#else /* GCC/Clang */
return __sync_lock_test_and_set(addr, newptr);
#endif
#else
void *old = *addr;
*addr = newptr;
return old;
#else
# ifdef _MSC_VER /* Visual Studio */
return _InterlockedExchangePointer(addr, newptr);
# else /* GCC/Clang */
return __sync_lock_test_and_set(addr, newptr);
# endif
#endif
}

static UA_INLINE void *
UA_atomic_cmpxchg(void * volatile * addr, void *expected, void *newptr) {
#ifndef UA_ENABLE_MULTITHREADING
#if UA_MULTITHREADING >= 200
#ifdef _MSC_VER /* Visual Studio */
return _InterlockedCompareExchangePointer(addr, expected, newptr);
#else /* GCC/Clang */
return __sync_val_compare_and_swap(addr, expected, newptr);
#endif
#else
void *old = *addr;
if(old == expected) {
*addr = newptr;
}
return old;
#else
# ifdef _MSC_VER /* Visual Studio */
return _InterlockedCompareExchangePointer(addr, expected, newptr);
# else /* GCC/Clang */
return __sync_val_compare_and_swap(addr, expected, newptr);
# endif
#endif
}

static UA_INLINE uint32_t
UA_atomic_addUInt32(volatile uint32_t *addr, uint32_t increase) {
#ifndef UA_ENABLE_MULTITHREADING
*addr += increase;
return *addr;
#else
# ifdef _MSC_VER /* Visual Studio */
#if UA_MULTITHREADING >= 200
#ifdef _MSC_VER /* Visual Studio */
return _InterlockedExchangeAdd(addr, increase) + increase;
# else /* GCC/Clang */
#else /* GCC/Clang */
return __sync_add_and_fetch(addr, increase);
# endif
#endif
#else
*addr += increase;
return *addr;
#endif
}

static UA_INLINE size_t
UA_atomic_addSize(volatile size_t *addr, size_t increase) {
#ifndef UA_ENABLE_MULTITHREADING
*addr += increase;
return *addr;
#else
# ifdef _MSC_VER /* Visual Studio */
#if UA_MULTITHREADING >= 200
#ifdef _MSC_VER /* Visual Studio */
return _InterlockedExchangeAdd(addr, increase) + increase;
# else /* GCC/Clang */
#else /* GCC/Clang */
return __sync_add_and_fetch(addr, increase);
# endif
#endif
#else
*addr += increase;
return *addr;
#endif
}

static UA_INLINE uint32_t
UA_atomic_subUInt32(volatile uint32_t *addr, uint32_t decrease) {
#ifndef UA_ENABLE_MULTITHREADING
*addr -= decrease;
return *addr;
#else
# ifdef _MSC_VER /* Visual Studio */
#if UA_MULTITHREADING >= 200
#ifdef _MSC_VER /* Visual Studio */
return _InterlockedExchangeSub(addr, decrease) - decrease;
# else /* GCC/Clang */
#else /* GCC/Clang */
return __sync_sub_and_fetch(addr, decrease);
# endif
#endif
#else
*addr -= decrease;
return *addr;
#endif
}

static UA_INLINE size_t
UA_atomic_subSize(volatile size_t *addr, size_t decrease) {
#ifndef UA_ENABLE_MULTITHREADING
*addr -= decrease;
return *addr;
#else
# ifdef _MSC_VER /* Visual Studio */
#if UA_MULTITHREADING >= 200
#ifdef _MSC_VER /* Visual Studio */
return _InterlockedExchangeSub(addr, decrease) - decrease;
# else /* GCC/Clang */
#else /* GCC/Clang */
return __sync_sub_and_fetch(addr, decrease);
# endif
#endif
#else
*addr -= decrease;
return *addr;
#endif
}

Expand Down
4 changes: 2 additions & 2 deletions include/open62541/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
#cmakedefine UA_ENABLE_JSON_ENCODING

/* Multithreading */
#cmakedefine UA_ENABLE_MULTITHREADING
#cmakedefine UA_ENABLE_IMMUTABLE_NODES
#if defined(UA_ENABLE_MULTITHREADING) && !defined(UA_ENABLE_IMMUTABLE_NODES)
#define UA_MULTITHREADING ${UA_MULTITHREADING}
#if UA_MULTITHREADING >= 200 && !defined(UA_ENABLE_IMMUTABLE_NODES)
#error "The multithreading feature requires nodes to be immutable"
#endif

Expand Down
2 changes: 1 addition & 1 deletion include/open62541/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ void UA_EXPORT UA_Array_delete(void *p, size_t size, const UA_DataType *type);
/**
* Random Number Generator
* -----------------------
* If UA_ENABLE_MULTITHREADING is defined, then the seed is stored in thread
* If UA_MULTITHREADING is defined, then the seed is stored in thread
* local storage. The seed is initialized for every thread in the
* server/client. */
void UA_EXPORT UA_random_seed(UA_UInt64 seed);
Expand Down
6 changes: 3 additions & 3 deletions plugins/ua_log_stdout.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

#include <stdio.h>

#ifdef UA_ENABLE_MULTITHREADING
#if UA_MULTITHREADING >= 200
#include <pthread.h>
static pthread_mutex_t printf_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
Expand Down Expand Up @@ -53,7 +53,7 @@ UA_Log_Stdout_log(void *_, UA_LogLevel level, UA_LogCategory category,
UA_Int64 tOffset = UA_DateTime_localTimeUtcOffset();
UA_DateTimeStruct dts = UA_DateTime_toStruct(UA_DateTime_now() + tOffset);

#ifdef UA_ENABLE_MULTITHREADING
#if UA_MULTITHREADING >= 200
pthread_mutex_lock(&printf_mutex);
#endif

Expand All @@ -64,7 +64,7 @@ UA_Log_Stdout_log(void *_, UA_LogLevel level, UA_LogCategory category,
printf("\n");
fflush(stdout);

#ifdef UA_ENABLE_MULTITHREADING
#if UA_MULTITHREADING >= 200
pthread_mutex_unlock(&printf_mutex);
#endif
}
Expand Down
23 changes: 11 additions & 12 deletions plugins/ua_nodestore_default.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@

#ifndef UA_ENABLE_CUSTOM_NODESTORE

#ifdef UA_ENABLE_MULTITHREADING
#include <pthread.h>
#define BEGIN_CRITSECT(NODEMAP) pthread_mutex_lock(&(NODEMAP)->mutex)
#define END_CRITSECT(NODEMAP) pthread_mutex_unlock(&(NODEMAP)->mutex)
#if UA_MULTITHREADING >= 100
#define BEGIN_CRITSECT(NODEMAP) UA_LOCK(NODEMAP->lock)
#define END_CRITSECT(NODEMAP) UA_UNLOCK(NODEMAP->lock)
#else
#define BEGIN_CRITSECT(NODEMAP) do {} while(0)
#define END_CRITSECT(NODEMAP) do {} while(0)
Expand Down Expand Up @@ -59,8 +58,8 @@ typedef struct NodeTree NodeTree;

typedef struct {
NodeTree root;
#ifdef UA_ENABLE_MULTITHREADING
pthread_mutex_t mutex; /* Protect access */
#if UA_MULTITHREADING >= 100
UA_LOCK_TYPE(lock) /* Protect access */
#endif
} NodeMap;

Expand Down Expand Up @@ -158,8 +157,8 @@ void
UA_Nodestore_releaseNode(void *nsCtx, const UA_Node *node) {
if(!node)
return;
#ifdef UA_ENABLE_MULTITHREADING
NodeMap *ns = (NodeMap*)nsCtx;
#if UA_MULTITHREADING >= 100
NodeMap *ns = (NodeMap*)nsCtx;
#endif
BEGIN_CRITSECT(ns);
NodeEntry *entry = container_of(node, NodeEntry, nodeId);
Expand Down Expand Up @@ -331,8 +330,8 @@ UA_Nodestore_new(void **nsCtx) {
NodeMap *nodemap = (NodeMap*)UA_malloc(sizeof(NodeMap));
if(!nodemap)
return UA_STATUSCODE_BADOUTOFMEMORY;
#ifdef UA_ENABLE_MULTITHREADING
pthread_mutex_init(&nodemap->mutex, NULL);
#if UA_MULTITHREADING >= 100
UA_LOCK_INIT(nodemap->lock)
#endif

ZIP_INIT(&nodemap->root);
Expand All @@ -348,8 +347,8 @@ UA_Nodestore_delete(void *nsCtx) {
return;

NodeMap *ns = (NodeMap*)nsCtx;
#ifdef UA_ENABLE_MULTITHREADING
pthread_mutex_destroy(&ns->mutex);
#if UA_MULTITHREADING >= 100
UA_LOCK_RELEASE(ns->lock);
#endif
ZIP_ITER(NodeTree, &ns->root, deleteNodeVisitor, NULL);
UA_free(ns);
Expand Down
Loading

0 comments on commit 2b3a8cb

Please sign in to comment.