-
Notifications
You must be signed in to change notification settings - Fork 377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Karafka integration #4147
base: master
Are you sure you want to change the base?
Add Karafka integration #4147
Changes from 13 commits
dd02868
10645a7
ecb1897
f9b6e36
41cb65f
9dd469c
fbbbcb0
f79c7fb
27e5711
e19fcd7
21128e9
c88a710
a00244d
9007248
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,6 +124,7 @@ | |
gem 'concurrent-ruby' | ||
gem 'dalli', '>= 3.0.0' | ||
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support | ||
gem 'karafka' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. karafka no longer supports 3.0 based on: https://karafka.io/docs/Versions-Lifecycle-and-EOL/ |
||
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596 | ||
gem 'rack-test' # Dev dependencies for testing rack-based code | ||
gem 'rake', '>= 12.3' | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
# frozen_string_literal: true | ||
|
||
require_relative 'component' | ||
require_relative 'karafka/integration' | ||
require_relative 'karafka/distributed/propagation' | ||
|
||
module Datadog | ||
module Tracing | ||
module Contrib | ||
# `Karafka` integration public API | ||
module Karafka | ||
def self.inject(digest, data) | ||
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation | ||
|
||
@propagation.inject!(digest, data) | ||
end | ||
|
||
def self.extract(data) | ||
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation | ||
|
||
@propagation.extract(data) | ||
end | ||
|
||
Contrib::Component.register('karafka') do |config| | ||
tracing = config.tracing | ||
tracing.propagation_style | ||
|
||
@propagation = Sidekiq::Distributed::Propagation.new( | ||
propagation_style_inject: tracing.propagation_style_inject, | ||
propagation_style_extract: tracing.propagation_style_extract, | ||
propagation_extract_first: tracing.propagation_extract_first | ||
) | ||
end | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# frozen_string_literal: true | ||
|
||
require_relative '../../configuration/settings' | ||
require_relative '../ext' | ||
|
||
module Datadog | ||
module Tracing | ||
module Contrib | ||
module Karafka | ||
module Configuration | ||
# @public_api | ||
class Settings < Contrib::Configuration::Settings | ||
option :enabled do |o| | ||
o.type :bool | ||
o.env Ext::ENV_ENABLED | ||
o.default true | ||
end | ||
|
||
option :service_name | ||
|
||
option :distributed_tracing, default: false, type: :bool | ||
marcotc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
end | ||
end | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
# frozen_string_literal: true | ||
|
||
require_relative '../../../distributed/fetcher' | ||
require_relative '../../../distributed/propagation' | ||
require_relative '../../../distributed/b3_multi' | ||
require_relative '../../../distributed/b3_single' | ||
require_relative '../../../distributed/datadog' | ||
require_relative '../../../distributed/none' | ||
require_relative '../../../distributed/trace_context' | ||
require_relative '../../../configuration/ext' | ||
|
||
module Datadog | ||
module Tracing | ||
module Contrib | ||
module Karafka | ||
module Distributed | ||
# Extracts and injects propagation through Kafka message headers. | ||
class Propagation < Tracing::Distributed::Propagation | ||
def initialize( | ||
propagation_style_inject:, | ||
propagation_style_extract:, | ||
propagation_extract_first: | ||
) | ||
super( | ||
propagation_styles: { | ||
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER => | ||
Tracing::Distributed::B3Multi.new(fetcher: Tracing::Distributed::Fetcher), | ||
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER => | ||
Tracing::Distributed::B3Single.new(fetcher: Tracing::Distributed::Fetcher), | ||
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG => | ||
Tracing::Distributed::Datadog.new(fetcher: Tracing::Distributed::Fetcher), | ||
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT => | ||
Tracing::Distributed::TraceContext.new(fetcher: Tracing::Distributed::Fetcher), | ||
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => Tracing::Distributed::None.new | ||
}, | ||
propagation_style_inject: propagation_style_inject, | ||
propagation_style_extract: propagation_style_extract, | ||
propagation_extract_first: propagation_extract_first | ||
) | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# frozen_string_literal: true | ||
|
||
module Datadog | ||
module Tracing | ||
module Contrib | ||
module Karafka | ||
module Ext | ||
ENV_ENABLED = 'DD_TRACE_KARAFKA_ENABLED' | ||
|
||
SPAN_MESSAGE_CONSUME = 'karafka.consume' | ||
SPAN_WORKER_PROCESS = 'worker.process' | ||
|
||
TAG_CONSUMER = 'kafka.consumer' | ||
TAG_TOPIC = 'kafka.topic' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a side note because maybe it is done: please keep in mind that karafka supports multi-consumer group operations in one karafka process thus CG (consumer group) always needs to be reported alongside metrics. |
||
TAG_PARTITION = 'kafka.partition' | ||
TAG_OFFSET = 'kafka.offset' | ||
TAG_OFFSET_LAG = 'kafka.offset_lag' | ||
TAG_MESSAGE_COUNT = 'kafka.message_count' | ||
TAG_MESSAGE_KEY = 'kafka.message_key' | ||
|
||
TAG_OPERATION_PROCESS_BATCH = 'consumer.process_batch' | ||
end | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# frozen_string_literal: true | ||
|
||
require_relative '../integration' | ||
require_relative 'patcher' | ||
|
||
module Datadog | ||
module Tracing | ||
module Contrib | ||
module Karafka | ||
# Description of Kafka integration | ||
class Integration | ||
include Contrib::Integration | ||
|
||
# Minimum version of the Karafka library that we support | ||
# https://karafka.io/docs/Versions-Lifecycle-and-EOL/#versioning-strategy | ||
MINIMUM_VERSION = Gem::Version.new('2.2.0') | ||
|
||
# @public_api Changing the integration name or integration options can cause breaking changes | ||
register_as :karafka, auto_patch: false | ||
|
||
def self.version | ||
Gem.loaded_specs['karafka'] && Gem.loaded_specs['karafka'].version | ||
end | ||
|
||
def self.loaded? | ||
!defined?(::Karafka).nil? | ||
end | ||
|
||
def self.compatible? | ||
super && version >= MINIMUM_VERSION | ||
nvh0412 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
end | ||
|
||
def new_configuration | ||
Configuration::Settings.new | ||
end | ||
|
||
def patcher | ||
Patcher | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
# frozen_string_literal: true | ||
|
||
require_relative 'ext' | ||
|
||
module Datadog | ||
module Tracing | ||
module Contrib | ||
module Karafka | ||
# Custom monitor for Karafka. | ||
# Creating a custom monitor, instead of subscribing to an event (e.g. `Karafka.monitor.subscribe 'worker.processed'`), | ||
# is required because event subscriptions cannot wrap the event execution (`yield`). | ||
class Monitor < ::Karafka::Instrumentation::Monitor | ||
TRACEABLE_EVENTS = %w[ | ||
worker.processed | ||
marcotc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
].freeze | ||
|
||
def instrument(event_id, payload = EMPTY_HASH, &block) | ||
return super unless TRACEABLE_EVENTS.include?(event_id) | ||
|
||
Datadog::Tracing.trace(Ext::SPAN_WORKER_PROCESS) do |span| | ||
job = payload[:job] | ||
job_type = fetch_job_type(job.class) | ||
consumer = job.executor.topic.consumer | ||
|
||
action = case job_type | ||
when 'Periodic' | ||
'tick' | ||
when 'PeriodicNonBlocking' | ||
'tick' | ||
when 'Shutdown' | ||
'shutdown' | ||
when 'Revoked' | ||
'revoked' | ||
when 'RevokedNonBlocking' | ||
'revoked' | ||
when 'Idle' | ||
'idle' | ||
when 'Eofed' | ||
'eofed' | ||
when 'EofedNonBlocking' | ||
'eofed' | ||
else | ||
'consume' | ||
end | ||
|
||
span.resource = "#{consumer}##{action}" | ||
|
||
if action == 'consume' | ||
span.set_tag(Ext::TAG_MESSAGE_COUNT, job.messages.count) | ||
span.set_tag(Ext::TAG_PARTITION, job.executor.partition) | ||
span.set_tag(Ext::TAG_OFFSET, job.messages.first.metadata.offset) | ||
span.set_tag(Ext::TAG_TOPIC, job.executor.topic.name) | ||
span.set_tag(Ext::TAG_CONSUMER, consumer) | ||
end | ||
|
||
super | ||
end | ||
end | ||
|
||
private | ||
|
||
def fetch_job_type(job_class) | ||
@job_types_cache ||= {} | ||
@job_types_cache[job_class] ||= job_class.to_s.split('::').last | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
karafka no longer supports 2.7 based on: https://karafka.io/docs/Versions-Lifecycle-and-EOL/