Skip to content

Commit

Permalink
feat(multithreading): Locks for subscription and browsing services
Browse files Browse the repository at this point in the history
  • Loading branch information
ckmk14 authored and jpfr committed Aug 27, 2019
1 parent 30c8b7d commit 6050ffe
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 40 deletions.
8 changes: 4 additions & 4 deletions include/open62541/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ typedef void (*UA_Server_EventNotificationCallback)
* @return Returns a description of the created MonitoredItem. The structure
* also contains a StatusCode (in case of an error) and the identifier of the
* new MonitoredItem. */
UA_MonitoredItemCreateResult UA_EXPORT
UA_MonitoredItemCreateResult UA_EXPORT UA_THREADSAFE
UA_Server_createDataChangeMonitoredItem(UA_Server *server,
UA_TimestampsToReturn timestampsToReturn,
const UA_MonitoredItemCreateRequest item,
Expand All @@ -923,7 +923,7 @@ UA_Server_createDataChangeMonitoredItem(UA_Server *server,
/* const UA_MonitoredItemCreateRequest item, void *context, */
/* UA_Server_EventNotificationCallback callback); */

UA_StatusCode UA_EXPORT
UA_StatusCode UA_EXPORT UA_THREADSAFE
UA_Server_deleteMonitoredItem(UA_Server *server, UA_UInt32 monitoredItemId);

#endif
Expand Down Expand Up @@ -1315,7 +1315,7 @@ UA_Server_deleteReference(UA_Server *server, const UA_NodeId sourceNodeId,
* @param eventType The type of the event for which a node should be created
* @param outNodeId The NodeId of the newly created node for the event
* @return The StatusCode of the UA_Server_createEvent method */
UA_StatusCode UA_EXPORT
UA_StatusCode UA_EXPORT UA_THREADSAFE
UA_Server_createEvent(UA_Server *server, const UA_NodeId eventType,
UA_NodeId *outNodeId);

Expand All @@ -1326,7 +1326,7 @@ UA_Server_createEvent(UA_Server *server, const UA_NodeId eventType,
* @param outEvent the EventId of the new event
* @param deleteEventNode Specifies whether the node representation of the event should be deleted
* @return The StatusCode of the UA_Server_triggerEvent method */
UA_StatusCode UA_EXPORT
UA_StatusCode UA_EXPORT UA_THREADSAFE
UA_Server_triggerEvent(UA_Server *server, const UA_NodeId eventNodeId, const UA_NodeId originId,
UA_ByteString *outEventId, const UA_Boolean deleteEventNode);

Expand Down
7 changes: 7 additions & 0 deletions src/server/ua_server_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ readWithReadValue(UA_Server *server, const UA_NodeId *nodeId,
UA_BrowsePathResult
translateBrowsePathToNodeIds(UA_Server *server, const UA_BrowsePath *browsePath);

void
monitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem);

UA_BrowsePathResult
browseSimplifiedBrowsePath(UA_Server *server, const UA_NodeId origin,
size_t browsePathSize, const UA_QualifiedName *browsePath);

/***************************************/
/* Check Information Model Consistency */
/***************************************/
Expand Down
4 changes: 2 additions & 2 deletions src/server/ua_services_attribute.c
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ UA_Server_readObjectProperty(UA_Server *server, const UA_NodeId objectId,
bp.relativePath.elements = &rpe;

UA_StatusCode retval;
UA_BrowsePathResult bpr = UA_Server_translateBrowsePathToNodeIds(server, &bp);
UA_BrowsePathResult bpr = translateBrowsePathToNodeIds(server, &bp);
if(bpr.statusCode != UA_STATUSCODE_GOOD || bpr.targetsSize < 1) {
retval = bpr.statusCode;
UA_BrowsePathResult_deleteMembers(&bpr);
Expand Down Expand Up @@ -1669,7 +1669,7 @@ UA_Server_writeObjectProperty(UA_Server *server, const UA_NodeId objectId,
bp.relativePath.elements = &rpe;

UA_StatusCode retval;
UA_BrowsePathResult bpr = UA_Server_translateBrowsePathToNodeIds(server, &bp);
UA_BrowsePathResult bpr = translateBrowsePathToNodeIds(server, &bp);
if(bpr.statusCode != UA_STATUSCODE_GOOD || bpr.targetsSize < 1) {
retval = bpr.statusCode;
UA_BrowsePathResult_deleteMembers(&bpr);
Expand Down
9 changes: 8 additions & 1 deletion src/server/ua_services_monitoreditem.c
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,12 @@ Operation_CreateMonitoredItem(UA_Server *server, UA_Session *session, struct cre
if(server->config.monitoredItemRegisterCallback) {
void *targetContext = NULL;
UA_Server_getNodeContext(server, request->itemToMonitor.nodeId, &targetContext);
UA_UNLOCK(server->serviceMutex);
server->config.monitoredItemRegisterCallback(server, &session->sessionId,
session->sessionHandle,
&request->itemToMonitor.nodeId,
targetContext, newMon->attributeId, false);
UA_LOCK(server->serviceMutex);
newMon->registered = true;
}

Expand All @@ -263,7 +265,7 @@ Operation_CreateMonitoredItem(UA_Server *server, UA_Session *session, struct cre
/* Create the first sample */
if(request->monitoringMode == UA_MONITORINGMODE_REPORTING &&
newMon->attributeId != UA_ATTRIBUTEID_EVENTNOTIFIER)
UA_MonitoredItem_sampleCallback(server, newMon);
monitoredItem_sampleCallback(server, newMon);

/* Prepare the response */
result->revisedSamplingInterval = newMon->samplingInterval;
Expand Down Expand Up @@ -321,7 +323,9 @@ UA_Server_createDataChangeMonitoredItem(UA_Server *server,

UA_MonitoredItemCreateResult result;
UA_MonitoredItemCreateResult_init(&result);
UA_LOCK(server->serviceMutex);
Operation_CreateMonitoredItem(server, &server->adminSession, &cmc, &item, &result);
UA_UNLOCK(server->serviceMutex);
return result;
}

Expand Down Expand Up @@ -544,14 +548,17 @@ Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,

UA_StatusCode
UA_Server_deleteMonitoredItem(UA_Server *server, UA_UInt32 monitoredItemId) {
UA_LOCK(server->serviceMutex);
UA_MonitoredItem *mon;
LIST_FOREACH(mon, &server->localMonitoredItems, listEntry) {
if(mon->monitoredItemId != monitoredItemId)
continue;
LIST_REMOVE(mon, listEntry);
UA_MonitoredItem_delete(server, mon);
UA_UNLOCK(server->serviceMutex);
return UA_STATUSCODE_GOOD;
}
UA_UNLOCK(server->serviceMutex);
return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
}

Expand Down
24 changes: 21 additions & 3 deletions src/server/ua_services_view.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ UA_StatusCode
UA_Server_browseRecursive(UA_Server *server, const UA_BrowseDescription *bd,
size_t *resultsSize, UA_ExpandedNodeId **results) {
/* Set the list of relevant reference types */
UA_LOCK(server->serviceMutex);
UA_NodeId *refTypes = NULL;
size_t refTypesSize = 0;
UA_StatusCode retval = UA_STATUSCODE_GOOD;
Expand All @@ -299,8 +300,10 @@ UA_Server_browseRecursive(UA_Server *server, const UA_BrowseDescription *bd,
} else {
retval = referenceSubtypes(server, &bd->referenceTypeId,
&refTypesSize, &refTypes);
if(retval != UA_STATUSCODE_GOOD)
if(retval != UA_STATUSCODE_GOOD) {
UA_UNLOCK(server->serviceMutex);
return retval;
}
}
}

Expand All @@ -311,6 +314,8 @@ UA_Server_browseRecursive(UA_Server *server, const UA_BrowseDescription *bd,
/* Clean up */
if(refTypes && bd->includeSubtypes)
UA_Array_delete(refTypes, refTypesSize, &UA_TYPES[UA_TYPES_NODEID]);

UA_UNLOCK(server->serviceMutex);
return retval;
}

Expand Down Expand Up @@ -724,7 +729,9 @@ UA_Server_browse(UA_Server *server, UA_UInt32 maxReferences,
const UA_BrowseDescription *bd) {
UA_BrowseResult result;
UA_BrowseResult_init(&result);
UA_LOCK(server->serviceMutex);
Operation_Browse(server, &server->adminSession, &maxReferences, bd, &result);
UA_UNLOCK(server->serviceMutex);
return result;
}

Expand Down Expand Up @@ -789,8 +796,10 @@ UA_Server_browseNext(UA_Server *server, UA_Boolean releaseContinuationPoint,
const UA_ByteString *continuationPoint) {
UA_BrowseResult result;
UA_BrowseResult_init(&result);
UA_LOCK(server->serviceMutex);
Operation_BrowseNext(server, &server->adminSession, &releaseContinuationPoint,
continuationPoint, &result);
UA_UNLOCK(server->serviceMutex);
return result;
}

Expand Down Expand Up @@ -1143,8 +1152,8 @@ Service_TranslateBrowsePathsToNodeIds(UA_Server *server, UA_Session *session,
}

UA_BrowsePathResult
UA_Server_browseSimplifiedBrowsePath(UA_Server *server, const UA_NodeId origin,
size_t browsePathSize, const UA_QualifiedName *browsePath) {
browseSimplifiedBrowsePath(UA_Server *server, const UA_NodeId origin,
size_t browsePathSize, const UA_QualifiedName *browsePath) {
/* Construct the BrowsePath */
UA_BrowsePath bp;
UA_BrowsePath_init(&bp);
Expand All @@ -1167,6 +1176,15 @@ UA_Server_browseSimplifiedBrowsePath(UA_Server *server, const UA_NodeId origin,
return bpr;
}

UA_BrowsePathResult
UA_Server_browseSimplifiedBrowsePath(UA_Server *server, const UA_NodeId origin,
size_t browsePathSize, const UA_QualifiedName *browsePath) {
UA_LOCK(server->serviceMutex);
UA_BrowsePathResult bpr = browseSimplifiedBrowsePath(server, origin, browsePathSize, browsePath);;
UA_UNLOCK(server->serviceMutex);
return bpr;
}

/************/
/* Register */
/************/
Expand Down
14 changes: 12 additions & 2 deletions src/server/ua_subscription_datachange.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ detectValueChangeWithFilter(UA_Server *server, UA_Session *session, UA_Monitored
else if(mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_PERCENT) {
/* Browse for the percent range */
UA_QualifiedName qn = UA_QUALIFIEDNAME(0, "EURange");
UA_BrowsePathResult bpr = UA_Server_browseSimplifiedBrowsePath(server, mon->monitoredNodeId, 1, &qn);
UA_BrowsePathResult bpr = browseSimplifiedBrowsePath(server, mon->monitoredNodeId, 1, &qn);
if(bpr.statusCode != UA_STATUSCODE_GOOD || bpr.targetsSize < 1) {
UA_BrowsePathResult_deleteMembers(&bpr);
return UA_STATUSCODE_GOOD;
Expand Down Expand Up @@ -306,18 +306,28 @@ sampleCallbackWithValue(UA_Server *server, UA_Session *session,
UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*) mon;
void *nodeContext = NULL;
UA_Server_getNodeContext(server, mon->monitoredNodeId, &nodeContext);
UA_UNLOCK(server->serviceMutex);
localMon->callback.dataChangeCallback(server, mon->monitoredItemId,
localMon->context,
&mon->monitoredNodeId,
nodeContext, mon->attributeId,
value);
UA_LOCK(server->serviceMutex);
}

return UA_STATUSCODE_GOOD;
}

void
UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem)
{
UA_LOCK(server->serviceMutex);
monitoredItem_sampleCallback(server, monitoredItem);
UA_UNLOCK(server->serviceMutex)
}

void
monitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
UA_Subscription *sub = monitoredItem->subscription;
UA_Session *session = &server->adminSession;
if(sub)
Expand Down
Loading

0 comments on commit 6050ffe

Please sign in to comment.