From 902d79e7475cf1c49f7a89a283ea6a0459b55c04 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 29 Jan 2024 12:28:08 +0800 Subject: [PATCH] Revert "feat(streaming): create a new span for every yielded message (#14645)" This reverts commit 2b7f295b6c917deb5b9516f6f2651844a24857ab. --- Cargo.lock | 13 ++--- src/stream/src/executor/dispatch.rs | 26 ++++------ src/stream/src/executor/wrapper/trace.rs | 64 ++++++++++++------------ 3 files changed, 48 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ebfc5c51617a..4a2c36567a5c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12340,10 +12340,11 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ + "cfg-if", "log", "pin-project-lite", "tracing-attributes", @@ -12352,9 +12353,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", @@ -12363,9 +12364,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", "valuable", diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index a21012c5f1664..b8bebd1966334 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -386,22 +386,16 @@ impl StreamConsumer for DispatchExecutor { #[for_await] for msg in input { let msg: Message = msg?; - let (barrier, span, tracing_span) = match msg { - Message::Chunk(_) => ( - None, - "dispatch_chunk", - tracing::info_span!("dispatch_chunk"), - ), - Message::Barrier(ref barrier) => ( - Some(barrier.clone()), - "dispatch_barrier", - tracing::info_span!("dispatch_barrier"), - ), - Message::Watermark(_) => ( - None, - "dispatch_watermark", - tracing::info_span!("dispatch_watermark"), - ), + let (barrier, span) = match msg { + Message::Chunk(_) => (None, "dispatch_chunk"), + Message::Barrier(ref barrier) => (Some(barrier.clone()), "dispatch_barrier"), + Message::Watermark(_) => (None, "dispatch_watermark"), + }; + + let tracing_span = if let Some(_barrier) = &barrier { + tracing::info_span!("dispatch_barrier") + } else { + tracing::Span::none() }; self.inner diff --git a/src/stream/src/executor/wrapper/trace.rs b/src/stream/src/executor/wrapper/trace.rs index df594194966be..b480d0934a96f 100644 --- a/src/stream/src/executor/wrapper/trace.rs +++ b/src/stream/src/executor/wrapper/trace.rs @@ -40,8 +40,7 @@ pub async fn trace( tracing::info_span!( "executor", "otel.name" = span_name, - "message" = tracing::field::Empty, // record later - "chunk_size" = tracing::field::Empty, // record later + "actor_id" = actor_ctx.id ) }; let mut span = new_span(); @@ -49,58 +48,57 @@ pub async fn trace( pin_mut!(input); while let Some(message) = input.next().instrument(span.clone()).await.transpose()? { - // Emit a debug event and record the message type. - match &message { + // Trace the message in the span's scope. + span.in_scope(|| match &message { Message::Chunk(chunk) => { - if enable_executor_row_count { - actor_ctx - .streaming_metrics - .executor_row_count - .with_label_values(&[&actor_id_str, &fragment_id_str, &info.identity]) - .inc_by(chunk.cardinality() as u64); + if chunk.cardinality() > 0 { + if enable_executor_row_count { + actor_ctx + .streaming_metrics + .executor_row_count + .with_label_values(&[&actor_id_str, &fragment_id_str, &info.identity]) + .inc_by(chunk.cardinality() as u64); + } + tracing::debug!( + target: "events::stream::message::chunk", + cardinality = chunk.cardinality(), + capacity = chunk.capacity(), + "\n{}\n", chunk.to_pretty_with_schema(&info.schema), + ); } - tracing::debug!( - target: "events::stream::message::chunk", - parent: &span, - cardinality = chunk.cardinality(), - capacity = chunk.capacity(), - "\n{}\n", chunk.to_pretty_with_schema(&info.schema), - ); - span.record("message", "chunk"); - span.record("chunk_size", chunk.cardinality()); } Message::Watermark(watermark) => { tracing::debug!( target: "events::stream::message::watermark", - parent: &span, value = ?watermark.val, col_idx = watermark.col_idx, ); - span.record("message", "watermark"); } Message::Barrier(barrier) => { tracing::debug!( target: "events::stream::message::barrier", - parent: &span, prev_epoch = barrier.epoch.prev, curr_epoch = barrier.epoch.curr, kind = ?barrier.kind, ); - span.record("message", "barrier"); } - }; + }); - // Drop the span as the inner executor has yielded a new message. - // - // This is essentially similar to `.instrument(new_span())`, but it allows us to - // emit the debug event and record the message type. - let _ = std::mem::replace(&mut span, Span::none()); + // Yield the message and update the span. + match &message { + Message::Chunk(_) | Message::Watermark(_) => yield message, + Message::Barrier(_) => { + // Drop the span as the inner executor has finished processing the barrier (then all + // data from the previous epoch). + let _ = std::mem::replace(&mut span, Span::none()); - yield message; + yield message; - // Create a new span after we're called again. The parent span may also have been - // updated. - span = new_span(); + // Create a new span after we're called again. Now we're in a new epoch and the + // parent of the span is updated. + span = new_span(); + } + } } }