Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Karafka integration #4147

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open

Conversation

nvh0412
Copy link
Contributor

@nvh0412 nvh0412 commented Nov 22, 2024

What does this PR do?

Fixed #1660

In this PR, we introduce the Kafka integration for the Karafka gem. Which includes:

  1. Distributed tracing by utilizing message.metadata.headers
  2. Traces for worker.process and each message executor inside the batch, so we can link them to the origin trace if the distributed tracing is on

Motivation:

We’re integrating Karafka to implement proper distributed tracing in our system with Datadog, as it lacks an official integration. This integration will also enable distributed tracing if the message headers include distributed tracing data.

Distributed tracing will help us create a proper service map, connecting Kafka producers and consumers.

Change log entry

Yes. Add Karafka integration for distributed tracing.

(Added by @ivoanjo)

Additional Notes:

How to test the change?

Screenshot 2024-11-22 at 3 43 09 PM

@github-actions github-actions bot added integrations Involves tracing integrations tracing labels Nov 22, 2024
@nvh0412 nvh0412 marked this pull request as ready for review November 22, 2024 06:07
@nvh0412 nvh0412 requested review from a team as code owners November 22, 2024 06:07
@drichards-87
Copy link

Created a Jira card for Docs Team editorial review.

@drichards-87 drichards-87 added the editorial review Waiting for a review from the docs team label Nov 22, 2024
Comment on lines 16 to 60
::Karafka.monitor.subscribe 'worker.process' do |event|
# Start a trace
span = Tracing.trace(Ext::SPAN_WORKER_PROCESS, **span_options)

job = event[:job]
job_type = fetch_job_type(job.class)
consumer = job.executor.topic.consumer
topic = job.executor.topic.name

action = case job_type
when 'Periodic'
'tick'
when 'PeriodicNonBlocking'
'tick'
when 'Shutdown'
'shutdown'
when 'Revoked'
'revoked'
when 'RevokedNonBlocking'
'revoked'
when 'Idle'
'idle'
when 'Eofed'
'eofed'
when 'EofedNonBlocking'
'eofed'
else
'consume'
end

span.resource = "#{consumer}##{action}"
span.set_tag(Ext::TAG_TOPIC, topic) if topic

if action == 'consume'
span.set_tag(Ext::TAG_MESSAGE_COUNT, job.messages.count)
span.set_tag(Ext::TAG_PARTITION, job.executor.partition)
span.set_tag(Ext::TAG_OFFSET, job.messages.first.metadata.offset)
end

span
end

::Karafka.monitor.subscribe 'worker.completed' do |event|
Tracing.active_span&.finish
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having separate locations for span creation Tracing.trace and span conclusion Tracing.active_span&.finish is always a possible source of hard-to-debug errors and span leakage (and thus memory leaks). We normally only do it when it's impossible to use Tracing.trace { do_work_here }.

Also, Tracing.trace { do_work_here } takes care of error handling, properly tagging the current span with error information.

In this case, we have the event worker.processed that looks like it's just what we need:
https://github.com/karafka/karafka/blob/ab4f9bcd3620f46adb8c0d158b5396b245619ed3/lib/karafka/processing/worker.rb#L58-L78

Except that it doesn't call the event listeners when an error is raised by the job. There are no error handlers in this method that call assigned_listeners: https://github.com/karafka/karafka-core/blob/a1425725d275796673424c1cd9be517d06518ec9/lib/karafka/core/monitoring/notifications.rb#L120

I opened a PR to Karafka to address this, but even if it's approved, it won't affect users of older versions of the library: karafka/karafka-core#145

The being said, I still lean towards using a single Tracing.trace { do_work_here } only because the safety of not having to worry about leaky spans is too advantageous.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s precisely what I’d like to hear from the Datadog team regarding this. My initial intuition when using Tracing.active_span was that I couldn’t be certain whether the span I’m currently working on is the one I want to complete. Considering this and your insights, would it be feasible to remove this tracing event? At the moment, I prefer to retain the Ext::SPAN_MESSAGE_CONSUME event only for this integration, as it effectively meets my requirement: enabling distributed tracing, ensuring that each message span is linked to the root trace in the distributed tracing system.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened a PR to Karafka to address this, but even if it's approved, it won't affect users of older versions of the library

This PR is not needed to achieve the expected wrapping because it is already done for example for OpenTelemetry:

ref1: https://karafka.io/docs/Monitoring-and-Logging/#opentelemetry
ref2: https://karafka.io/docs/Monitoring-and-Logging/#monitor-wrapping-and-replacement

@@ -193,6 +193,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc'
gem 'karafka'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

karafka no longer supports 2.7 based on: https://karafka.io/docs/Versions-Lifecycle-and-EOL/

@@ -115,6 +115,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

karafka no longer supports 3.0 based on: https://karafka.io/docs/Versions-Lifecycle-and-EOL/

include Karafka::Event

def self.subscribe!
::Karafka.monitor.subscribe 'worker.process' do |event|

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not recommend using this layer for instrumentation of that type and would advise you to reconsider something "closer" to the actual execution of work.

Copy link
Contributor Author

@nvh0412 nvh0412 Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible for you to explain what "closer" means here :)? I utilised what you did on instrumentation/vendors/datadog/logger_listener.rb here, but what I'm reconsidering here that we don't actually need to subscribe to this event anymore, because our goal is to have message traces for each message inside messages enumerator, so we can link them to the distributed traces. So I can remove this even if everything is getting out of control here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Karafka has several layers of reporting when a "job" is executed. The worker level is the highest, and I think of it as conceptually "distant" from the end user code execution layer. In between those, there are coordinators, executors, and more. The closest to the user code is the one that has consumer. events, and while I myself use the worker level once in a while, I, in general, do not recommend it and recommend using the one mentioned above. At some point, I will probably migrate the once that I wrote myself. There is nothing fundamentally wrong about using the worker one but as mentioned, there's a lot in between.

because our goal is to have message traces for each message inside messages enumerator

But this is not the only way users process code. You need to keep in mind users that do batch operations as well, thus you want to trace around all the operational code also.

@codecov-commenter
Copy link

codecov-commenter commented Nov 22, 2024

Codecov Report

Attention: Patch coverage is 80.58252% with 20 lines in your changes missing coverage. Please review.

Project coverage is 97.75%. Comparing base (ce4393e) to head (27e5711).
Report is 41 commits behind head on master.

Files with missing lines Patch % Lines
lib/datadog/tracing/contrib/karafka/patcher.rb 51.61% 15 Missing ⚠️
lib/datadog/tracing/contrib/karafka.rb 80.00% 4 Missing ⚠️
lib/datadog/tracing/contrib/karafka/integration.rb 95.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4147      +/-   ##
==========================================
- Coverage   97.78%   97.75%   -0.03%     
==========================================
  Files        1353     1358       +5     
  Lines       81817    81920     +103     
  Branches     4145     4150       +5     
==========================================
+ Hits        80001    80081      +80     
- Misses       1816     1839      +23     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@mensfeld
Copy link

FYI feel free to ping me once remarks are done. I will be happy to help and maybe in the future retire my own instrumentation in favour of the DD one ;)

SPAN_MESSAGE_CONSUME = 'karafka.consume'
SPAN_WORKER_PROCESS = 'worker.process'

TAG_TOPIC = 'kafka.topic'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a side note because maybe it is done: please keep in mind that karafka supports multi-consumer group operations in one karafka process thus CG (consumer group) always needs to be reported alongside metrics.

Copy link

@drichards-87 drichards-87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a couple of very small suggestions from Docs and approved the PR.

docs/GettingStarted.md Outdated Show resolved Hide resolved
docs/GettingStarted.md Outdated Show resolved Hide resolved
Use Tracing.trace wrapper to add consumer trace
Use Instrumentation::Monitor to instrument and have a proper trace wrapper
@nvh0412
Copy link
Contributor Author

nvh0412 commented Nov 27, 2024

Hi team and @marcotc

Let's settle this. My gut feeling about this PR is that we do have some limitations in how tracing is integrated into the Karafka gem. I’ve switched to using Tracing.trace with a block rather than starting and finishing traces manually. However, the core of this PR is the distributed tracing with the message block.

This implementation fits perfectly with my system and has been working well with an internal patch. That said, let me know your thoughts on this integration. I’m fine if we can't settle on it, as I can always maintain the internal patch if needed.

@marcotc
Copy link
Member

marcotc commented Jan 13, 2025

Sorry for the delay, I'll circle back here next week to get this PR unblocked and hopefully merged.

@mensfeld
Copy link

@marcotc feel free to ping me as well prior to merging/deciding what to do with it. I will give it a second look.

Copy link
Member

@marcotc marcotc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome work @nvh0412! And thank you so much for the insightful review @mensfeld!

docs/GettingStarted.md Outdated Show resolved Hide resolved
Comment on lines 32 to 33
span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
span.set_tag(Ext::TAG_TOPIC, message.topic)
Copy link
Member

@marcotc marcotc Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a few standard tags for any messaging system spans, as these enable richer product integration for messaging system observability:

Suggested change
span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
span.set_tag(Ext::TAG_TOPIC, message.topic)
span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
span.set_tag('messaging.destination', message.topic) # Please create a constant for this one, close to `Contrib::Ext::Messaging::TAG_SYSTEM`
span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, 'kafka')

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same should be done for the span Datadog::Tracing.trace(Ext::SPAN_WORKER_PROCESS) do |span|.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's the last of it! Thank you so much, @nvh0412!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @marcotc, let me know if the standard tags I’ve added are correct.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
editorial review Waiting for a review from the docs team integrations Involves tracing integrations tracing
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Distributed tracing support through Kafka integration
5 participants