Skip to content

Commit

Permalink
Added TestServiceRemotingClientContextPropagation_ShouldInjectActivit…
Browse files Browse the repository at this point in the history
…yContextAndBaggage test
  • Loading branch information
sablancoleis committed Nov 24, 2024
1 parent 2f26254 commit a0d8e56
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using System.Text;
using Microsoft.ServiceFabric.Actors.Remoting.V2.Runtime;
using Microsoft.ServiceFabric.Actors.Runtime;
using Microsoft.ServiceFabric.Services.Remoting.V2;
Expand Down Expand Up @@ -51,7 +50,7 @@ public override async Task<IServiceRemotingResponseMessage> HandleRequestRespons
Guard.ThrowIfNull(requestMessageHeader, "requestMessage.GetHeader()");

// Extract the PropagationContext of the upstream parent from the message headers.
PropagationContext parentContext = Propagator.Extract(default, requestMessageHeader, this.ExtractTraceContextFromRequestMessageHeader);
PropagationContext parentContext = Propagator.Extract(default, requestMessageHeader, ServiceFabricRemotingUtils.ExtractTraceContextFromRequestMessageHeader);
Baggage.Current = parentContext.Baggage;

string activityName = requestMessageHeader?.MethodName ?? ServiceFabricRemotingActivitySource.IncomingRequestActivityName;
Expand Down Expand Up @@ -81,16 +80,4 @@ public override async Task<IServiceRemotingResponseMessage> HandleRequestRespons
}
}
}

private IEnumerable<string> ExtractTraceContextFromRequestMessageHeader(IServiceRemotingRequestMessageHeader requestMessageHeader, string headerKey)
{
if (requestMessageHeader.TryGetHeaderValue(headerKey, out byte[] headerValueAsBytes))
{
string headerValue = Encoding.UTF8.GetString(headerValueAsBytes);

return [headerValue];
}

return Enumerable.Empty<string>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Runtime.CompilerServices;

[assembly: CLSCompliant(false)]
#if SIGNED
[assembly: InternalsVisibleTo("OpenTelemetry.Instrumentation.ServiceFabricRemoting.Tests, PublicKey=002400000480000094000000060200000024000052534131000400000100010051c1562a090fb0c9f391012a32198b5e5d9a60e9b80fa2d7b434c9e5ccb7259bd606e66f9660676afc6692b8cdc6793d190904551d2103b7b22fa636dcbb8208839785ba402ea08fc00c8f1500ccef28bbf599aa64ffb1e1d5dc1bf3420a3777badfe697856e9d52070a50c3ea5821c80bef17ca3acffa28f89dd413f096f898")]
#else
[assembly: InternalsVisibleTo("OpenTelemetry.Instrumentation.ServiceFabricRemoting.Tests")]
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Text;
using Microsoft.ServiceFabric.Services.Remoting.V2;

namespace OpenTelemetry.Instrumentation.ServiceFabricRemoting;

internal static class ServiceFabricRemotingUtils
{
internal static void InjectTraceContextIntoServiceRemotingRequestMessageHeader(IServiceRemotingRequestMessageHeader requestMessageHeader, string key, string value)
{
if (!requestMessageHeader.TryGetHeaderValue(key, out byte[] _))
{
byte[] valueAsBytes = Encoding.UTF8.GetBytes(value);

requestMessageHeader.AddHeader(key, valueAsBytes);
}
}

internal static IEnumerable<string> ExtractTraceContextFromRequestMessageHeader(IServiceRemotingRequestMessageHeader requestMessageHeader, string headerKey)
{
if (requestMessageHeader.TryGetHeaderValue(headerKey, out byte[] headerValueAsBytes))
{
string headerValue = Encoding.UTF8.GetString(headerValueAsBytes);

return [headerValue];
}

return Enumerable.Empty<string>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System.Diagnostics;
using System.Fabric;
using System.Text;
using Microsoft.ServiceFabric.Services.Remoting;
using Microsoft.ServiceFabric.Services.Remoting.V2;
using Microsoft.ServiceFabric.Services.Remoting.V2.Runtime;
Expand Down Expand Up @@ -51,7 +50,7 @@ public override async Task<IServiceRemotingResponseMessage> HandleRequestRespons
Guard.ThrowIfNull(requestMessageHeader, "requestMessage.GetHeader()");

// Extract the PropagationContext of the upstream parent from the message headers.
PropagationContext parentContext = Propagator.Extract(default, requestMessageHeader, this.ExtractTraceContextFromRequestMessageHeader);
PropagationContext parentContext = Propagator.Extract(default, requestMessageHeader, ServiceFabricRemotingUtils.ExtractTraceContextFromRequestMessageHeader);
Baggage.Current = parentContext.Baggage;

string activityName = requestMessageHeader?.MethodName ?? ServiceFabricRemotingActivitySource.IncomingRequestActivityName;
Expand Down Expand Up @@ -81,16 +80,4 @@ public override async Task<IServiceRemotingResponseMessage> HandleRequestRespons
}
}
}

private IEnumerable<string> ExtractTraceContextFromRequestMessageHeader(IServiceRemotingRequestMessageHeader requestMessageHeader, string headerKey)
{
if (requestMessageHeader.TryGetHeaderValue(headerKey, out byte[] headerValueAsBytes))
{
string headerValue = Encoding.UTF8.GetString(headerValueAsBytes);

return [headerValue];
}

return Enumerable.Empty<string>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public async Task<IServiceRemotingResponseMessage> RequestResponseAsync(IService
try
{
// Inject the ActivityContext into the message headers to propagate trace context and Baggage to the receiving service.
Propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), requestMessageHeader, this.InjectTraceContextIntoServiceRemotingRequestMessageHeader);
Propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), requestMessageHeader, ServiceFabricRemotingUtils.InjectTraceContextIntoServiceRemotingRequestMessageHeader);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -138,14 +138,4 @@ public void SendOneWay(IServiceRemotingRequestMessage requestMessage)
{
this.InnerClient.SendOneWay(requestMessage);
}

private void InjectTraceContextIntoServiceRemotingRequestMessageHeader(IServiceRemotingRequestMessageHeader requestMessageHeader, string key, string value)
{
if (!requestMessageHeader.TryGetHeaderValue(key, out byte[] _))
{
byte[] valueAsBytes = Encoding.UTF8.GetBytes(value);

requestMessageHeader.AddHeader(key, valueAsBytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

using Microsoft.ServiceFabric.Services.Remoting.V2.Runtime;

namespace OpenTelemetry.Instrumentation.ServiceFabricRemoting.Tests.Mocks;
namespace OpenTelemetry.Instrumentation.ServiceFabricRemoting.Tests;

internal class FabricTransportServiceRemotingRequestContextMock : IServiceRemotingRequestContext
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Fabric;
using System.Text;
using Microsoft.ServiceFabric.Services.Remoting.V2;
using Microsoft.ServiceFabric.Services.Remoting.V2.Client;
using OpenTelemetry.Context.Propagation;
using ServiceFabric.Mocks.RemotingV2;

namespace OpenTelemetry.Instrumentation.ServiceFabricRemoting.Tests;

internal class ServiceRemotingClientMock : IServiceRemotingClient
{
public ServiceRemotingClientMock()
{
}

public ResolvedServicePartition ResolvedServicePartition { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

public string ListenerName { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

public ResolvedServiceEndpoint Endpoint { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

/// <summary>
/// The RequestResponseAsync method reads the headers from the request and injects them into the response, using OpneTelemetry's TextMapPropagator.
/// </summary>
public Task<IServiceRemotingResponseMessage> RequestResponseAsync(IServiceRemotingRequestMessage requestMessage)
{
IServiceRemotingRequestMessageHeader requestMessageHeader = requestMessage.GetHeader();
PropagationContext propagationContext = Propagators.DefaultTextMapPropagator.Extract(default, requestMessageHeader, ServiceFabricRemotingUtils.ExtractTraceContextFromRequestMessageHeader);

MockServiceRemotingResponseMessage responseMessage = new MockServiceRemotingResponseMessage()
{
Header = new ServiceRemotingResponseMessageHeaderMock(),
};

Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(propagationContext.ActivityContext, propagationContext.Baggage), responseMessage.Header, this.InjectTraceContextIntoServiceRemotingRequestMessageHeader);

return Task.FromResult<IServiceRemotingResponseMessage>(responseMessage);
}

public void SendOneWay(IServiceRemotingRequestMessage requestMessage) => throw new NotImplementedException();

private void InjectTraceContextIntoServiceRemotingRequestMessageHeader(IServiceRemotingResponseMessageHeader responseMessageHeaders, string key, string value)
{
if (!responseMessageHeaders.TryGetHeaderValue(key, out byte[] _))
{
byte[] valueAsBytes = Encoding.UTF8.GetBytes(value);

responseMessageHeaders.AddHeader(key, valueAsBytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using System.Runtime.Serialization;
using Microsoft.ServiceFabric.Services.Remoting.V2;

namespace OpenTelemetry.Instrumentation.ServiceFabricRemoting.Tests.Mocks;
namespace OpenTelemetry.Instrumentation.ServiceFabricRemoting.Tests;

internal class ServiceRemotingRequestMessageHeaderMock : IServiceRemotingRequestMessageHeader
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Fabric;
using System.Globalization;
using Microsoft.ServiceFabric.Services.Remoting.V2;

namespace OpenTelemetry.Instrumentation.ServiceFabricRemoting.Tests;

internal class ServiceRemotingResponseMessageHeaderMock : IServiceRemotingResponseMessageHeader
{
private Dictionary<string, byte[]> headers;

public ServiceRemotingResponseMessageHeaderMock()
{
this.headers = new Dictionary<string, byte[]>();
}

public void AddHeader(string headerName, byte[] headerValue)
{
if (this.headers.ContainsKey(headerName))
{
throw new FabricElementAlreadyExistsException(string.Format((IFormatProvider)(object)CultureInfo.CurrentCulture, "ErrorHeaderAlreadyExists"));
}

this.headers[headerName] = headerValue;
}

public bool CheckIfItsEmpty()
{
if (this.headers == null || this.headers.Count == 0)
{
return true;
}

return false;
}

public bool TryGetHeaderValue(string headerName, out byte[]? headerValue)
{
headerValue = null;
if (this.headers == null)
{
return false;
}

return this.headers.TryGetValue(headerName, out headerValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,4 @@ protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListe

return [serviceReplicaListener];
}

//protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
//{
// //Func<IServiceRemotingListener> getListenerFunc = () =>
// //{
// // FabricTransportRemotingListenerSettings listenerSettings = new FabricTransportRemotingListenerSettings();

// // return new FabricTransportActorServiceRemotingListener(this, this.dispatcher, listenerSettings);
// //};
// Func<ServiceContext, IService, IServiceRemotingListener> getListenerFunc = (ServiceContext serviceContext, IService serviceImplementation) =>
// {
// FabricTransportRemotingListenerSettings listenerSettings = new FabricTransportRemotingListenerSettings();

// return new FabricTransportServiceRemotingListener(serviceContext, this.dispatcher, listenerSettings);
// };

// ServiceReplicaListener serviceReplicaListener = new ServiceReplicaListener((StatefulServiceContext t) => getListenerFunc(this.ServiceContext, this), "V2Listener");

// //ServiceReplicaListener serviceReplicaListener = new ServiceReplicaListener((StatefulServiceContext t) => getListenerFunc(), "V2Listener");
// return [serviceReplicaListener];
//}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright The OpenTelemetry Authors
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
Expand All @@ -15,7 +15,7 @@ namespace OpenTelemetry.Instrumentation.ServiceFabricRemoting.Tests;

public class MyTestStatefulService : StatefulService, ITestMyStatefulService
{
private TraceContextEnrichedServiceV2RemotingDispatcher dispatcher;
private TraceContextEnrichedServiceV2RemotingDispatcher? dispatcher;

public MyTestStatefulService(StatefulServiceContext serviceContext, IReliableStateManagerReplica reliableStateManagerReplica)
: base(serviceContext, reliableStateManagerReplica)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using Microsoft.ServiceFabric.Actors.Runtime;
using Microsoft.ServiceFabric.Services.Remoting.V2;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Instrumentation.ServiceFabricRemoting.Tests.Mocks;
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;
using ServiceFabric.Mocks;
Expand All @@ -23,6 +22,7 @@ public class ServiceFabricRemotingTests
private const string ValueToSend = "SomeValue";
private const string BaggageKey = "SomeBaggageKey";
private const string BaggageValue = "SomeBaggageValue";
private static readonly ActivitySource ActivitySource = new ActivitySource("ServiceFabricRemotingTests");

[Fact]
public async Task TestStatefulServiceContextPropagation_ShouldExtractActivityContextAndBaggage()
Expand All @@ -48,7 +48,7 @@ public async Task TestStatefulServiceContextPropagation_ShouldExtractActivityCon

IServiceRemotingRequestMessageHeader remotingRequestMessageHeader = this.CreateServiceRemotingRequestMessageHeader(typeof(ITestMyStatefulService), nameof(ITestMyStatefulService.TestContextPropagation));

propagator.Inject(new PropagationContext(activityContext, baggage), remotingRequestMessageHeader, this.InjectTraceContextIntoServiceRemotingRequestMessageHeader);
propagator.Inject(new PropagationContext(activityContext, baggage), remotingRequestMessageHeader, ServiceFabricRemotingUtils.InjectTraceContextIntoServiceRemotingRequestMessageHeader);

MockServiceRemotingRequestMessageBody messageBody = new MockServiceRemotingRequestMessageBody();
messageBody.SetParameter(0, "valueToReturn", ValueToSend);
Expand Down Expand Up @@ -88,7 +88,7 @@ public async Task TestActorContextPropagation_ShouldExtractActivityContextAndBag

IServiceRemotingRequestMessageHeader actorRemotingMessageHeaders = this.CreateServiceRemotingRequestMessageHeader(typeof(IMyTestActorService), nameof(IMyTestActorService.TestContextPropagation));

propagator.Inject(new PropagationContext(activityContext, baggage), actorRemotingMessageHeaders, this.InjectTraceContextIntoServiceRemotingRequestMessageHeader);
propagator.Inject(new PropagationContext(activityContext, baggage), actorRemotingMessageHeaders, ServiceFabricRemotingUtils.InjectTraceContextIntoServiceRemotingRequestMessageHeader);

MockServiceRemotingRequestMessageBody messageBody = new MockServiceRemotingRequestMessageBody();
messageBody.SetParameter(0, "valueToReturn", ValueToSend);
Expand All @@ -107,6 +107,41 @@ public async Task TestActorContextPropagation_ShouldExtractActivityContextAndBag
Assert.Equal(BaggageValue, serviceResponse.Baggage.GetBaggage(BaggageKey));
}

[Fact]
public async Task TestServiceRemotingClientContextPropagation_ShouldInjectActivityContextAndBaggage()
{
// Arrange
using TracerProvider provider = Sdk.CreateTracerProviderBuilder()
.AddServiceFabricRemotingInstrumentation()
.AddSource(ActivitySource.Name)
.Build();

// The Baggage set here will be used automatically by TraceContextEnrichedServiceRemotingClientAdapter to inject the baggage into the request message.
Baggage.SetBaggage(BaggageKey, BaggageValue);

// The activity is created here will be used automatically by TraceContextEnrichedServiceRemotingClientAdapter to inject the context into the request message.
using (Activity activity = ActivitySource.StartActivity("TestActivity")!)
{
ServiceRemotingRequestMessageHeaderMock header = new ServiceRemotingRequestMessageHeaderMock();
MockServiceRemotingRequestMessageBody messageBody = new MockServiceRemotingRequestMessageBody();
ServiceRemotingRequestMessageMock requestMessage = new(header, messageBody);

// The ServiceRemotingClientMock reads the headers from the request and injects them into the response, using OpneTelemetry's TextMapPropagator.
ServiceRemotingClientMock innerClient = new ServiceRemotingClientMock();
TraceContextEnrichedServiceRemotingClientAdapter serviceRemotingClientAdapter = new TraceContextEnrichedServiceRemotingClientAdapter(innerClient);

// Act
IServiceRemotingResponseMessage response = await serviceRemotingClientAdapter.RequestResponseAsync(requestMessage);

// Assert
IServiceRemotingResponseMessageHeader responseMessageHeaders = response.GetHeader();
PropagationContext propagationContext = Propagators.DefaultTextMapPropagator.Extract(default, responseMessageHeaders, this.ExtractTraceContextFromRequestMessageHeader);

Assert.Equal(activity.TraceId, propagationContext.ActivityContext.TraceId);
Assert.Equal(BaggageValue, propagationContext.Baggage.GetBaggage(BaggageKey));
}
}

private ServiceRemotingRequestMessageHeaderMock CreateServiceRemotingRequestMessageHeader(Type interfaceType, string methodName)
{
int interfaceId = ServiceFabricUtils.GetInterfaceId(interfaceType);
Expand All @@ -123,13 +158,15 @@ private ServiceRemotingRequestMessageHeaderMock CreateServiceRemotingRequestMess
return serviceRemotingRequestMessageHeader;
}

private void InjectTraceContextIntoServiceRemotingRequestMessageHeader(IServiceRemotingRequestMessageHeader requestMessageHeader, string key, string value)
private IEnumerable<string> ExtractTraceContextFromRequestMessageHeader(IServiceRemotingResponseMessageHeader responseMessageHeaders, string headerKey)
{
if (!requestMessageHeader.TryGetHeaderValue(key, out byte[] _))
if (responseMessageHeaders.TryGetHeaderValue(headerKey, out byte[] headerValueAsBytes))
{
byte[] valueAsBytes = Encoding.UTF8.GetBytes(value);
string headerValue = Encoding.UTF8.GetString(headerValueAsBytes);

requestMessageHeader.AddHeader(key, valueAsBytes);
return [headerValue];
}

return Enumerable.Empty<string>();
}
}

0 comments on commit a0d8e56

Please sign in to comment.