Skip to content

Commit

Permalink
PubSub: DataSetReader parameters check in Subscriber
Browse files Browse the repository at this point in the history
 - Check identifiers to direct DataSetMessage to desired DataSetReader
     - PublisherId
     - WriterGroupId
     - DataSetWriterId

Change-Id: Ibfa48d45833a6629134f89648b675441a32f39b9
  • Loading branch information
opcua-tsn-team-kalycito authored and jpfr committed Aug 20, 2019
1 parent bb2e563 commit 43574aa
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 20 deletions.
11 changes: 10 additions & 1 deletion examples/pubsub/tutorial_pubsub_subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,16 @@ addDataSetReader(UA_Server *server) {
UA_StatusCode retval = UA_STATUSCODE_GOOD;
memset (&readerConfig, 0, sizeof(UA_DataSetReaderConfig));
readerConfig.name = UA_STRING("DataSet Reader 1");
readerConfig.dataSetWriterId = 1;
/* Parameters to filter which DataSetMessage has to be processed
* by the DataSetReader */
/* The following parameters are used to show that the data published by
* tutorial_pubsub_publish.c is being subscribed and is being updated in
* the information model */
UA_UInt16 publisherIdentifier = 2234;
readerConfig.publisherId.type = &UA_TYPES[UA_TYPES_UINT16];
readerConfig.publisherId.data = &publisherIdentifier;
readerConfig.writerGroupId = 100;
readerConfig.dataSetWriterId = 62541;

/* Setting up Meta data configuration in DataSetReader */
fillTestDataSetMetaData(&readerConfig.dataSetMetaData);
Expand Down
66 changes: 47 additions & 19 deletions src/pubsub/ua_pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ static void
UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
static void
UA_DataSetField_deleteMembers(UA_DataSetField *field);
/* To direct the DataSetMessage to the desired DataSetReader by checking the
* WriterGroupId and DataSetWriterId parameters */
static UA_DataSetReader *
checkReaderIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_DataSetReader *tmpReader);

/**********************************************/
/* Connection */
Expand Down Expand Up @@ -389,15 +393,10 @@ UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,

static UA_DataSetReader *
getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_PubSubConnection *pConnection) {
if(pConnection->readerGroupsSize == 1) {
if(LIST_FIRST(&pConnection->readerGroups)->readersCount == 1) {
UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "only 1 DataSetReader available. This one will be used.");
return LIST_FIRST(&LIST_FIRST(&pConnection->readerGroups)->readers);
}
}

if(!pMsg->publisherIdEnabled)
if(!pMsg->publisherIdEnabled) {
UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Cannot process DataSetReader without PublisherId");
return NULL;
}

UA_ReaderGroup* readerGroup;
LIST_FOREACH(readerGroup, &pConnection->readerGroups, listEntry) {
Expand All @@ -408,35 +407,40 @@ getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_PubSubCon
if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_BYTE] &&
pMsg->publisherIdType == UA_PUBLISHERDATATYPE_BYTE &&
pMsg->publisherId.publisherIdByte == *(UA_Byte*)tmpReader->config.publisherId.data) {
return tmpReader;
UA_DataSetReader* processReader = checkReaderIdentifier(server, pMsg, tmpReader);
return processReader;
}
break;
case UA_PUBLISHERDATATYPE_UINT16:
if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT16] &&
pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT16 &&
pMsg->publisherId.publisherIdUInt16 == *(UA_UInt16*)tmpReader->config.publisherId.data) {
return tmpReader;
UA_DataSetReader* processReader = checkReaderIdentifier(server, pMsg, tmpReader);
return processReader;
}
break;
case UA_PUBLISHERDATATYPE_UINT32:
if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT32] &&
pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT32 &&
pMsg->publisherId.publisherIdUInt32 == *(UA_UInt32*)tmpReader->config.publisherId.data) {
return tmpReader;
UA_DataSetReader* processReader = checkReaderIdentifier(server, pMsg, tmpReader);
return processReader;
}
break;
case UA_PUBLISHERDATATYPE_UINT64:
if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT64] &&
pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT64 &&
pMsg->publisherId.publisherIdUInt64 == *(UA_UInt64*)tmpReader->config.publisherId.data) {
return tmpReader;
UA_DataSetReader* processReader = checkReaderIdentifier(server, pMsg, tmpReader);
return processReader;
}
break;
case UA_PUBLISHERDATATYPE_STRING:
if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_STRING] &&
pMsg->publisherIdType == UA_PUBLISHERDATATYPE_STRING &&
UA_String_equal(&pMsg->publisherId.publisherIdString, (UA_String*)tmpReader->config.publisherId.data)) {
return tmpReader;
UA_DataSetReader* processReader = checkReaderIdentifier(server, pMsg, tmpReader);
return processReader;
}
break;
default:
Expand All @@ -448,6 +452,32 @@ getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_PubSubCon
return NULL;
}

/**
* Check DataSetReader parameters.
*
* @param server
* @param NetworkMessage
* @param DataSetReader
* @return DataSetReader on success
*/
static UA_DataSetReader *
checkReaderIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_DataSetReader *tmpReader) {
if(!pMsg->groupHeaderEnabled && !pMsg->groupHeader.writerGroupIdEnabled && !pMsg->payloadHeaderEnabled) {
UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Cannot process DataSetReader without WriterGroup"
"and DataSetWriter identifiers");
return NULL;
}
else {
if((tmpReader->config.writerGroupId == pMsg->groupHeader.writerGroupId) &&
(tmpReader->config.dataSetWriterId == *pMsg->payloadHeader.dataSetPayloadHeader.dataSetWriterIds)) {
UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetReader found. Process NetworkMessage");
return tmpReader;
}
}

return NULL;
}

/**
* Process NetworkMessage.
*
Expand All @@ -461,15 +491,14 @@ UA_Server_processNetworkMessage(UA_Server *server, UA_NetworkMessage *pMsg,
if(!pMsg || !pConnection)
return UA_STATUSCODE_BADINVALIDARGUMENT;

/* To Do The condition with dataSetWriterIdAvailable and WriterGroupIdAvailable to be handled
* when pMsg->groupHeaderEnabled, pMsg->dataSetClassIdEnabled, pMsg->payloadHeaderEnabled
/* To Do Handle multiple DataSetMessage for one NetworkMessage */
/* To Do The condition pMsg->dataSetClassIdEnabled
* Here some filtering is possible */

UA_DataSetReader* dataSetReaderErg = getReaderFromIdentifier(server, pMsg, pConnection);

/* No Reader with the specified id found */
if(!dataSetReaderErg) {
UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "No DataSetReader found with PublisherId");
return UA_STATUSCODE_BADNOTFOUND; /* TODO: Check the return code */
}

Expand All @@ -483,9 +512,8 @@ UA_Server_processNetworkMessage(UA_Server *server, UA_NetworkMessage *pMsg,
UA_Server_DataSetReader_process(server, dataSetReaderErg, &pMsg->payload.dataSetPayload.dataSetMessages[iterator]);
}

/* To Do the condition with dataSetWriterId and WriterGroupId
* else condition for dataSetWriterIdAvailable and writerGroupIdAvailable) */

/* To Do Handle when dataSetReader parameters are null for publisherId
* and zero for WriterGroupId and DataSetWriterId */
return UA_STATUSCODE_GOOD;
}

Expand Down

0 comments on commit 43574aa

Please sign in to comment.