Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding filters should not force initialization of Agent. #2418

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 52 additions & 10 deletions src/Elastic.Apm/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public interface IApmAgent : IApmAgentComponents
internal class ApmAgent : IApmAgent, IDisposable
{
internal readonly CompositeDisposable Disposables = new();

internal ApmAgent(AgentComponents agentComponents) => Components = agentComponents ?? new AgentComponents();

internal ICentralConfigurationFetcher CentralConfigurationFetcher => Components.CentralConfigurationFetcher;
Expand Down Expand Up @@ -72,16 +71,31 @@ public static class Agent
lock (InitializationLock)
{
var agent = new ApmAgent(Components);

agent.Logger?.Trace()
?.Log("Initialization - Agent instance initialized. Callstack: {callstack}", new StackTrace().ToString());

if (agent.Components.PayloadSender is not IPayloadSenderWithFilters sender)
return agent;

ErrorFilters.ForEach(f => sender.AddFilter(f));
TransactionFilters.ForEach(f => sender.AddFilter(f));
SpanFilters.ForEach(f => sender.AddFilter(f));
agent.Logger?.Trace()
?.Log(@"Initialization - Added filters to agent (errors:{{ErrorFilters}}, transactions:{TransactionFilters} spans:{SpanFilters}",
ErrorFilters.Count, TransactionFilters.Count, SpanFilters.Count);

return agent;
}
});

private static readonly object InitializationLock = new object();

private static readonly List<Func<IError, IError>> ErrorFilters = [];

private static readonly List<Func<ISpan, ISpan>> SpanFilters = [];

private static readonly List<Func<ITransaction, ITransaction>> TransactionFilters = [];

internal static AgentComponents Components { get; private set; }

public static IConfigurationReader Config => Instance.Configuration;
Expand Down Expand Up @@ -114,7 +128,16 @@ public static class Agent
/// <code>true</code> if the filter was added successfully, <code>false</code> otherwise. In case the method
/// returns <code>false</code> the filter won't be called.
/// </returns>
public static bool AddFilter(Func<ITransaction, ITransaction> filter) => CheckAndAddFilter(p => p.TransactionFilters.Add(filter));
public static bool AddFilter(Func<ITransaction, ITransaction> filter)
{
if (!IsConfigured)
{
TransactionFilters.Add(filter);
return true;
}

return CheckAndAddFilter(p => p.AddFilter(filter));
}

/// <summary>
/// Adds a filter which gets called before each span gets sent to APM Server.
Expand All @@ -133,7 +156,16 @@ public static class Agent
/// <code>true</code> if the filter was added successfully, <code>false</code> otherwise. In case the method
/// returns <code>false</code> the filter won't be called.
/// </returns>
public static bool AddFilter(Func<ISpan, ISpan> filter) => CheckAndAddFilter(p => p.SpanFilters.Add(filter));
public static bool AddFilter(Func<ISpan, ISpan> filter)
{
if (!IsConfigured)
{
SpanFilters.Add(filter);
return true;
}

return CheckAndAddFilter(p => p.AddFilter(filter));
}

/// <summary>
/// Adds a filter which gets called before each error gets sent to APM Server.
Expand All @@ -152,15 +184,23 @@ public static class Agent
/// <code>true</code> if the filter was added successfully, <code>false</code> otherwise. In case the method
/// returns <code>false</code> the filter won't be called.
/// </returns>
public static bool AddFilter(Func<IError, IError> filter) => CheckAndAddFilter(p => p.ErrorFilters.Add(filter));
public static bool AddFilter(Func<IError, IError> filter)
{
if (!IsConfigured)
{
ErrorFilters.Add(filter);
return true;
}

return CheckAndAddFilter(p => p.AddFilter(filter));
}

private static bool CheckAndAddFilter(Action<PayloadSenderV2> action)
private static bool CheckAndAddFilter(Func<IPayloadSenderWithFilters, bool> action)
{
if (!(Instance.PayloadSender is PayloadSenderV2 payloadSenderV2))
if (Instance.PayloadSender is not IPayloadSenderWithFilters sender)
return false;

action(payloadSenderV2);
return true;
return action(sender);
}

/// <summary>
Expand Down Expand Up @@ -199,10 +239,12 @@ public static void Setup(AgentComponents agentComponents)
return;
}

Components ??= agentComponents;

agentComponents?.Logger?.Trace()
?.Log("Initialization - Agent.Setup called");

Components = agentComponents;

// Force initialization
var _ = LazyApmAgent.Value;
}
Expand Down
9 changes: 9 additions & 0 deletions src/Elastic.Apm/Report/IPayloadSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using Elastic.Apm.Api;

namespace Elastic.Apm.Report
Expand All @@ -16,4 +18,11 @@ public interface IPayloadSender

void QueueTransaction(ITransaction transaction);
}

public interface IPayloadSenderWithFilters
{
bool AddFilter(Func<ITransaction, ITransaction> transactionFilter);
bool AddFilter(Func<ISpan, ISpan> spanFilter);
bool AddFilter(Func<IError, IError> errorFilter);
}
}
38 changes: 34 additions & 4 deletions src/Elastic.Apm/Report/PayloadSenderV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ namespace Elastic.Apm.Report
/// Responsible for sending the data to APM server. Implements Intake V2.
/// Each instance creates its own thread to do the work. Therefore, instances should be reused if possible.
/// </summary>
internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender
internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender, IPayloadSenderWithFilters
{
private const string ThisClassName = nameof(PayloadSenderV2);
internal readonly List<Func<IError, IError>> ErrorFilters = new List<Func<IError, IError>>();
internal readonly List<Func<ISpan, ISpan>> SpanFilters = new List<Func<ISpan, ISpan>>();
internal readonly List<Func<ITransaction, ITransaction>> TransactionFilters = new List<Func<ITransaction, ITransaction>>();
internal readonly List<Func<IError, IError>> ErrorFilters = new();
internal readonly List<Func<ISpan, ISpan>> SpanFilters = new();
internal readonly List<Func<ITransaction, ITransaction>> TransactionFilters = new();

private readonly IApmServerInfo _apmServerInfo;

Expand Down Expand Up @@ -148,6 +148,7 @@ IApmLogger logger

private bool _getApmServerVersion;
private bool _getCloudMetadata;
private bool _allowFilterAdd;
private static readonly UTF8Encoding Utf8Encoding;
private static readonly MediaTypeHeaderValue MediaTypeHeaderValue;

Expand Down Expand Up @@ -244,6 +245,8 @@ protected override void WorkLoopIteration()
}

var batch = ReceiveBatch();
if (_allowFilterAdd && batch is { Length: > 0 })
_allowFilterAdd = false;
if (batch != null)
ProcessQueueItems(batch);
}
Expand Down Expand Up @@ -497,5 +500,32 @@ private T TryExecuteFilter<T>(List<Func<T, T>> filters, T item) where T : class

return item;
}

public bool AddFilter(Func<ITransaction, ITransaction> transactionFilter)
{
if (!_allowFilterAdd)
return false;

TransactionFilters.Add(transactionFilter);
return true;
}

public bool AddFilter(Func<ISpan, ISpan> spanFilter)
{
if (!_allowFilterAdd)
return false;

SpanFilters.Add(spanFilter);
return true;
}

public bool AddFilter(Func<IError, IError> errorFilter)
{
if (!_allowFilterAdd)
return false;

ErrorFilters.Add(errorFilter);
return true;
}
}
}
37 changes: 28 additions & 9 deletions test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

namespace Elastic.Apm.Tests.Utilities
{
internal class MockPayloadSender : IPayloadSender
internal class MockPayloadSender : IPayloadSender, IPayloadSenderWithFilters
{
private static readonly JObject JsonSpanTypesData =
JObject.Parse(File.ReadAllText(Path.Combine(SolutionPaths.Root, "test/Elastic.Apm.Tests.Utilities/TestResources/json-specs/span_types.json")));
Expand Down Expand Up @@ -211,7 +211,7 @@ public IReadOnlyList<IMetricSet> Metrics
get
{
lock (_metricsLock)
return CreateImmutableSnapshot<IMetricSet>(_metrics);
return CreateImmutableSnapshot(_metrics);
}
}

Expand All @@ -220,7 +220,7 @@ public IReadOnlyList<ISpan> Spans
get
{
lock (_spanLock)
return CreateImmutableSnapshot<ISpan>(_spans);
return CreateImmutableSnapshot(_spans);
}
}

Expand All @@ -229,7 +229,7 @@ public IReadOnlyList<ITransaction> Transactions
get
{
lock (_transactionLock)
return CreateImmutableSnapshot<ITransaction>(_transactions);
return CreateImmutableSnapshot(_transactions);
}
}

Expand All @@ -240,8 +240,8 @@ public void QueueError(IError error)
{
lock (_errorLock)
{
error = _errorFilters.Aggregate(error,
(current, filter) => filter(current));
error = _errorFilters
.Aggregate(error, (current, filter) => filter(current));
_errors.Add(error);
_errorWaitHandle.Set();
}
Expand All @@ -251,8 +251,8 @@ public virtual void QueueTransaction(ITransaction transaction)
{
lock (_transactionLock)
{
transaction = _transactionFilters.Aggregate(transaction,
(current, filter) => filter(current));
transaction = _transactionFilters
.Aggregate(transaction, (current, filter) => filter(current));
_transactions.Add(transaction);
_transactionWaitHandle.Set();
}
Expand All @@ -263,7 +263,8 @@ public void QueueSpan(ISpan span)
VerifySpan(span);
lock (_spanLock)
{
span = _spanFilters.Aggregate(span, (current, filter) => filter(current));
span = _spanFilters
.Aggregate(span, (current, filter) => filter(current));
_spans.Add(span);
_spanWaitHandle.Set();
}
Expand Down Expand Up @@ -305,6 +306,24 @@ private void VerifySpan(ISpan span)
}
}

public bool AddFilter(Func<ITransaction, ITransaction> transactionFilter)
{
_transactionFilters.Add(transactionFilter);
return true;
}

public bool AddFilter(Func<ISpan, ISpan> spanFilter)
{
_spanFilters.Add(spanFilter);
return true;
}

public bool AddFilter(Func<IError, IError> errorFilter)
{
_errorFilters.Add(errorFilter);
return true;
}

public void QueueMetrics(IMetricSet metricSet)
{
lock (_metricsLock)
Expand Down
11 changes: 10 additions & 1 deletion test/Elastic.Apm.Tests.Utilities/NoopPayloadSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using Elastic.Apm.Api;
using Elastic.Apm.Report;

namespace Elastic.Apm.Tests.Utilities
{
public class NoopPayloadSender : IPayloadSender
public class NoopPayloadSender : IPayloadSender, IPayloadSenderWithFilters
{
public void QueueError(IError error) { }

Expand All @@ -16,5 +18,12 @@ public void QueueTransaction(ITransaction transaction) { }
public void QueueSpan(ISpan span) { }

public void QueueMetrics(IMetricSet metricSet) { }

public bool AddFilter(Func<ITransaction, ITransaction> transactionFilter) => true;

public bool AddFilter(Func<ISpan, ISpan> spanFilter) => true;

public bool AddFilter(Func<IError, IError> errorFilter) => true;

}
}
Loading
Loading