Skip to content

Commit

Permalink
Revert "feat(streaming): create a new span for every yielded message (#…
Browse files Browse the repository at this point in the history
…14645)"

This reverts commit 2b7f295.
  • Loading branch information
BugenZhao committed Jan 29, 2024
1 parent d1ff9fb commit 902d79e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 55 deletions.
13 changes: 7 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 10 additions & 16 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,22 +386,16 @@ impl StreamConsumer for DispatchExecutor {
#[for_await]
for msg in input {
let msg: Message = msg?;
let (barrier, span, tracing_span) = match msg {
Message::Chunk(_) => (
None,
"dispatch_chunk",
tracing::info_span!("dispatch_chunk"),
),
Message::Barrier(ref barrier) => (
Some(barrier.clone()),
"dispatch_barrier",
tracing::info_span!("dispatch_barrier"),
),
Message::Watermark(_) => (
None,
"dispatch_watermark",
tracing::info_span!("dispatch_watermark"),
),
let (barrier, span) = match msg {
Message::Chunk(_) => (None, "dispatch_chunk"),
Message::Barrier(ref barrier) => (Some(barrier.clone()), "dispatch_barrier"),
Message::Watermark(_) => (None, "dispatch_watermark"),
};

let tracing_span = if let Some(_barrier) = &barrier {
tracing::info_span!("dispatch_barrier")
} else {
tracing::Span::none()
};

self.inner
Expand Down
64 changes: 31 additions & 33 deletions src/stream/src/executor/wrapper/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,67 +40,65 @@ pub async fn trace(
tracing::info_span!(
"executor",
"otel.name" = span_name,
"message" = tracing::field::Empty, // record later
"chunk_size" = tracing::field::Empty, // record later
"actor_id" = actor_ctx.id
)
};
let mut span = new_span();

pin_mut!(input);

while let Some(message) = input.next().instrument(span.clone()).await.transpose()? {
// Emit a debug event and record the message type.
match &message {
// Trace the message in the span's scope.
span.in_scope(|| match &message {
Message::Chunk(chunk) => {
if enable_executor_row_count {
actor_ctx
.streaming_metrics
.executor_row_count
.with_label_values(&[&actor_id_str, &fragment_id_str, &info.identity])
.inc_by(chunk.cardinality() as u64);
if chunk.cardinality() > 0 {
if enable_executor_row_count {
actor_ctx
.streaming_metrics
.executor_row_count
.with_label_values(&[&actor_id_str, &fragment_id_str, &info.identity])
.inc_by(chunk.cardinality() as u64);
}
tracing::debug!(
target: "events::stream::message::chunk",
cardinality = chunk.cardinality(),
capacity = chunk.capacity(),
"\n{}\n", chunk.to_pretty_with_schema(&info.schema),
);
}
tracing::debug!(
target: "events::stream::message::chunk",
parent: &span,
cardinality = chunk.cardinality(),
capacity = chunk.capacity(),
"\n{}\n", chunk.to_pretty_with_schema(&info.schema),
);
span.record("message", "chunk");
span.record("chunk_size", chunk.cardinality());
}
Message::Watermark(watermark) => {
tracing::debug!(
target: "events::stream::message::watermark",
parent: &span,
value = ?watermark.val,
col_idx = watermark.col_idx,
);
span.record("message", "watermark");
}
Message::Barrier(barrier) => {
tracing::debug!(
target: "events::stream::message::barrier",
parent: &span,
prev_epoch = barrier.epoch.prev,
curr_epoch = barrier.epoch.curr,
kind = ?barrier.kind,
);
span.record("message", "barrier");
}
};
});

// Drop the span as the inner executor has yielded a new message.
//
// This is essentially similar to `.instrument(new_span())`, but it allows us to
// emit the debug event and record the message type.
let _ = std::mem::replace(&mut span, Span::none());
// Yield the message and update the span.
match &message {
Message::Chunk(_) | Message::Watermark(_) => yield message,
Message::Barrier(_) => {
// Drop the span as the inner executor has finished processing the barrier (then all
// data from the previous epoch).
let _ = std::mem::replace(&mut span, Span::none());

yield message;
yield message;

// Create a new span after we're called again. The parent span may also have been
// updated.
span = new_span();
// Create a new span after we're called again. Now we're in a new epoch and the
// parent of the span is updated.
span = new_span();
}
}
}
}

Expand Down

0 comments on commit 902d79e

Please sign in to comment.