Add Karafka integration #4147
base: master
Conversation
595ea74 to dd02868
Created a Jira card for Docs Team editorial review.
```ruby
::Karafka.monitor.subscribe 'worker.process' do |event|
  # Start a trace
  span = Tracing.trace(Ext::SPAN_WORKER_PROCESS, **span_options)

  job = event[:job]
  job_type = fetch_job_type(job.class)
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name

  action = case job_type
           when 'Periodic', 'PeriodicNonBlocking'
             'tick'
           when 'Shutdown'
             'shutdown'
           when 'Revoked', 'RevokedNonBlocking'
             'revoked'
           when 'Idle'
             'idle'
           when 'Eofed', 'EofedNonBlocking'
             'eofed'
           else
             'consume'
           end

  span.resource = "#{consumer}##{action}"
  span.set_tag(Ext::TAG_TOPIC, topic) if topic

  if action == 'consume'
    span.set_tag(Ext::TAG_MESSAGE_COUNT, job.messages.count)
    span.set_tag(Ext::TAG_PARTITION, job.executor.partition)
    span.set_tag(Ext::TAG_OFFSET, job.messages.first.metadata.offset)
  end

  span
end

::Karafka.monitor.subscribe 'worker.completed' do |event|
  Tracing.active_span&.finish
end
```
Having separate locations for span creation (`Tracing.trace`) and span conclusion (`Tracing.active_span&.finish`) is always a possible source of hard-to-debug errors and span leakage (and thus memory leaks). We normally only do it when it's impossible to use `Tracing.trace { do_work_here }`.

Also, `Tracing.trace { do_work_here }` takes care of error handling, properly tagging the current span with error information.

In this case, we have the event `worker.processed`, which looks like it's just what we need: https://github.com/karafka/karafka/blob/ab4f9bcd3620f46adb8c0d158b5396b245619ed3/lib/karafka/processing/worker.rb#L58-L78

Except that it doesn't call the event listeners when an error is raised by the job. There are no error handlers in this method that call `assigned_listeners`: https://github.com/karafka/karafka-core/blob/a1425725d275796673424c1cd9be517d06518ec9/lib/karafka/core/monitoring/notifications.rb#L120

I opened a PR to Karafka to address this, but even if it's approved, it won't affect users of older versions of the library: karafka/karafka-core#145

That being said, I still lean towards using a single `Tracing.trace { do_work_here }`, if only because the safety of not having to worry about leaky spans is too advantageous.
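For reference, a minimal sketch of what the block form provides, reusing names from the diff above (`process_job` is a hypothetical stand-in for the worker's actual job execution):

```ruby
# Block form: the span is finished automatically, and if the block raises,
# the error type, message, and backtrace are recorded on the span before
# the exception is re-raised — no leaked spans.
Tracing.trace(Ext::SPAN_WORKER_PROCESS, **span_options) do |span|
  span.resource = "#{consumer}##{action}"
  process_job(job) # hypothetical; stands in for the actual work
end
```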
That’s precisely what I’d like to hear from the Datadog team regarding this. My initial intuition when using `Tracing.active_span` was that I couldn’t be certain whether the span I’m currently working on is the one I want to complete. Considering this and your insights, would it be feasible to remove this tracing event? At the moment, I prefer to retain the `Ext::SPAN_MESSAGE_CONSUME` event only for this integration, as it effectively meets my requirement: enabling distributed tracing, ensuring that each message span is linked to the root trace in the distributed tracing system.
> I opened a PR to Karafka to address this, but even if it's approved, it won't affect users of older versions of the library

This PR is not needed to achieve the expected wrapping, because it is already done, for example, for OpenTelemetry:

ref1: https://karafka.io/docs/Monitoring-and-Logging/#opentelemetry
ref2: https://karafka.io/docs/Monitoring-and-Logging/#monitor-wrapping-and-replacement
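As a rough illustration of the monitor-wrapping approach described in those docs (the `Karafka::Instrumentation::Monitor` base class and `instrument` hook are taken from the referenced docs, but should be verified against the Karafka version in use; `KarafkaApp` and the span name are placeholders):

```ruby
# Sketch only: a custom monitor that wraps job execution in a Datadog span,
# so the span is started and finished around the same block.
class DatadogTracingMonitor < ::Karafka::Instrumentation::Monitor
  TRACED_EVENTS = %w[worker.processed].freeze

  def instrument(event_id, payload = {}, &block)
    return super unless TRACED_EVENTS.include?(event_id)

    # The block form tags errors and finishes the span even when the job raises.
    Datadog::Tracing.trace('karafka.worker.process') { super }
  end
end

class KarafkaApp < Karafka::App
  setup do |config|
    config.monitor = DatadogTracingMonitor.new
  end
end
```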
```diff
@@ -193,6 +193,7 @@
 gem 'concurrent-ruby'
 gem 'dalli', '>= 3.0.0'
 gem 'grpc'
+gem 'karafka'
```
karafka no longer supports Ruby 2.7, based on: https://karafka.io/docs/Versions-Lifecycle-and-EOL/
```diff
@@ -115,6 +115,7 @@
 gem 'concurrent-ruby'
 gem 'dalli', '>= 3.0.0'
 gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
+gem 'karafka'
```
karafka no longer supports Ruby 3.0, based on: https://karafka.io/docs/Versions-Lifecycle-and-EOL/
```ruby
include Karafka::Event

def self.subscribe!
  ::Karafka.monitor.subscribe 'worker.process' do |event|
```
I would not recommend using this layer for instrumentation of that type and would advise you to reconsider something "closer" to the actual execution of work.
Would it be possible for you to explain what "closer" means here? :) I utilised what you did in instrumentation/vendors/datadog/logger_listener.rb, but what I'm reconsidering is that we don't actually need to subscribe to this event anymore, because our goal is to have message traces for each message inside the messages enumerator, so we can link them to the distributed traces. So I can remove this event if everything is getting out of control here.
Sure. Karafka has several layers of reporting when a "job" is executed. The worker level is the highest, and I think of it as conceptually "distant" from the end-user code execution layer. In between those, there are coordinators, executors, and more. The closest to the user code is the one that has the `consumer.*` events, and while I myself use the worker level once in a while, I in general do not recommend it and recommend using the one mentioned above. At some point, I will probably migrate the ones that I wrote myself. There is nothing fundamentally wrong with using the worker one, but as mentioned, there's a lot in between.

> because our goal is to have message traces for each message inside messages enumerator

But this is not the only way users process code. You need to keep in mind users that do batch operations as well; thus, you want to trace around all the operational code too, as sketched below.
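For illustration, a rough sketch of tracing around the whole batch in addition to per-message spans (the `Ext::*` constants come from this PR; the consumer class, the batch span name, and the `handle` method are hypothetical):

```ruby
class EventsConsumer < Karafka::BaseConsumer
  def consume
    # Hypothetical outer span covering the whole batch, so consumers that do
    # batch operations are traced around all of their operational code.
    Datadog::Tracing.trace('karafka.consume.batch') do |batch_span|
      batch_span.set_tag(Ext::TAG_MESSAGE_COUNT, messages.count)

      messages.each do |message|
        # One span per message, linkable to the producer's distributed trace.
        Datadog::Tracing.trace(Ext::SPAN_MESSAGE_CONSUME) do |span|
          span.set_tag(Ext::TAG_TOPIC, message.topic)
          handle(message) # hypothetical per-message processing
        end
      end
    end
  end
end
```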
Codecov Report

Attention: Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff             @@
##           master    #4147      +/-   ##
==========================================
- Coverage   97.78%   97.75%   -0.03%
==========================================
  Files        1353     1358       +5
  Lines       81817    81920     +103
  Branches     4145     4150       +5
==========================================
+ Hits        80001    80081      +80
- Misses       1816     1839      +23
```

☔ View full report in Codecov by Sentry.
FYI, feel free to ping me once the remarks are addressed. I will be happy to help, and maybe in the future retire my own instrumentation in favour of the DD one ;)
```ruby
SPAN_MESSAGE_CONSUME = 'karafka.consume'
SPAN_WORKER_PROCESS = 'worker.process'

TAG_TOPIC = 'kafka.topic'
```
This is a side note, because maybe it is already done: please keep in mind that Karafka supports multi-consumer-group operations in one Karafka process, thus the CG (consumer group) always needs to be reported alongside these metrics. A hedged sketch follows below.
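For illustration only, one possible shape for that tag (the constant is not part of this PR, and the `consumer_group` accessor chain is an assumption about Karafka's topic API that should be verified):

```ruby
# Hypothetical constant, mirroring the existing TAG_TOPIC naming style:
TAG_CONSUMER_GROUP = 'kafka.consumer_group'

# Inside the subscription block; `job.executor.topic.consumer_group.id`
# is assumed, not confirmed against the Karafka version in use.
span.set_tag(TAG_CONSUMER_GROUP, job.executor.topic.consumer_group.id)
```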
Left a couple of very small suggestions from Docs and approved the PR.
Use Tracing.trace wrapper to add consumer trace
Use Instrumentation::Monitor to instrument and have a proper trace wrapper
Hi team and @marcotc, let's settle this. My gut feeling about this PR is that we do have some limitations in how tracing is integrated into the Karafka gem. I’ve switched to using `Instrumentation::Monitor` to instrument and have a proper trace wrapper. This implementation fits perfectly with my system and has been working well with an internal patch. That said, let me know your thoughts on this integration. I’m fine if we can't settle on it, as I can always maintain the internal patch if needed.
Sorry for the delay, I'll circle back here next week to get this PR unblocked and hopefully merged.
@marcotc feel free to ping me as well prior to merging/deciding what to do with it. I will give it a second look.
Co-authored-by: Marco Costa <[email protected]>
```ruby
span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
span.set_tag(Ext::TAG_TOPIC, message.topic)
```
We should add a few standard tags for any messaging system spans, as these enable richer product integration for messaging system observability:
```suggestion
span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
span.set_tag('messaging.destination', message.topic) # Please create a constant for this one, close to `Contrib::Ext::Messaging::TAG_SYSTEM`
span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, 'kafka')
```
The same should be done for the span created by `Datadog::Tracing.trace(Ext::SPAN_WORKER_PROCESS) do |span|`.
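A minimal sketch of what that could look like (`Ext::TAG_MESSAGING_DESTINATION` is a hypothetical constant standing in for the one the suggestion above asks to create; `topic` comes from the worker.process block in this PR):

```ruby
Datadog::Tracing.trace(Ext::SPAN_WORKER_PROCESS) do |span|
  # Standard messaging tags for richer messaging observability:
  span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, 'kafka')
  span.set_tag(Ext::TAG_MESSAGING_DESTINATION, topic) if topic # hypothetical constant
  # ... existing resource and tagging logic ...
end
```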
I think that's the last of it! Thank you so much, @nvh0412!
Thanks @marcotc, let me know if the standard tags I’ve added are correct.
What does this PR do?
Fixes #1660
In this PR, we introduce the Kafka integration for the Karafka gem, which includes:
Motivation:
We’re integrating Karafka to implement proper distributed tracing in our system with Datadog, as there is no official integration. This integration will also enable distributed tracing when the message headers include distributed tracing data.
Distributed tracing will help us create a proper service map, connecting Kafka producers and consumers.
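As a rough sketch of how consuming those headers could link a message span to the producer's trace (`extract_trace_digest` is an illustrative placeholder, not an actual dd-trace-rb API; `continue_from:` is the documented option on `Datadog::Tracing.trace` for continuing from a trace digest):

```ruby
# Hypothetical helper: turn incoming Kafka message headers into a trace
# digest; the real extraction depends on the propagation format in use.
digest = extract_trace_digest(message.headers)

# `continue_from:` attaches the new span to the producer's distributed trace.
Datadog::Tracing.trace(Ext::SPAN_MESSAGE_CONSUME, continue_from: digest) do |span|
  span.set_tag(Ext::TAG_TOPIC, message.topic)
  # ... process the message ...
end
```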
Change log entry
Yes. Add Karafka integration for distributed tracing.
(Added by @ivoanjo)
Additional Notes:
How to test the change?