From 1b2e3cebd44011e91a7fc11adcd1a8d84a8ec575 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 18 Jan 2024 20:13:31 +0800 Subject: [PATCH] feat(streaming): create a new span for every yielded message (#14645) Signed-off-by: Bugen Zhao --- Cargo.lock | 13 +++-- src/stream/src/executor/dispatch.rs | 26 ++++++---- src/stream/src/executor/wrapper/trace.rs | 64 ++++++++++++------------ 3 files changed, 55 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 663875ba23db9..fac9e97c5f8b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12240,11 +12240,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "cfg-if", "log", "pin-project-lite", "tracing-attributes", @@ -12253,9 +12252,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", @@ -12264,9 +12263,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index c958d6ab09556..6df4f1c636e08 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -386,16 +386,22 @@ impl StreamConsumer for DispatchExecutor { #[for_await] for msg in input { let msg: Message = msg?; - let (barrier, span) = match msg { - Message::Chunk(_) => (None, "dispatch_chunk"), - Message::Barrier(ref barrier) => (Some(barrier.clone()), "dispatch_barrier"), - Message::Watermark(_) => (None, "dispatch_watermark"), - }; - - let tracing_span = if let Some(_barrier) = &barrier { - tracing::info_span!("dispatch_barrier") - } else { - tracing::Span::none() + let (barrier, span, tracing_span) = match msg { + Message::Chunk(_) => ( + None, + "dispatch_chunk", + tracing::info_span!("dispatch_chunk"), + ), + Message::Barrier(ref barrier) => ( + Some(barrier.clone()), + "dispatch_barrier", + tracing::info_span!("dispatch_barrier"), + ), + Message::Watermark(_) => ( + None, + "dispatch_watermark", + tracing::info_span!("dispatch_watermark"), + ), }; self.inner diff --git a/src/stream/src/executor/wrapper/trace.rs b/src/stream/src/executor/wrapper/trace.rs index b480d0934a96f..df594194966be 100644 --- a/src/stream/src/executor/wrapper/trace.rs +++ b/src/stream/src/executor/wrapper/trace.rs @@ -40,7 +40,8 @@ pub async fn trace( tracing::info_span!( "executor", "otel.name" = span_name, - "actor_id" = actor_ctx.id + "message" = tracing::field::Empty, // record later + "chunk_size" = tracing::field::Empty, // record later ) }; let mut span = new_span(); @@ -48,57 +49,58 @@ pub async fn trace( pin_mut!(input); while let Some(message) = input.next().instrument(span.clone()).await.transpose()? { - // Trace the message in the span's scope. - span.in_scope(|| match &message { + // Emit a debug event and record the message type. + match &message { Message::Chunk(chunk) => { - if chunk.cardinality() > 0 { - if enable_executor_row_count { - actor_ctx - .streaming_metrics - .executor_row_count - .with_label_values(&[&actor_id_str, &fragment_id_str, &info.identity]) - .inc_by(chunk.cardinality() as u64); - } - tracing::debug!( - target: "events::stream::message::chunk", - cardinality = chunk.cardinality(), - capacity = chunk.capacity(), - "\n{}\n", chunk.to_pretty_with_schema(&info.schema), - ); + if enable_executor_row_count { + actor_ctx + .streaming_metrics + .executor_row_count + .with_label_values(&[&actor_id_str, &fragment_id_str, &info.identity]) + .inc_by(chunk.cardinality() as u64); } + tracing::debug!( + target: "events::stream::message::chunk", + parent: &span, + cardinality = chunk.cardinality(), + capacity = chunk.capacity(), + "\n{}\n", chunk.to_pretty_with_schema(&info.schema), + ); + span.record("message", "chunk"); + span.record("chunk_size", chunk.cardinality()); } Message::Watermark(watermark) => { tracing::debug!( target: "events::stream::message::watermark", + parent: &span, value = ?watermark.val, col_idx = watermark.col_idx, ); + span.record("message", "watermark"); } Message::Barrier(barrier) => { tracing::debug!( target: "events::stream::message::barrier", + parent: &span, prev_epoch = barrier.epoch.prev, curr_epoch = barrier.epoch.curr, kind = ?barrier.kind, ); + span.record("message", "barrier"); } - }); + }; - // Yield the message and update the span. - match &message { - Message::Chunk(_) | Message::Watermark(_) => yield message, - Message::Barrier(_) => { - // Drop the span as the inner executor has finished processing the barrier (then all - // data from the previous epoch). - let _ = std::mem::replace(&mut span, Span::none()); + // Drop the span as the inner executor has yielded a new message. + // + // This is essentially similar to `.instrument(new_span())`, but it allows us to + // emit the debug event and record the message type. + let _ = std::mem::replace(&mut span, Span::none()); - yield message; + yield message; - // Create a new span after we're called again. Now we're in a new epoch and the - // parent of the span is updated. - span = new_span(); - } - } + // Create a new span after we're called again. The parent span may also have been + // updated. + span = new_span(); } }