Skip to content

Commit

Permalink
Fix a deadlock in binary channel on cleanup, reduce contention due to…
Browse files Browse the repository at this point in the history
… locks on high server load (OPCFoundation#2714)

Server: Fix a deadlock in the binary channel cleanup (regression from secure channel cleanup)
Server: Use ConcurrentDictionary for sessions and channels to reduce locking overhead
Server: Replace legacy implementation of NodeIdDictionary with standard dictionary, as with IEqualityComparer the look up is much faster than specialized SortedDictionary for each NodeId Type.

Co-authored-by: Luis Cantero <[email protected]>
  • Loading branch information
mregen and luiscantero authored Aug 24, 2024
1 parent bc9af8e commit 6dcee7e
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 140 deletions.
40 changes: 10 additions & 30 deletions Libraries/Opc.Ua.Server/Diagnostics/CustomNodeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,6 @@ protected CustomNodeManager2(
}
}

// create the table of monitored items.
// these are items created by clients when they subscribe to data or events.
m_monitoredItems = new Dictionary<uint, IDataChangeMonitoredItem>();

// create the table of monitored nodes.
// these are created by the node manager whenever a client subscribe to an attribute of the node.
m_monitoredNodes = new Dictionary<NodeId, MonitoredNode2>();
Expand Down Expand Up @@ -237,14 +233,6 @@ protected List<NodeState> RootNotifiers
get { return m_rootNotifiers; }
}

/// <summary>
/// Gets the table of monitored items.
/// </summary>
protected Dictionary<uint, IDataChangeMonitoredItem> MonitoredItems
{
get { return m_monitoredItems; }
}

/// <summary>
/// Gets the table of nodes being monitored.
/// </summary>
Expand Down Expand Up @@ -929,7 +917,7 @@ public virtual void DeleteAddressSpace()
/// </remarks>
public virtual object GetManagerHandle(NodeId nodeId)
{
lock (Lock)
lock (m_lock)
{
return GetManagerHandle(m_systemContext, nodeId, null);
}
Expand All @@ -945,20 +933,17 @@ protected virtual NodeHandle GetManagerHandle(ServerSystemContext context, NodeI
return null;
}

if (m_predefinedNodes != null)
{
NodeState node = null;
NodeState node = null;

if (m_predefinedNodes.TryGetValue(nodeId, out node))
{
NodeHandle handle = new NodeHandle();
if (m_predefinedNodes?.TryGetValue(nodeId, out node) == true)
{
NodeHandle handle = new NodeHandle();

handle.NodeId = nodeId;
handle.Node = node;
handle.Validated = true;
handle.NodeId = nodeId;
handle.Node = node;
handle.Validated = true;

return handle;
}
return handle;
}

return null;
Expand Down Expand Up @@ -3706,7 +3691,6 @@ protected virtual ServiceResult CreateMonitoredItem(
monitoredItem = datachangeItem;

// save the monitored item.
m_monitoredItems.Add(monitoredItemId, datachangeItem);
monitoredNode.Add(datachangeItem);

// report change.
Expand Down Expand Up @@ -3809,7 +3793,7 @@ public ServiceResult ValidateEventRolePermissions(IEventMonitoredItem monitoredI
eventTypeId = baseEventState.EventType?.Value;
sourceNodeId = baseEventState.SourceNode?.Value;
}

OperationContext operationContext = new OperationContext(monitoredItem);

// validate the event type id permissions as specified
Expand Down Expand Up @@ -4259,9 +4243,6 @@ protected virtual ServiceResult DeleteMonitoredItem(
}
}

// remove the monitored item.
m_monitoredItems.Remove(monitoredItem.Id);

// report change.
OnMonitoredItemDeleted(context, handle, datachangeItem);

Expand Down Expand Up @@ -4777,7 +4758,6 @@ protected NodeState AddNodeToComponentCache(ISystemContext context, NodeHandle h
private ServerSystemContext m_systemContext;
private string[] m_namespaceUris;
private ushort[] m_namespaceIndexes;
private Dictionary<uint, IDataChangeMonitoredItem> m_monitoredItems;
private Dictionary<NodeId, MonitoredNode2> m_monitoredNodes;
private Dictionary<NodeId, CacheEntry> m_componentCache;
private NodeIdDictionary<NodeState> m_predefinedNodes;
Expand Down
151 changes: 72 additions & 79 deletions Libraries/Opc.Ua.Server/Session/SessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
using System.Security.Cryptography.X509Certificates;
using System.Globalization;
using System.Threading.Tasks;
using System.Collections.Concurrent;

namespace Opc.Ua.Server
{
Expand Down Expand Up @@ -62,7 +63,7 @@ public SessionManager(
m_maxHistoryContinuationPoints = configuration.ServerConfiguration.MaxHistoryContinuationPoints;
m_minNonceLength = configuration.SecurityConfiguration.NonceLength;

m_sessions = new Dictionary<NodeId, Session>();
m_sessions = new ConcurrentDictionary<NodeId, Session>(Environment.ProcessorCount, m_maxSessionCount);
m_lastSessionId = BitConverter.ToInt64(Utils.Nonce.CreateNonce(sizeof(long)), 0);

// create a event to signal shutdown.
Expand All @@ -87,17 +88,13 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
List<Session> sessions = null;

lock (m_lock)
{
sessions = new List<Session>(m_sessions.Values);
m_sessions.Clear();
}
// create snapshot of all sessions
var sessions = m_sessions.ToArray();
m_sessions.Clear();

foreach (Session session in sessions)
foreach (var sessionKeyValue in sessions)
{
Utils.SilentDispose(session);
Utils.SilentDispose(sessionKeyValue.Value);
}

m_shutdownEvent.Set();
Expand Down Expand Up @@ -127,18 +124,16 @@ public virtual void Startup()
/// </summary>
public virtual void Shutdown()
{
lock (m_lock)
{
// stop the monitoring thread.
m_shutdownEvent.Set();
// stop the monitoring thread.
m_shutdownEvent.Set();

// dispose of session objects.
foreach (Session session in m_sessions.Values)
{
session.Dispose();
}
// dispose of session objects using a snapshot.
var sessions = m_sessions.ToArray();
m_sessions.Clear();

m_sessions.Clear();
foreach (var sessionKeyValue in sessions)
{
Utils.SilentDispose(sessionKeyValue.Value);
}
}

Expand Down Expand Up @@ -176,9 +171,11 @@ public virtual Session CreateSession(
// check for same Nonce in another session
if (clientNonce != null)
{
foreach (Session sessionIterator in m_sessions.Values)
// iterate over key/value pairs in the dictionary with a thread safe iterator
foreach (var sessionKeyValueIterator in m_sessions)
{
if (Utils.CompareNonce(sessionIterator.ClientNonce, clientNonce))
byte[] sessionClientNonce = sessionKeyValueIterator.Value?.ClientNonce;
if (Utils.CompareNonce(sessionClientNonce, clientNonce))
{
throw new ServiceResultException(StatusCodes.BadNonceInvalid);
}
Expand All @@ -191,7 +188,7 @@ public virtual Session CreateSession(
{
if (context.ChannelContext.EndpointDescription.SecurityMode != MessageSecurityMode.None)
{
authenticationToken = Utils.IncrementIdentifier(ref m_lastSessionId);
authenticationToken = new NodeId(Utils.IncrementIdentifier(ref m_lastSessionId));
}
}

Expand Down Expand Up @@ -243,7 +240,10 @@ public virtual Session CreateSession(
sessionId = session.Id;

// save session.
m_sessions.Add(authenticationToken, session);
if (!m_sessions.TryAdd(authenticationToken, session))
{
throw new ServiceResultException(StatusCodes.BadTooManySessions);
}
}

// raise session related event.
Expand Down Expand Up @@ -272,6 +272,12 @@ public virtual bool ActivateSession(
UserIdentityToken newIdentity = null;
UserTokenPolicy userTokenPolicy = null;

// fast path no lock
if (!m_sessions.TryGetValue(authenticationToken, out _))
{
throw new ServiceResultException(StatusCodes.BadSessionIdInvalid);
}

lock (m_lock)
{
// find session.
Expand Down Expand Up @@ -306,7 +312,6 @@ public virtual bool ActivateSession(
out newIdentity,
out userTokenPolicy);
}

IUserIdentity identity = null;
IUserIdentity effectiveIdentity = null;
ServiceResult error = null;
Expand Down Expand Up @@ -393,22 +398,23 @@ public virtual bool ActivateSession(
/// </remarks>
public virtual void CloseSession(NodeId sessionId)
{
// find the session.
Session session = null;

lock (m_lock)
// thread safe search for the session.
foreach (KeyValuePair<NodeId, Session> current in m_sessions)
{
foreach (KeyValuePair<NodeId, Session> current in m_sessions)
if (current.Value.Id == sessionId)
{
if (current.Value.Id == sessionId)
if (!m_sessions.TryRemove(current.Key, out session))
{
session = current.Value;
m_sessions.Remove(current.Key);
break;
// found but was already removed
return;
}
break;
}
}

// close the session if removed.
if (session != null)
{
// raise session related event.
Expand All @@ -423,7 +429,6 @@ public virtual void CloseSession(NodeId sessionId)
m_server.ServerDiagnostics.CurrentSessionCount--;
}
}

}

/// <summary>
Expand All @@ -442,44 +447,41 @@ public virtual OperationContext ValidateRequest(RequestHeader requestHeader, Req

try
{
lock (m_lock)
// check for create session request.
if (requestType == RequestType.CreateSession || requestType == RequestType.ActivateSession)
{
// check for create session request.
if (requestType == RequestType.CreateSession || requestType == RequestType.ActivateSession)
{
return new OperationContext(requestHeader, requestType);
}
return new OperationContext(requestHeader, requestType);
}

// find session.
if (!m_sessions.TryGetValue(requestHeader.AuthenticationToken, out session))
{
EventHandler<ValidateSessionLessRequestEventArgs> handler = m_validateSessionLessRequest;

// find session.
if (!m_sessions.TryGetValue(requestHeader.AuthenticationToken, out session))
if (handler != null)
{
EventHandler<ValidateSessionLessRequestEventArgs> handler = m_validateSessionLessRequest;
var args = new ValidateSessionLessRequestEventArgs(requestHeader.AuthenticationToken, requestType);
handler(this, args);

if (handler != null)
if (ServiceResult.IsBad(args.Error))
{
var args = new ValidateSessionLessRequestEventArgs(requestHeader.AuthenticationToken, requestType);
handler(this, args);

if (ServiceResult.IsBad(args.Error))
{
throw new ServiceResultException(args.Error);
}

return new OperationContext(requestHeader, requestType, args.Identity);
throw new ServiceResultException(args.Error);
}

throw new ServiceResultException(StatusCodes.BadSessionIdInvalid);
return new OperationContext(requestHeader, requestType, args.Identity);
}

// validate request header.
session.ValidateRequest(requestHeader, requestType);
throw new ServiceResultException(StatusCodes.BadSessionIdInvalid);
}

// validate request header.
session.ValidateRequest(requestHeader, requestType);

// validate user has permissions for additional info
session.ValidateDiagnosticInfo(requestHeader);
// validate user has permissions for additional info
session.ValidateDiagnosticInfo(requestHeader);

// return context.
return new OperationContext(requestHeader, requestType, session);
}
// return context.
return new OperationContext(requestHeader, requestType, session);
}
catch (Exception e)
{
Expand Down Expand Up @@ -584,17 +586,10 @@ private void MonitorSessions(object data)

do
{
Session[] sessions = null;

lock (m_lock)
{
sessions = new Session[m_sessions.Count];
m_sessions.Values.CopyTo(sessions, 0);
}

for (int ii = 0; ii < sessions.Length; ii++)
// enumerator is thread safe
foreach (var sessionKeyValue in m_sessions)
{
var session = sessions[ii];
Session session = sessionKeyValue.Value;
if (session.HasExpired)
{
// update diagnostics.
Expand All @@ -604,9 +599,9 @@ private void MonitorSessions(object data)
}

// raise audit event for session closed because of timeout
m_server.ReportAuditCloseSessionEvent(null, sessions[ii], "Session/Timeout");
m_server.ReportAuditCloseSessionEvent(null, session, "Session/Timeout");

m_server.CloseSession(null, sessions[ii].Id, false);
m_server.CloseSession(null, session.Id, false);
}
// if a session had no activity for the last m_minSessionTimeout milliseconds, send a keep alive event.
else if (session.ClientLastContactTime.AddMilliseconds(m_minSessionTimeout) < DateTime.UtcNow)
Expand Down Expand Up @@ -634,7 +629,7 @@ private void MonitorSessions(object data)
#region Private Fields
private readonly object m_lock = new object();
private IServerInternal m_server;
private Dictionary<NodeId, Session> m_sessions;
private ConcurrentDictionary<NodeId, Session> m_sessions;
private long m_lastSessionId;
private ManualResetEvent m_shutdownEvent;

Expand Down Expand Up @@ -790,14 +785,12 @@ public IList<Session> GetSessions()
/// <inheritdoc/>
public Session GetSession(NodeId authenticationToken)
{

Session session = null;
lock (m_lock)
// find session.
if (m_sessions.TryGetValue(authenticationToken, out Session session))
{
// find session.
m_sessions.TryGetValue(authenticationToken, out session);
return session;
}
return session;
return null;
}
#endregion
}
Expand Down
Loading

0 comments on commit 6dcee7e

Please sign in to comment.