diff --git a/appraisal/ruby-2.7.rb b/appraisal/ruby-2.7.rb index 88b3b0f1e98..8f311d3cd48 100644 --- a/appraisal/ruby-2.7.rb +++ b/appraisal/ruby-2.7.rb @@ -202,6 +202,7 @@ gem 'concurrent-ruby' gem 'dalli', '>= 3.0.0' gem 'grpc' + gem 'karafka' gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596 gem 'rack-test' # Dev dependencies for testing rack-based code gem 'rake', '>= 12.3' diff --git a/appraisal/ruby-3.0.rb b/appraisal/ruby-3.0.rb index ac024429e2f..158a314cc98 100644 --- a/appraisal/ruby-3.0.rb +++ b/appraisal/ruby-3.0.rb @@ -124,6 +124,7 @@ gem 'concurrent-ruby' gem 'dalli', '>= 3.0.0' gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support + gem 'karafka' gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596 gem 'rack-test' # Dev dependencies for testing rack-based code gem 'rake', '>= 12.3' diff --git a/appraisal/ruby-3.1.rb b/appraisal/ruby-3.1.rb index ac024429e2f..158a314cc98 100644 --- a/appraisal/ruby-3.1.rb +++ b/appraisal/ruby-3.1.rb @@ -124,6 +124,7 @@ gem 'concurrent-ruby' gem 'dalli', '>= 3.0.0' gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support + gem 'karafka' gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596 gem 'rack-test' # Dev dependencies for testing rack-based code gem 'rake', '>= 12.3' diff --git a/appraisal/ruby-3.2.rb b/appraisal/ruby-3.2.rb index ac024429e2f..158a314cc98 100644 --- a/appraisal/ruby-3.2.rb +++ b/appraisal/ruby-3.2.rb @@ -124,6 +124,7 @@ gem 'concurrent-ruby' gem 'dalli', '>= 3.0.0' gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support + gem 'karafka' gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596 gem 'rack-test' # Dev dependencies for testing rack-based code gem 'rake', '>= 12.3' diff --git a/appraisal/ruby-3.3.rb b/appraisal/ruby-3.3.rb index c68111de4f3..41353458b36 100644 --- a/appraisal/ruby-3.3.rb +++ b/appraisal/ruby-3.3.rb @@ -124,6 +124,7 @@ gem 'concurrent-ruby' gem 'dalli', '>= 3.0.0' gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support + gem 'karafka' gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596 gem 'rack-test' # Dev dependencies for testing rack-based code gem 'rake', '>= 12.3' diff --git a/appraisal/ruby-3.4.rb b/appraisal/ruby-3.4.rb index 17a81807b66..d67dcc6e990 100644 --- a/appraisal/ruby-3.4.rb +++ b/appraisal/ruby-3.4.rb @@ -131,6 +131,7 @@ gem 'concurrent-ruby' gem 'dalli', '>= 3.0.0' gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support + gem 'karafka' gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596 gem 'rack-test' # Dev dependencies for testing rack-based code gem 'rake', '>= 12.3' diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md index ecc44c66745..6f1804798b8 100644 --- a/docs/GettingStarted.md +++ b/docs/GettingStarted.md @@ -1112,6 +1112,27 @@ end | --------- | ------------------------------- | ------ | -------------------------------------------- | ------- | | `enabled` | `DD_TRACE_KAFKA_ENABLED` | `Bool` | Whether the integration should create spans. | `true` | +### Karafka + +The karafka integration provides tracing of the `karafka` gem. +You can enable it through `Datadog.configure`: + +```ruby +require 'karafka' +require 'datadog' + +Datadog.configure do |c| + c.tracing.instrument :karafka, **options +end + +``` +`options` are the following keyword arguments: + +| Key | Env Var | Type | Description | Default | +| --------------------- | ------------------------ | ------ | --------------------------------------------------- | ------- | +| `enabled` | `DD_TRACE_KARAFKA_ENABLED` | `Bool` | Specifies whether the integration should create spans. | `true` | +| `distributed_tracing` | | `Bool` | Enables [distributed tracing](#distributed-tracing). | `false` | + ### MongoDB The integration traces any `Command` that is sent from the [MongoDB Ruby Driver](https://github.com/mongodb/mongo-ruby-driver) to a MongoDB cluster. By extension, Object Document Mappers (ODM) such as Mongoid are automatically instrumented if they use the official Ruby driver. To activate the integration, simply: diff --git a/lib/datadog/tracing/contrib.rb b/lib/datadog/tracing/contrib.rb index 13a15640333..4e51e6a5b78 100644 --- a/lib/datadog/tracing/contrib.rb +++ b/lib/datadog/tracing/contrib.rb @@ -55,6 +55,7 @@ module Contrib require_relative 'contrib/httprb/integration' require_relative 'contrib/integration' require_relative 'contrib/kafka/integration' +require_relative 'contrib/karafka' require_relative 'contrib/lograge/integration' require_relative 'contrib/mongodb/integration' require_relative 'contrib/mysql2/integration' diff --git a/lib/datadog/tracing/contrib/ext.rb b/lib/datadog/tracing/contrib/ext.rb index 0861b1141c2..88a2d23a0ab 100644 --- a/lib/datadog/tracing/contrib/ext.rb +++ b/lib/datadog/tracing/contrib/ext.rb @@ -48,6 +48,7 @@ module GRPC module Messaging TAG_SYSTEM = 'messaging.system' + TAG_DESTINATION = 'messaging.destination' PEER_SERVICE_SOURCES = Array[Tracing::Metadata::Ext::NET::TAG_DESTINATION_NAME, Tracing::Metadata::Ext::TAG_PEER_HOSTNAME, Tracing::Metadata::Ext::NET::TAG_TARGET_HOST,].freeze diff --git a/lib/datadog/tracing/contrib/karafka.rb b/lib/datadog/tracing/contrib/karafka.rb new file mode 100644 index 00000000000..cc456287e18 --- /dev/null +++ b/lib/datadog/tracing/contrib/karafka.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require_relative 'component' +require_relative 'karafka/integration' +require_relative 'karafka/distributed/propagation' + +module Datadog + module Tracing + module Contrib + # `Karafka` integration public API + module Karafka + def self.inject(digest, data) + raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation + + @propagation.inject!(digest, data) + end + + def self.extract(data) + raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation + + @propagation.extract(data) + end + + Contrib::Component.register('karafka') do |config| + tracing = config.tracing + tracing.propagation_style + + @propagation = Sidekiq::Distributed::Propagation.new( + propagation_style_inject: tracing.propagation_style_inject, + propagation_style_extract: tracing.propagation_style_extract, + propagation_extract_first: tracing.propagation_extract_first + ) + end + end + end + end +end diff --git a/lib/datadog/tracing/contrib/karafka/configuration/settings.rb b/lib/datadog/tracing/contrib/karafka/configuration/settings.rb new file mode 100644 index 00000000000..9eae5c6909d --- /dev/null +++ b/lib/datadog/tracing/contrib/karafka/configuration/settings.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +require_relative '../../configuration/settings' +require_relative '../ext' + +module Datadog + module Tracing + module Contrib + module Karafka + module Configuration + # @public_api + class Settings < Contrib::Configuration::Settings + option :enabled do |o| + o.type :bool + o.env Ext::ENV_ENABLED + o.default true + end + + option :service_name + + option :distributed_tracing, default: false, type: :bool + end + end + end + end + end +end diff --git a/lib/datadog/tracing/contrib/karafka/distributed/propagation.rb b/lib/datadog/tracing/contrib/karafka/distributed/propagation.rb new file mode 100644 index 00000000000..5aa8fe8e363 --- /dev/null +++ b/lib/datadog/tracing/contrib/karafka/distributed/propagation.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +require_relative '../../../distributed/fetcher' +require_relative '../../../distributed/propagation' +require_relative '../../../distributed/b3_multi' +require_relative '../../../distributed/b3_single' +require_relative '../../../distributed/datadog' +require_relative '../../../distributed/none' +require_relative '../../../distributed/trace_context' +require_relative '../../../configuration/ext' + +module Datadog + module Tracing + module Contrib + module Karafka + module Distributed + # Extracts and injects propagation through Kafka message headers. + class Propagation < Tracing::Distributed::Propagation + def initialize( + propagation_style_inject:, + propagation_style_extract:, + propagation_extract_first: + ) + super( + propagation_styles: { + Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER => + Tracing::Distributed::B3Multi.new(fetcher: Tracing::Distributed::Fetcher), + Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER => + Tracing::Distributed::B3Single.new(fetcher: Tracing::Distributed::Fetcher), + Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG => + Tracing::Distributed::Datadog.new(fetcher: Tracing::Distributed::Fetcher), + Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT => + Tracing::Distributed::TraceContext.new(fetcher: Tracing::Distributed::Fetcher), + Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => Tracing::Distributed::None.new + }, + propagation_style_inject: propagation_style_inject, + propagation_style_extract: propagation_style_extract, + propagation_extract_first: propagation_extract_first + ) + end + end + end + end + end + end +end + diff --git a/lib/datadog/tracing/contrib/karafka/ext.rb b/lib/datadog/tracing/contrib/karafka/ext.rb new file mode 100644 index 00000000000..ea22eaa5223 --- /dev/null +++ b/lib/datadog/tracing/contrib/karafka/ext.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Datadog + module Tracing + module Contrib + module Karafka + module Ext + ENV_ENABLED = 'DD_TRACE_KARAFKA_ENABLED' + + SPAN_MESSAGE_CONSUME = 'karafka.consume' + SPAN_WORKER_PROCESS = 'worker.process' + + TAG_CONSUMER = 'kafka.consumer' + TAG_TOPIC = 'kafka.topic' + TAG_PARTITION = 'kafka.partition' + TAG_OFFSET = 'kafka.offset' + TAG_OFFSET_LAG = 'kafka.offset_lag' + TAG_MESSAGE_COUNT = 'kafka.message_count' + TAG_MESSAGE_KEY = 'kafka.message_key' + TAG_SYSTEM = 'kafka' + + TAG_OPERATION_PROCESS_BATCH = 'consumer.process_batch' + end + end + end + end +end diff --git a/lib/datadog/tracing/contrib/karafka/integration.rb b/lib/datadog/tracing/contrib/karafka/integration.rb new file mode 100644 index 00000000000..4a89d643287 --- /dev/null +++ b/lib/datadog/tracing/contrib/karafka/integration.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +require_relative '../integration' +require_relative 'patcher' + +module Datadog + module Tracing + module Contrib + module Karafka + # Description of Kafka integration + class Integration + include Contrib::Integration + + # Minimum version of the Karafka library that we support + # https://karafka.io/docs/Versions-Lifecycle-and-EOL/#versioning-strategy + MINIMUM_VERSION = Gem::Version.new('2.2.0') + + # @public_api Changing the integration name or integration options can cause breaking changes + register_as :karafka, auto_patch: false + + def self.version + Gem.loaded_specs['karafka'] && Gem.loaded_specs['karafka'].version + end + + def self.loaded? + !defined?(::Karafka).nil? + end + + def self.compatible? + super && version >= MINIMUM_VERSION + end + + def new_configuration + Configuration::Settings.new + end + + def patcher + Patcher + end + end + end + end + end +end diff --git a/lib/datadog/tracing/contrib/karafka/monitor.rb b/lib/datadog/tracing/contrib/karafka/monitor.rb new file mode 100644 index 00000000000..a0972e0de1a --- /dev/null +++ b/lib/datadog/tracing/contrib/karafka/monitor.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true + +require_relative 'ext' + +module Datadog + module Tracing + module Contrib + module Karafka + # Custom monitor for Karafka. + # Creating a custom monitor, instead of subscribing to an event (e.g. `Karafka.monitor.subscribe 'worker.processed'`), + # is required because event subscriptions cannot wrap the event execution (`yield`). + class Monitor < ::Karafka::Instrumentation::Monitor + TRACEABLE_EVENTS = %w[ + worker.processed + ].freeze + + def instrument(event_id, payload = EMPTY_HASH, &block) + return super unless TRACEABLE_EVENTS.include?(event_id) + + Datadog::Tracing.trace(Ext::SPAN_WORKER_PROCESS) do |span| + job = payload[:job] + job_type = fetch_job_type(job.class) + consumer = job.executor.topic.consumer + + action = case job_type + when 'Periodic' + 'tick' + when 'PeriodicNonBlocking' + 'tick' + when 'Shutdown' + 'shutdown' + when 'Revoked' + 'revoked' + when 'RevokedNonBlocking' + 'revoked' + when 'Idle' + 'idle' + when 'Eofed' + 'eofed' + when 'EofedNonBlocking' + 'eofed' + else + 'consume' + end + + span.resource = "#{consumer}##{action}" + + if action == 'consume' + span.set_tag(Ext::TAG_MESSAGE_COUNT, job.messages.count) + span.set_tag(Ext::TAG_PARTITION, job.executor.partition) + span.set_tag(Ext::TAG_OFFSET, job.messages.first.metadata.offset) + span.set_tag(Ext::TAG_CONSUMER, consumer) + span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, job.executor.topic.name) + span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM) + end + + super + end + end + + private + + def fetch_job_type(job_class) + @job_types_cache ||= {} + @job_types_cache[job_class] ||= job_class.to_s.split('::').last + end + end + end + end + end +end diff --git a/lib/datadog/tracing/contrib/karafka/patcher.rb b/lib/datadog/tracing/contrib/karafka/patcher.rb new file mode 100644 index 00000000000..f9a485e6c01 --- /dev/null +++ b/lib/datadog/tracing/contrib/karafka/patcher.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +require_relative '../patcher' +require_relative 'ext' +require_relative 'distributed/propagation' + +module Datadog + module Tracing + module Contrib + module Karafka + # Patch to add tracing to Karafka::Messages::Messages + module MessagesPatch + def configuration + Datadog.configuration.tracing[:karafka] + end + + def propagation + @propagation ||= Contrib::Karafka::Distributed::Propagation.new + end + + # `each` is the most popular access point to Karafka messages, but not the only one + # Other access patterns do not have a straightforward tracing avenue (e.g. `my_batch_operation messages.payloads`) + # @see https://github.com/karafka/karafka/blob/b06d1f7c17818e1605f80c2bb573454a33376b40/README.md?plain=1#L29-L35 + def each(&block) + @messages_array.each do |message| + if configuration[:distributed_tracing] + trace_digest = Karafka.extract(message.metadata.headers) + Datadog::Tracing.continue_trace!(trace_digest) if trace_digest + end + + Tracing.trace(Ext::SPAN_MESSAGE_CONSUME) do |span| + span.set_tag(Ext::TAG_OFFSET, message.metadata.offset) + span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, message.topic) + span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM) + + span.resource = message.topic + + yield message + end + end + end + end + + # Patcher enables patching of 'karafka' module. + module Patcher + include Contrib::Patcher + + module_function + + def target_version + Integration.version + end + + def patch + require_relative 'monitor' + + ::Karafka::App.config.monitor = Monitor.new + ::Karafka::Messages::Messages.prepend(MessagesPatch) + end + end + end + end + end +end diff --git a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb new file mode 100644 index 00000000000..2a8db3c7464 --- /dev/null +++ b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb @@ -0,0 +1,84 @@ +require 'datadog/tracing/contrib/support/spec_helper' +require 'datadog/tracing/contrib/analytics_examples' + +require 'karafka' +require 'datadog' + +RSpec.describe 'Karafka patcher' do + let(:configuration_options) { {} } + let(:client_id) { SecureRandom.uuid } + let(:span) do + spans.find { |s| s.name == span_name } + end + + before do + Datadog.configure do |c| + c.tracing.instrument :karafka, configuration_options + end + end + + around do |example| + # Reset before and after each example; don't allow global state to linger. + Datadog.registry[:karafka].reset_configuration! + example.run + Datadog.registry[:karafka].reset_configuration! + end + + describe 'Karafka::message#consume' do + let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_MESSAGE_CONSUME } + + it 'is expected to send a span' do + metadata = ::Karafka::Messages::Metadata.new.tap do |metadata| + metadata['offset'] = 412 + end + raw_payload = rand.to_s + + message = Karafka::Messages::Message.new(raw_payload, metadata) + allow(message).to receive(:timestamp).and_return(Time.now) + allow(message).to receive(:topic).and_return('topic_a') + + topic = Karafka::Routing::Topic.new('topic_a', double(id: 0)) + + messages = Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) + + messages.each do |msg| + expect(msg).to be_a(Karafka::Messages::Message) + end + + expect(spans).to have(1).items + expect(span).to_not be nil + expect(span.get_tag('kafka.offset')).to eq 412 + expect(span.get_tag('messaging.destination')).to eq 'topic_a' + expect(span.get_tag('messaging.system')).to eq 'kafka' + expect(span).to_not have_error + expect(span.resource).to eq 'topic_a' + end + end + + describe 'worker.processed' do + let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_WORKER_PROCESS } + + it 'is expected to send a span' do + metadata = ::Karafka::Messages::Metadata.new.tap do |metadata| + metadata['offset'] = 412 + end + raw_payload = rand.to_s + + message = Karafka::Messages::Message.new(raw_payload, metadata) + job = double(executor: double(topic: double(name: 'topic_a', consumer: 'ABC'), partition: 0), messages: [message]) + + Karafka.monitor.instrument('worker.processed', { job: job }) do + # Noop + end + + expect(spans).to have(1).items + expect(span).to_not be nil + expect(span.get_tag('kafka.offset')).to eq 412 + expect(span.get_tag('kafka.partition')).to eq 0 + expect(span.get_tag('kafka.message_count')).to eq 1 + expect(span.get_tag('messaging.destination')).to eq 'topic_a' + expect(span.get_tag('messaging.system')).to eq 'kafka' + expect(span.resource).to eq 'ABC#consume' + end + end +end diff --git a/spec/datadog/tracing/contrib_spec.rb b/spec/datadog/tracing/contrib_spec.rb index ec3f8f37ca3..f50cdfb2739 100644 --- a/spec/datadog/tracing/contrib_spec.rb +++ b/spec/datadog/tracing/contrib_spec.rb @@ -52,7 +52,8 @@ 'sneakers' => 'Sneakers', 'stripe' => 'Stripe', 'sucker_punch' => 'SuckerPunch', - 'trilogy' => 'Trilogy' + 'trilogy' => 'Trilogy', + 'karafka' => 'Karafka' } Dir.chdir("#{root}/lib/datadog/tracing/contrib") do |pwd|