Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Karafka integration #4147

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions appraisal/ruby-2.7.rb
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc'
gem 'karafka'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

karafka no longer supports 2.7 based on: https://karafka.io/docs/Versions-Lifecycle-and-EOL/

gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.0.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

karafka no longer supports 3.0 based on: https://karafka.io/docs/Versions-Lifecycle-and-EOL/

gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.1.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.4.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
21 changes: 21 additions & 0 deletions docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,27 @@ end
| --------- | ------------------------------- | ------ | -------------------------------------------- | ------- |
| `enabled` | `DD_TRACE_KAFKA_ENABLED` | `Bool` | Whether the integration should create spans. | `true` |

### Karafka
nvh0412 marked this conversation as resolved.
Show resolved Hide resolved

The karafka integration provides tracing of the `karafka` gem.
You can enable it through `Datadog.configure`:

```ruby
require 'karafka'
require 'datadog'

Datadog.configure do |c|
c.tracing.instrument :karafka, **options
end

```
`options` are the following keyword arguments:

| Key | Env Var | Type | Description | Default |
| --------------------- | ------------------------ | ------ | --------------------------------------------------- | ------- |
| `enabled` | `DD_TRACE_KARAFKA_ENABLED` | `Bool` | Specifies whether the integration should create spans. | `true` |
| `distributed_tracing` | | `Bool` | Enables [distributed tracing](#distributed-tracing). | `false` |

### MongoDB

The integration traces any `Command` that is sent from the [MongoDB Ruby Driver](https://github.com/mongodb/mongo-ruby-driver) to a MongoDB cluster. By extension, Object Document Mappers (ODM) such as Mongoid are automatically instrumented if they use the official Ruby driver. To activate the integration, simply:
Expand Down
1 change: 1 addition & 0 deletions lib/datadog/tracing/contrib.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ module Contrib
require_relative 'contrib/httprb/integration'
require_relative 'contrib/integration'
require_relative 'contrib/kafka/integration'
require_relative 'contrib/karafka'
require_relative 'contrib/lograge/integration'
require_relative 'contrib/mongodb/integration'
require_relative 'contrib/mysql2/integration'
Expand Down
37 changes: 37 additions & 0 deletions lib/datadog/tracing/contrib/karafka.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

require_relative 'component'
require_relative 'karafka/integration'
require_relative 'karafka/distributed/propagation'

module Datadog
module Tracing
module Contrib
# `Karafka` integration public API
module Karafka
def self.inject(digest, data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.inject!(digest, data)
end

def self.extract(data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.extract(data)
end

Contrib::Component.register('karafka') do |config|
tracing = config.tracing
tracing.propagation_style

@propagation = Sidekiq::Distributed::Propagation.new(
propagation_style_inject: tracing.propagation_style_inject,
propagation_style_extract: tracing.propagation_style_extract,
propagation_extract_first: tracing.propagation_extract_first
)
end
end
end
end
end
27 changes: 27 additions & 0 deletions lib/datadog/tracing/contrib/karafka/configuration/settings.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

require_relative '../../configuration/settings'
require_relative '../ext'

module Datadog
module Tracing
module Contrib
module Karafka
module Configuration
# @public_api
class Settings < Contrib::Configuration::Settings
option :enabled do |o|
o.type :bool
o.env Ext::ENV_ENABLED
o.default true
end

option :service_name

option :distributed_tracing, default: false, type: :bool
marcotc marked this conversation as resolved.
Show resolved Hide resolved
end
end
end
end
end
end
47 changes: 47 additions & 0 deletions lib/datadog/tracing/contrib/karafka/distributed/propagation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# frozen_string_literal: true

require_relative '../../../distributed/fetcher'
require_relative '../../../distributed/propagation'
require_relative '../../../distributed/b3_multi'
require_relative '../../../distributed/b3_single'
require_relative '../../../distributed/datadog'
require_relative '../../../distributed/none'
require_relative '../../../distributed/trace_context'
require_relative '../../../configuration/ext'

module Datadog
module Tracing
module Contrib
module Karafka
module Distributed
# Extracts and injects propagation through Kafka message headers.
class Propagation < Tracing::Distributed::Propagation
def initialize(
propagation_style_inject:,
propagation_style_extract:,
propagation_extract_first:
)
super(
propagation_styles: {
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER =>
Tracing::Distributed::B3Multi.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER =>
Tracing::Distributed::B3Single.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG =>
Tracing::Distributed::Datadog.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT =>
Tracing::Distributed::TraceContext.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => Tracing::Distributed::None.new
},
propagation_style_inject: propagation_style_inject,
propagation_style_extract: propagation_style_extract,
propagation_extract_first: propagation_extract_first
)
end
end
end
end
end
end
end

26 changes: 26 additions & 0 deletions lib/datadog/tracing/contrib/karafka/ext.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

module Datadog
module Tracing
module Contrib
module Karafka
module Ext
ENV_ENABLED = 'DD_TRACE_KARAFKA_ENABLED'

SPAN_MESSAGE_CONSUME = 'karafka.consume'
SPAN_WORKER_PROCESS = 'worker.process'

TAG_CONSUMER = 'kafka.consumer'
TAG_TOPIC = 'kafka.topic'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a side note because maybe it is done: please keep in mind that karafka supports multi-consumer group operations in one karafka process thus CG (consumer group) always needs to be reported alongside metrics.

TAG_PARTITION = 'kafka.partition'
TAG_OFFSET = 'kafka.offset'
TAG_OFFSET_LAG = 'kafka.offset_lag'
TAG_MESSAGE_COUNT = 'kafka.message_count'
TAG_MESSAGE_KEY = 'kafka.message_key'

TAG_OPERATION_PROCESS_BATCH = 'consumer.process_batch'
end
end
end
end
end
44 changes: 44 additions & 0 deletions lib/datadog/tracing/contrib/karafka/integration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# frozen_string_literal: true

require_relative '../integration'
require_relative 'patcher'

module Datadog
module Tracing
module Contrib
module Karafka
# Description of Kafka integration
class Integration
include Contrib::Integration

# Minimum version of the Karafka library that we support
# https://karafka.io/docs/Versions-Lifecycle-and-EOL/#versioning-strategy
MINIMUM_VERSION = Gem::Version.new('2.2.0')

# @public_api Changing the integration name or integration options can cause breaking changes
register_as :karafka, auto_patch: false

def self.version
Gem.loaded_specs['karafka'] && Gem.loaded_specs['karafka'].version
end

def self.loaded?
!defined?(::Karafka).nil?
end

def self.compatible?
super && version >= MINIMUM_VERSION
nvh0412 marked this conversation as resolved.
Show resolved Hide resolved
end

def new_configuration
Configuration::Settings.new
end

def patcher
Patcher
end
end
end
end
end
end
70 changes: 70 additions & 0 deletions lib/datadog/tracing/contrib/karafka/monitor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# frozen_string_literal: true

require_relative 'ext'

module Datadog
module Tracing
module Contrib
module Karafka
# Custom monitor for Karafka.
# Creating a custom monitor, instead of subscribing to an event (e.g. `Karafka.monitor.subscribe 'worker.processed'`),
# is required because event subscriptions cannot wrap the event execution (`yield`).
class Monitor < ::Karafka::Instrumentation::Monitor
TRACEABLE_EVENTS = %w[
worker.processed
marcotc marked this conversation as resolved.
Show resolved Hide resolved
].freeze

def instrument(event_id, payload = EMPTY_HASH, &block)
return super unless TRACEABLE_EVENTS.include?(event_id)

Datadog::Tracing.trace(Ext::SPAN_WORKER_PROCESS) do |span|
job = payload[:job]
job_type = fetch_job_type(job.class)
consumer = job.executor.topic.consumer

action = case job_type
when 'Periodic'
'tick'
when 'PeriodicNonBlocking'
'tick'
when 'Shutdown'
'shutdown'
when 'Revoked'
'revoked'
when 'RevokedNonBlocking'
'revoked'
when 'Idle'
'idle'
when 'Eofed'
'eofed'
when 'EofedNonBlocking'
'eofed'
else
'consume'
end

span.resource = "#{consumer}##{action}"

if action == 'consume'
span.set_tag(Ext::TAG_MESSAGE_COUNT, job.messages.count)
span.set_tag(Ext::TAG_PARTITION, job.executor.partition)
span.set_tag(Ext::TAG_OFFSET, job.messages.first.metadata.offset)
span.set_tag(Ext::TAG_TOPIC, job.executor.topic.name)
span.set_tag(Ext::TAG_CONSUMER, consumer)
end

super
end
end

private

def fetch_job_type(job_class)
@job_types_cache ||= {}
@job_types_cache[job_class] ||= job_class.to_s.split('::').last
end
end
end
end
end
end
Loading