From 2373108eaf3890c46c87e7a0620ad37a96f48525 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 22 Oct 2024 21:11:53 +0300 Subject: [PATCH] [BugFix] Prevent exporting duplicate OpenTelemetry spans (#9017) Signed-off-by: charlifu --- tests/tracing/test_tracing.py | 30 ++++++++++++++++++++++++++---- vllm/engine/llm_engine.py | 13 ++++++++++--- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/tests/tracing/test_tracing.py b/tests/tracing/test_tracing.py index 64ed8e26f38ed..fe5fc979c66a3 100644 --- a/tests/tracing/test_tracing.py +++ b/tests/tracing/test_tracing.py @@ -87,8 +87,19 @@ def test_traces(trace_service): f"The fake trace service didn't receive a trace within " f"the {timeout} seconds timeout") - attributes = decode_attributes(trace_service.request.resource_spans[0]. - scope_spans[0].spans[0].attributes) + request = trace_service.request + assert len(request.resource_spans) == 1, ( + f"Expected 1 resource span, " + f"but got {len(request.resource_spans)}") + assert len(request.resource_spans[0].scope_spans) == 1, ( + f"Expected 1 scope span, " + f"but got {len(request.resource_spans[0].scope_spans)}") + assert len(request.resource_spans[0].scope_spans[0].spans) == 1, ( + f"Expected 1 span, " + f"but got {len(request.resource_spans[0].scope_spans[0].spans)}") + + attributes = decode_attributes( + request.resource_spans[0].scope_spans[0].spans[0].attributes) assert attributes.get(SpanAttributes.LLM_RESPONSE_MODEL) == model assert attributes.get( SpanAttributes.LLM_REQUEST_ID) == outputs[0].request_id @@ -142,8 +153,19 @@ def test_traces_with_detailed_steps(trace_service): f"The fake trace service didn't receive a trace within " f"the {timeout} seconds timeout") - attributes = decode_attributes(trace_service.request.resource_spans[0]. - scope_spans[0].spans[0].attributes) + request = trace_service.request + assert len(request.resource_spans) == 1, ( + f"Expected 1 resource span, " + f"but got {len(request.resource_spans)}") + assert len(request.resource_spans[0].scope_spans) == 1, ( + f"Expected 1 scope span, " + f"but got {len(request.resource_spans[0].scope_spans)}") + assert len(request.resource_spans[0].scope_spans[0].spans) == 1, ( + f"Expected 1 span, " + f"but got {len(request.resource_spans[0].scope_spans[0].spans)}") + + attributes = decode_attributes( + request.resource_spans[0].scope_spans[0].spans[0].attributes) assert attributes.get(SpanAttributes.LLM_RESPONSE_MODEL) == model assert attributes.get( SpanAttributes.LLM_REQUEST_ID) == outputs[0].request_id diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 25c4e76d9b159..3a29e6a9ae094 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -1245,7 +1245,7 @@ def _process_model_outputs(self, skip) # Tracing - self.do_tracing(scheduler_outputs) + self.do_tracing(scheduler_outputs, finished_before) return None @@ -1840,11 +1840,18 @@ def stop_profile(self) -> None: def is_tracing_enabled(self) -> bool: return self.tracer is not None - def do_tracing(self, scheduler_outputs: SchedulerOutputs) -> None: + def do_tracing(self, + scheduler_outputs: SchedulerOutputs, + finished_before: Optional[List[int]] = None) -> None: if self.tracer is None: return - for scheduled_seq_group in scheduler_outputs.scheduled_seq_groups: + for idx, scheduled_seq_group in enumerate( + scheduler_outputs.scheduled_seq_groups): + # Skip double tracing when using async output proc + if finished_before and idx in finished_before: + continue + seq_group = scheduled_seq_group.seq_group if seq_group.is_finished(): self.create_trace_span(seq_group)