Merge pull request #4240 from DataDog/ivoanjo/prof-11003-fix-profiler-otel-hash-lookup

[PROF-11003] Fix allocation profiling + otel tracing causing Ruby crash
ivoanjo authored Jan 3, 2025
2 parents d26c7bf + 442690b commit b1400d5
Showing 13 changed files with 307 additions and 28 deletions.
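
The crash referenced in the commit title occurs when allocation profiling and OpenTelemetry-based tracing are enabled together in the same process. The sketch below is illustrative only and not part of this commit; the option names are assumptions based on dd-trace-rb 2.x and the opentelemetry-sdk gem.

# Hypothetical setup sketch -- not part of this commit.
# Assumes dd-trace-rb 2.x and the opentelemetry-sdk gem.
require "opentelemetry/sdk"
require "datadog"
require "datadog/opentelemetry" # routes OpenTelemetry API spans through Datadog tracing

Datadog.configure do |c|
  c.profiling.enabled = true
  c.profiling.allocation_enabled = true # allocation profiling samples on object allocation
end

# With both features enabled, an allocation sample can be taken while OpenTelemetry
# span/context data is being inspected -- the combination this commit makes safe.
OpenTelemetry.tracer_provider.tracer("example").in_span("work") do
  10_000.times { Object.new }
end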
6 changes: 3 additions & 3 deletions benchmarks/profiler_gc.rb
@@ -14,7 +14,7 @@ def create_profiler

# We take a dummy sample so that the context for the main thread is created, as otherwise the GC profiling methods do
# not create it (because we don't want to do memory allocations in the middle of GC)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, Thread.current)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, Thread.current, false)
end

def run_benchmark
@@ -29,7 +29,7 @@ def run_benchmark
x.report('profiler gc') do
Datadog::Profiling::Collectors::ThreadContext::Testing._native_on_gc_start(@collector)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_on_gc_finish(@collector)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample_after_gc(@collector, false)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample_after_gc(@collector, false, false)
end

x.save! "#{File.basename(__FILE__)}-results.json" unless VALIDATE_BENCHMARK_MODE
@@ -52,7 +52,7 @@ def run_benchmark
estimated_gc_per_minute.times do
Datadog::Profiling::Collectors::ThreadContext::Testing._native_on_gc_start(@collector)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_on_gc_finish(@collector)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample_after_gc(@collector, false)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample_after_gc(@collector, false, false)
end

@recorder.serialize
2 changes: 1 addition & 1 deletion benchmarks/profiler_sample_gvl.rb
@@ -27,7 +27,7 @@ def initialize
@target_thread = thread_with_very_deep_stack

# Sample once to trigger thread context creation for all threads (including @target_thread)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD, false)
end

def create_profiler
2 changes: 1 addition & 1 deletion benchmarks/profiler_sample_loop_v2.rb
@@ -37,7 +37,7 @@ def run_benchmark
)

x.report("stack collector #{ENV['CONFIG']}") do
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD, false)
end

x.save! "#{File.basename(__FILE__)}-results.json" unless VALIDATE_BENCHMARK_MODE
2 changes: 1 addition & 1 deletion benchmarks/profiler_sample_serialize.rb
@@ -35,7 +35,7 @@ def run_benchmark
simulate_seconds = 60

(samples_per_second * simulate_seconds).times do
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD)
Datadog::Profiling::Collectors::ThreadContext::Testing._native_sample(@collector, PROFILER_OVERHEAD_STACK_THREAD, false)
end

@recorder.serialize
ext/datadog_profiling_native_extension/collectors_cpu_and_wall_time_worker.c
@@ -1171,6 +1171,16 @@ static void on_newobj_event(DDTRACE_UNUSED VALUE unused1, DDTRACE_UNUSED void *u
return;
}

// If Ruby is in the middle of raising an exception, we don't want to try to sample. This is because if we accidentally
// trigger an exception inside the profiler code, bad things will happen (specifically, Ruby will try to kill off the
// thread even though we may try to catch the exception).
//
// Note that "in the middle of raising an exception" means the exception itself has already been allocated.
// What's getting allocated now is probably the backtrace objects (@ivoanjo or at least that's what I've observed)
if (is_raised_flag_set(rb_thread_current())) {
return;
}

// Hot path: Dynamic sampling rate is usually enabled and the sampling decision is usually false
if (RB_LIKELY(state->dynamic_sampling_rate_enabled && !discrete_dynamic_sampler_should_sample(&state->allocation_sampler))) {
state->stats.allocation_skipped++;
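
The guard added above is what the is_raised_flag_set check enforces: allocation samples are skipped while the current thread is mid-raise. As the comment notes, at that point the exception object already exists and Ruby is typically allocating backtrace data. A rough Ruby-level illustration (hypothetical, not part of the commit) of where such allocations happen:

# Illustrative only -- not part of this commit. With allocation profiling enabled,
# any of the allocations below can trigger a sample; the ones Ruby performs
# internally while raising (e.g. backtrace entries) happen with raised_flag set,
# and those are exactly the samples the new check skips.
def risky
  raise "boom" # the exception and its backtrace data are allocated mid-raise
rescue => e
  e.backtrace.first
end

risky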
119 changes: 108 additions & 11 deletions ext/datadog_profiling_native_extension/collectors_thread_context.c
@@ -9,6 +9,7 @@
#include "private_vm_api_access.h"
#include "stack_recorder.h"
#include "time_helpers.h"
#include "unsafe_api_calls_check.h"

// Used to trigger sampling of threads, based on external "events", such as:
// * periodic timer for cpu-time and wall-time
@@ -203,10 +204,10 @@ static int hash_map_per_thread_context_mark(st_data_t key_thread, st_data_t _val
static int hash_map_per_thread_context_free_values(st_data_t _thread, st_data_t value_per_thread_context, st_data_t _argument);
static VALUE _native_new(VALUE klass);
static VALUE _native_initialize(int argc, VALUE *argv, DDTRACE_UNUSED VALUE _self);
static VALUE _native_sample(VALUE self, VALUE collector_instance, VALUE profiler_overhead_stack_thread);
static VALUE _native_sample(VALUE self, VALUE collector_instance, VALUE profiler_overhead_stack_thread, VALUE allow_exception);
static VALUE _native_on_gc_start(VALUE self, VALUE collector_instance);
static VALUE _native_on_gc_finish(VALUE self, VALUE collector_instance);
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state);
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state, VALUE allow_exception);
static void update_metrics_and_sample(
struct thread_context_collector_state *state,
VALUE thread_being_sampled,
@@ -290,6 +291,7 @@ static void otel_without_ddtrace_trace_identifiers_for(
);
static struct otel_span otel_span_from(VALUE otel_context, VALUE otel_current_span_key);
static uint64_t otel_span_id_to_uint(VALUE otel_span_id);
static VALUE safely_lookup_hash_without_going_into_ruby_code(VALUE hash, VALUE key);

void collectors_thread_context_init(VALUE profiling_module) {
VALUE collectors_module = rb_define_module_under(profiling_module, "Collectors");
@@ -310,11 +312,11 @@ void collectors_thread_context_init(VALUE profiling_module) {
rb_define_singleton_method(collectors_thread_context_class, "_native_initialize", _native_initialize, -1);
rb_define_singleton_method(collectors_thread_context_class, "_native_inspect", _native_inspect, 1);
rb_define_singleton_method(collectors_thread_context_class, "_native_reset_after_fork", _native_reset_after_fork, 1);
rb_define_singleton_method(testing_module, "_native_sample", _native_sample, 2);
rb_define_singleton_method(testing_module, "_native_sample", _native_sample, 3);
rb_define_singleton_method(testing_module, "_native_sample_allocation", _native_sample_allocation, 3);
rb_define_singleton_method(testing_module, "_native_on_gc_start", _native_on_gc_start, 1);
rb_define_singleton_method(testing_module, "_native_on_gc_finish", _native_on_gc_finish, 1);
rb_define_singleton_method(testing_module, "_native_sample_after_gc", _native_sample_after_gc, 2);
rb_define_singleton_method(testing_module, "_native_sample_after_gc", _native_sample_after_gc, 3);
rb_define_singleton_method(testing_module, "_native_thread_list", _native_thread_list, 0);
rb_define_singleton_method(testing_module, "_native_per_thread_context", _native_per_thread_context, 1);
rb_define_singleton_method(testing_module, "_native_stats", _native_stats, 1);
@@ -504,31 +506,49 @@ static VALUE _native_initialize(int argc, VALUE *argv, DDTRACE_UNUSED VALUE _sel

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_sample(DDTRACE_UNUSED VALUE _self, VALUE collector_instance, VALUE profiler_overhead_stack_thread) {
static VALUE _native_sample(DDTRACE_UNUSED VALUE _self, VALUE collector_instance, VALUE profiler_overhead_stack_thread, VALUE allow_exception) {
ENFORCE_BOOLEAN(allow_exception);

if (!is_thread_alive(profiler_overhead_stack_thread)) rb_raise(rb_eArgError, "Unexpected: profiler_overhead_stack_thread is not alive");

if (allow_exception == Qfalse) debug_enter_unsafe_context();

thread_context_collector_sample(collector_instance, monotonic_wall_time_now_ns(RAISE_ON_FAILURE), profiler_overhead_stack_thread);

if (allow_exception == Qfalse) debug_leave_unsafe_context();

return Qtrue;
}

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_on_gc_start(DDTRACE_UNUSED VALUE self, VALUE collector_instance) {
debug_enter_unsafe_context();

thread_context_collector_on_gc_start(collector_instance);

debug_leave_unsafe_context();

return Qtrue;
}

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_on_gc_finish(DDTRACE_UNUSED VALUE self, VALUE collector_instance) {
debug_enter_unsafe_context();

(void) !thread_context_collector_on_gc_finish(collector_instance);

debug_leave_unsafe_context();

return Qtrue;
}

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state) {
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state, VALUE allow_exception) {
ENFORCE_BOOLEAN(reset_monotonic_to_system_state);
ENFORCE_BOOLEAN(allow_exception);

struct thread_context_collector_state *state;
TypedData_Get_Struct(collector_instance, struct thread_context_collector_state, &thread_context_collector_typed_data, state);
@@ -537,7 +557,12 @@ static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_
state->time_converter_state = (monotonic_to_system_epoch_state) MONOTONIC_TO_SYSTEM_EPOCH_INITIALIZER;
}

if (allow_exception == Qfalse) debug_enter_unsafe_context();

thread_context_collector_sample_after_gc(collector_instance);

if (allow_exception == Qfalse) debug_leave_unsafe_context();

return Qtrue;
}

@@ -982,7 +1007,13 @@ static void trigger_sample_for_thread(
// It SHOULD NOT be used for other purposes.
static VALUE _native_thread_list(DDTRACE_UNUSED VALUE _self) {
VALUE result = rb_ary_new();

debug_enter_unsafe_context();

ddtrace_thread_list(result);

debug_leave_unsafe_context();

return result;
}

@@ -1501,7 +1532,12 @@ void thread_context_collector_sample_allocation(VALUE self_instance, unsigned in
// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_sample_allocation(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE sample_weight, VALUE new_object) {
debug_enter_unsafe_context();

thread_context_collector_sample_allocation(collector_instance, NUM2UINT(sample_weight), new_object);

debug_leave_unsafe_context();

return Qtrue;
}

@@ -1597,7 +1633,7 @@ static void ddtrace_otel_trace_identifiers_for(
// trace and span representing it. Each ddtrace trace is then connected to the previous otel span, forming a linked
// list. The local root span is going to be the trace/span we find at the end of this linked list.
while (otel_values != Qnil) {
VALUE otel_span = rb_hash_lookup(otel_values, otel_current_span_key);
VALUE otel_span = safely_lookup_hash_without_going_into_ruby_code(otel_values, otel_current_span_key);
if (otel_span == Qnil) break;
VALUE next_trace = rb_ivar_get(otel_span, at_datadog_trace_id);
if (next_trace == Qnil) break;
@@ -1640,7 +1676,12 @@ void thread_context_collector_sample_skipped_allocation_samples(VALUE self_insta
}

static VALUE _native_sample_skipped_allocation_samples(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE skipped_samples) {
debug_enter_unsafe_context();

thread_context_collector_sample_skipped_allocation_samples(collector_instance, NUM2UINT(skipped_samples));

debug_leave_unsafe_context();

return Qtrue;
}

@@ -1709,7 +1750,7 @@ static void otel_without_ddtrace_trace_identifiers_for(

VALUE root_span_type = rb_ivar_get(local_root_span.span, at_kind_id /* @kind */);
// We filter out spans that don't have `kind: :server`
if (root_span_type == Qnil || !RB_TYPE_P(root_span_type, T_SYMBOL) || SYM2ID(root_span_type) != server_id) return;
if (root_span_type == Qnil || !RB_TYPE_P(root_span_type, T_SYMBOL) || !RB_STATIC_SYM_P(root_span_type) || SYM2ID(root_span_type) != server_id) return;

VALUE trace_resource = rb_ivar_get(local_root_span.span, at_name_id /* @name */);
if (!RB_TYPE_P(trace_resource, T_STRING)) return;
@@ -1726,7 +1767,7 @@ static struct otel_span otel_span_from(VALUE otel_context, VALUE otel_current_sp
if (context_entries == Qnil || !RB_TYPE_P(context_entries, T_HASH)) return failed;

// If it exists, context_entries is expected to be a Hash[OpenTelemetry::Context::Key, OpenTelemetry::Trace::Span]
VALUE span = rb_hash_lookup(context_entries, otel_current_span_key);
VALUE span = safely_lookup_hash_without_going_into_ruby_code(context_entries, otel_current_span_key);
if (span == Qnil) return failed;

// If it exists, span_context is expected to be a OpenTelemetry::Trace::SpanContext (don't confuse it with OpenTelemetry::Context)
@@ -1979,31 +2020,53 @@ static uint64_t otel_span_id_to_uint(VALUE otel_span_id) {
static VALUE _native_on_gvl_waiting(DDTRACE_UNUSED VALUE self, VALUE thread) {
ENFORCE_THREAD(thread);

debug_enter_unsafe_context();

thread_context_collector_on_gvl_waiting(thread_from_thread_object(thread));

debug_leave_unsafe_context();

return Qnil;
}

static VALUE _native_gvl_waiting_at_for(DDTRACE_UNUSED VALUE self, VALUE thread) {
ENFORCE_THREAD(thread);

debug_enter_unsafe_context();

intptr_t gvl_waiting_at = gvl_profiling_state_thread_object_get(thread);

debug_leave_unsafe_context();

return LONG2NUM(gvl_waiting_at);
}

static VALUE _native_on_gvl_running(DDTRACE_UNUSED VALUE self, VALUE thread) {
ENFORCE_THREAD(thread);

return thread_context_collector_on_gvl_running(thread_from_thread_object(thread)) == ON_GVL_RUNNING_SAMPLE ? Qtrue : Qfalse;
debug_enter_unsafe_context();

VALUE result = thread_context_collector_on_gvl_running(thread_from_thread_object(thread)) == ON_GVL_RUNNING_SAMPLE ? Qtrue : Qfalse;

debug_leave_unsafe_context();

return result;
}

static VALUE _native_sample_after_gvl_running(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE thread) {
ENFORCE_THREAD(thread);

return thread_context_collector_sample_after_gvl_running(
debug_enter_unsafe_context();

VALUE result = thread_context_collector_sample_after_gvl_running(
collector_instance,
thread,
monotonic_wall_time_now_ns(RAISE_ON_FAILURE)
);

debug_leave_unsafe_context();

return result;
}

static VALUE _native_apply_delta_to_cpu_time_at_previous_sample_ns(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE thread, VALUE delta_ns) {
Expand All @@ -2030,3 +2093,37 @@ static uint64_t otel_span_id_to_uint(VALUE otel_span_id) {
DDTRACE_UNUSED long current_cpu_time_ns
) { return false; }
#endif // NO_GVL_INSTRUMENTATION

#define MAX_SAFE_LOOKUP_SIZE 16

typedef struct { VALUE lookup_key; VALUE result; } safe_lookup_hash_state;

static int safe_lookup_hash_iterate(VALUE key, VALUE value, VALUE state_ptr) {
safe_lookup_hash_state *state = (safe_lookup_hash_state *) state_ptr;

if (key == state->lookup_key) {
state->result = value;
return ST_STOP;
}

return ST_CONTINUE;
}

// This method exists because we need to look up a hash during sampling, but we don't want to invoke any
// Ruby code as a side effect. To be able to look up by hash, `rb_hash_lookup` calls `#hash` on the key,
// which we want to avoid.
// Thus, instead, we opt to just iterate through the hash and check if we can find what we're looking for.
//
// To avoid having too much overhead here we only iterate in hashes up to MAX_SAFE_LOOKUP_SIZE.
// Note that we don't even try to iterate if the hash is bigger -- this is to avoid flaky behavior where
// depending on the internal storage order of the hash we may or not find the key, and instead we always
// enforce the size.
static VALUE safely_lookup_hash_without_going_into_ruby_code(VALUE hash, VALUE key) {
if (!RB_TYPE_P(hash, T_HASH) || RHASH_SIZE(hash) > MAX_SAFE_LOOKUP_SIZE) return Qnil;

safe_lookup_hash_state state = {.lookup_key = key, .result = Qnil};

rb_hash_foreach(hash, safe_lookup_hash_iterate, (VALUE) &state);

return state.result;
}
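
The helper above avoids rb_hash_lookup because a Hash lookup hashes the key, which can run arbitrary Ruby code -- something the profiler must never do mid-sample. A small Ruby sketch (hypothetical class, not part of the commit) of the re-entrancy the linear scan sidesteps:

# Illustrative only -- not part of this commit. A Hash lookup invokes the key's
# #hash (and possibly #eql?), so user-defined Ruby code can run during the lookup.
class KeyWithSideEffects
  def hash
    puts "custom #hash running" # arbitrary Ruby code executes here
    super
  end
end

key = KeyWithSideEffects.new
contexts = { key => :span } # building the Hash already calls #hash once
contexts.key?(key)          # the lookup calls it again -- prints a second time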
ext/datadog_profiling_native_extension/private_vm_api_access.c
@@ -800,3 +800,6 @@ static inline int ddtrace_imemo_type(VALUE imemo) {
return current_thread;
}
#endif

// Is the VM smack in the middle of raising an exception?
bool is_raised_flag_set(VALUE thread) { return thread_struct_from_object(thread)->ec->raised_flag > 0; }
ext/datadog_profiling_native_extension/private_vm_api_access.h
@@ -68,3 +68,5 @@ const char *imemo_kind(VALUE imemo);

#define ENFORCE_THREAD(value) \
{ if (RB_UNLIKELY(!rb_typeddata_is_kind_of(value, RTYPEDDATA_TYPE(rb_thread_current())))) raise_unexpected_type(value, ADD_QUOTES(value), "Thread", __FILE__, __LINE__, __func__); }

bool is_raised_flag_set(VALUE thread);
2 changes: 2 additions & 0 deletions ext/datadog_profiling_native_extension/profiling.c
@@ -11,6 +11,7 @@
#include "ruby_helpers.h"
#include "setup_signal_handler.h"
#include "time_helpers.h"
#include "unsafe_api_calls_check.h"

// Each class/module here is implemented in their separate file
void collectors_cpu_and_wall_time_worker_init(VALUE profiling_module);
@@ -56,6 +57,7 @@ void DDTRACE_EXPORT Init_datadog_profiling_native_extension(void) {
collectors_thread_context_init(profiling_module);
http_transport_init(profiling_module);
stack_recorder_init(profiling_module);
unsafe_api_calls_check_init();

// Hosts methods used for testing the native code using RSpec
VALUE testing_module = rb_define_module_under(native_extension_module, "Testing");
