Skip to content

Commit

Permalink
Add SNS/SQS trace propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
marcotc committed Sep 19, 2024
1 parent e32e038 commit 8f0bc5d
Show file tree
Hide file tree
Showing 21 changed files with 650 additions and 16 deletions.
4 changes: 3 additions & 1 deletion docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ require 'aws-sdk'
require 'datadog'

Datadog.configure do |c|
c.tracing.instrument :aws, **options
c.tracing.instrument :aws, propagation: true, **options
end

# Perform traced call
Expand All @@ -562,6 +562,8 @@ Aws::S3::Client.new.list_buckets
| `enabled` | `DD_TRACE_AWS_ENABLED` | `Bool` | Whether the integration should create spans. | `true` |
| `service_name` | `DD_TRACE_AWS_SERVICE_NAME` | `String` | Name of application running the `aws` instrumentation. May be overridden by `global_default_service_name`. [See _Additional Configuration_ for more details](#additional-configuration) | `aws` |
| `peer_service` | `DD_TRACE_AWS_PEER_SERVICE` | `String` | Name of external service the application connects to | `nil` |
| `propagation` | `DD_TRACE_AWS_PROPAGATION_ENABLED` | `Bool` | Enables distributed trace propagation for SNS and SQS messages. | `false` |
| `parentage_style` | `DD_TRACE_AWS_TRACE_PARENTAGE_STYLE` | `String` | Controls whether the local trace is parented to the SQS message consumed. Possible values are: `local`, `distributed`. This option is always disable (the equivalent to `local`) if `propagation` is disabled. | `propagation` |

### Concurrent Ruby

Expand Down
1 change: 1 addition & 0 deletions lib/datadog/tracing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative 'core'
require_relative 'tracing/pipeline'
require_relative 'tracing/distributed'

module Datadog
# Datadog APM tracing public API.
Expand Down
23 changes: 23 additions & 0 deletions lib/datadog/tracing/contrib/aws/configuration/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ class Settings < Contrib::Configuration::Settings
o.type :string, nilable: true
o.env Ext::ENV_PEER_SERVICE
end

# Enables distributed trace propagation for SNS and SQS messages.
# @default `DD_TRACE_AWS_PROPAGATION_ENABLED` environment variable, otherwise `false`
# @return [Boolean]
option :propagation do |o|
o.type :bool
o.env Ext::ENV_PROPAGATION_ENABLED
o.default false
end

# Controls whether the local trace is parented to the SQS message consumed.
# Possible values are:
# `local`: The local active trace is used; SNS has no effect on trace parentage.
# `distributed`: The local active trace becomes a child of the propagation context from the SQS message.
#
# This option is always disable (the equivalent to`local`) if `propagation` is disabled.
# @default `DD_TRACE_AWS_TRACE_PARENTAGE_STYLE` environment variable, otherwise `local`
# @return [String]
option :parentage_style do |o|
o.type :string
o.env Ext::ENV_TRACE_PARENTAGE_STYLE
o.default 'distributed'
end
end
end
end
Expand Down
3 changes: 3 additions & 0 deletions lib/datadog/tracing/contrib/aws/ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ module Ext
# @!visibility private
ENV_ANALYTICS_ENABLED = 'DD_TRACE_AWS_ANALYTICS_ENABLED'
ENV_ANALYTICS_SAMPLE_RATE = 'DD_TRACE_AWS_ANALYTICS_SAMPLE_RATE'
ENV_PROPAGATION_ENABLED = 'DD_TRACE_AWS_PROPAGATION_ENABLED'
ENV_TRACE_PARENTAGE_STYLE = 'DD_TRACE_AWS_TRACE_PARENTAGE_STYLE'

DEFAULT_PEER_SERVICE_NAME = 'aws'
SPAN_COMMAND = 'aws.command'
TAG_AGENT = 'aws.agent'
Expand Down
48 changes: 35 additions & 13 deletions lib/datadog/tracing/contrib/aws/instrumentation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,56 @@ def add_handlers(handlers, _)

# Generates Spans for all interactions with AWS
class Handler < Seahorse::Client::Handler
# Some services contain trace propagation information (e.g. SQS) that affect what active trace
# we'll use for the AWS span.
# But because this information is only available after the request is made, we need to make the AWS
# request first, then create the trace and span with correct distributed trace parenting.
def call(context)
Tracing.trace(Ext::SPAN_COMMAND) do |span|
@handler.call(context).tap do
annotate!(span, ParsedContext.new(context))
end
config = configuration

# Find the AWS service instrumentation
parsed_context = ParsedContext.new(context)
aws_service = parsed_context.safely(:resource).split('.')[0]
handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service]

# Execute handler stack, to ensure we have the response object before the trace and span are created
start_time = Core::Utils::Time.now.utc # Save the start time as the span creation is delayed
begin
response = @handler.call(context)
rescue Exception => e # rubocop:disable Lint/RescueException
# Catch exception to reraise it inside the trace block, to ensure the span has correct error information
# This matches the behavior of {Datadog::Tracing::SpanOperation#measure}
end

Tracing.trace(Ext::SPAN_COMMAND, start_time: start_time) do |span, trace|
handler.before_span(config, context, response) if handler

annotate!(config, span, trace, parsed_context, aws_service)

raise e if e
end

response
end

private

# rubocop:disable Metrics/AbcSize
def annotate!(span, context)
span.service = configuration[:service_name]
def annotate!(config, span, trace, context, aws_service)
span.service = config[:service_name]
span.type = Tracing::Metadata::Ext::HTTP::TYPE_OUTBOUND
span.name = Ext::SPAN_COMMAND
span.resource = context.safely(:resource)
aws_service = span.resource.split('.')[0]
span.set_tag(Ext::TAG_AWS_SERVICE, aws_service)
params = context.safely(:params)
if (handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service])
handler.process(config, trace, context)
handler.add_tags(span, params)
end

if configuration[:peer_service]
if config[:peer_service]
span.set_tag(
Tracing::Metadata::Ext::TAG_PEER_SERVICE,
configuration[:peer_service]
config[:peer_service]
)
end

Expand All @@ -61,8 +84,8 @@ def annotate!(span, context)
span.set_tag(Tracing::Metadata::Ext::TAG_PEER_HOSTNAME, context.safely(:host))

# Set analytics sample rate
if Contrib::Analytics.enabled?(configuration[:analytics_enabled])
Contrib::Analytics.set_sample_rate(span, configuration[:analytics_sample_rate])
if Contrib::Analytics.enabled?(config[:analytics_enabled])
Contrib::Analytics.set_sample_rate(span, config[:analytics_sample_rate])
end
Contrib::Analytics.set_measured(span)

Expand All @@ -77,7 +100,6 @@ def annotate!(span, context)

Contrib::SpanAttributeSchema.set_peer_service!(span, Ext::PEER_SERVICE_SOURCES)
end
# rubocop:enable Metrics/AbcSize

def configuration
Datadog.configuration.tracing[:aws]
Expand Down
31 changes: 31 additions & 0 deletions lib/datadog/tracing/contrib/aws/service/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,38 @@ module Aws
module Service
# Base class for all AWS service-specific tag handlers.
class Base
def before_span(config, context, response); end
def process(config, trace, context); end
def add_tags(span, params); end

MESSAGE_ATTRIBUTES_LIMIT = 10 # Can't set more than 10 message attributes

# Extract the `_datadog` message attribute and decode its JSON content.
def extract_propagation!(response, data_type)
messages = response.data.messages

# DEV: Extract the context from the first message today.
# DEV: Use span links in the future to support multiple messages related to a single span.
return unless (message = messages[0])

message_attributes = message.message_attributes

return unless message_attributes && (datadog = message_attributes['_datadog'])

if (data = datadog[data_type]) && (parsed_data = JSON.parse(data))
Tracing.continue_trace!(Distributed.extract(parsed_data))
end
end

def inject_propagation(trace, params, data_type)
message_attributes = (params[:message_attributes] ||= {})
return if message_attributes.size >= MESSAGE_ATTRIBUTES_LIMIT

data = {}
if Distributed.inject(trace.to_digest, data)
message_attributes['_datadog'] = { :data_type => data_type, :binary_value => data.to_json }
end
end
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions lib/datadog/tracing/contrib/aws/service/sns.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ module Aws
module Service
# SNS tag handlers.
class SNS < Base
PROPAGATION_DATATYPE = 'Binary'

def process(config, trace, context)
return unless config[:propagation]

case context.operation
when :publish
inject_propagation(trace, context.params, PROPAGATION_DATATYPE)
# TODO: when :publish_batch # Future support for batch publishing
end
end

def add_tags(span, params)
topic_arn = params[:topic_arn]
topic_name = params[:name]
Expand Down
18 changes: 18 additions & 0 deletions lib/datadog/tracing/contrib/aws/service/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@ module Aws
module Service
# SQS tag handlers.
class SQS < Base
DATATYPE = 'String'
def before_span(config, context, response)
return unless context.operation == :receive_message && config[:propagation]

# Parent the current trace based on distributed message attributes
extract_propagation!(response, 'string_value') if config[:parentage_style] == 'distributed'
end

def process(config, trace, context)
return unless config[:propagation]

case context.operation
when :send_message
inject_propagation(trace, context.params, 'String')
# TODO: when :send_message_batch # Future support for batch sending
end
end

def add_tags(span, params)
queue_url = params[:queue_url]
queue_name = params[:queue_name]
Expand Down
59 changes: 59 additions & 0 deletions lib/datadog/tracing/distributed.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# frozen_string_literal: true

require_relative 'distributed/b3_multi'
require_relative 'distributed/b3_single'
require_relative 'distributed/datadog'
require_relative 'distributed/none'
require_relative 'distributed/propagation'
require_relative 'distributed/trace_context'
require_relative 'contrib/component'

module Datadog
module Tracing
# Namespace for distributed tracing propagation and correlation
module Distributed
module_function

# Inject distributed headers into the given request
# @param digest [Datadog::Tracing::TraceDigest] the trace to inject
# @param data [Hash] the request to inject
def inject(digest, data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.inject!(digest, data)
end

# Extract distributed headers from the given request
# @param data [Hash] the request to extract from
# @return [Datadog::Tracing::TraceDigest,nil] the extracted trace digest or nil if none was found
def extract(data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.extract(data)
end

Contrib::Component.register('distributed') do |config|
tracing = config.tracing
# DEV: evaluate propagation_style in case it overrides propagation_style_extract & propagation_extract_first
tracing.propagation_style

@propagation = Propagation.new(
propagation_styles: {
Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER =>
B3Multi.new(fetcher: Fetcher),
Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER =>
B3Single.new(fetcher: Fetcher),
Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG =>
Datadog.new(fetcher: Fetcher),
Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT =>
TraceContext.new(fetcher: Fetcher),
Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => None.new
},
propagation_style_inject: tracing.propagation_style_inject,
propagation_style_extract: tracing.propagation_style_extract,
propagation_extract_first: tracing.propagation_extract_first
)
end
end
end
end
54 changes: 54 additions & 0 deletions lib/datadog/tracing/trace_digest.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,60 @@ def merge(field_value_pairs)
}.merge!(field_value_pairs)
)
end

# rubocop:disable Metrics/AbcSize,Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity
def ==(other)
self.class == other.class &&
span_id == other.span_id &&
span_name == other.span_name &&
span_resource == other.span_resource &&
span_service == other.span_service &&
span_type == other.span_type &&
trace_distributed_tags == other.trace_distributed_tags &&
trace_hostname == other.trace_hostname &&
trace_id == other.trace_id &&
trace_name == other.trace_name &&
trace_origin == other.trace_origin &&
trace_process_id == other.trace_process_id &&
trace_resource == other.trace_resource &&
trace_runtime_id == other.trace_runtime_id &&
trace_sampling_priority == other.trace_sampling_priority &&
trace_service == other.trace_service &&
trace_distributed_id == other.trace_distributed_id &&
trace_flags == other.trace_flags &&
trace_state == other.trace_state &&
trace_state_unknown_fields == other.trace_state_unknown_fields &&
span_remote == other.span_remote
end
# rubocop:enable Metrics/AbcSize,Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity

alias eql? ==

def hash
[
self.class,
span_id,
span_name,
span_resource,
span_service,
span_type,
trace_distributed_tags,
trace_hostname,
trace_id,
trace_name,
trace_origin,
trace_process_id,
trace_resource,
trace_runtime_id,
trace_sampling_priority,
trace_service,
trace_distributed_id,
trace_flags,
trace_state,
trace_state_unknown_fields,
span_remote
].hash
end
end
end
end
2 changes: 2 additions & 0 deletions sig/datadog/tracing/contrib/aws/ext.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Datadog
ENV_ENABLED: "DD_TRACE_AWS_ENABLED"

ENV_PEER_SERVICE: "DD_TRACE_AWS_PEER_SERVICE"
ENV_PROPAGATION_ENABLED: string
ENV_SERVICE_NAME: "DD_TRACE_AWS_SERVICE_NAME"

ENV_ANALYTICS_ENABLED: "DD_TRACE_AWS_ANALYTICS_ENABLED"
Expand All @@ -14,6 +15,7 @@ module Datadog

DEFAULT_PEER_SERVICE_NAME: "aws"

ENV_TRACE_PARENTAGE_STYLE: string
PEER_SERVICE_SOURCES: Array[String]

SPAN_COMMAND: "aws.command"
Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/tracing/contrib/aws/service/base.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Datadog
module Aws
module Service
class Base
MESSAGE_ATTRIBUTES_LIMIT: int

def add_tags: (untyped span, untyped params) -> nil
end
end
Expand Down
7 changes: 7 additions & 0 deletions sig/datadog/tracing/contrib/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ module Datadog
module Contrib
module Component
@registry: Hash[String, Proc]

def self.register: (string name) { (untyped) -> void } -> void
def self.configure: (Core::Configuration::Settings config) -> void

private

def self.unregister: (string name) -> void
end
end
end
Expand Down
Loading

0 comments on commit 8f0bc5d

Please sign in to comment.