Skip to content

Commit

Permalink
trace each message
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 18, 2024
1 parent 7b1c6cc commit e7c1d5e
Showing 1 changed file with 33 additions and 31 deletions.
64 changes: 33 additions & 31 deletions src/stream/src/executor/wrapper/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,65 +40,67 @@ pub async fn trace(
tracing::info_span!(
"executor",
"otel.name" = span_name,
"actor_id" = actor_ctx.id
"message" = tracing::field::Empty, // record later
"chunk_size" = tracing::field::Empty, // record later
)
};
let mut span = new_span();

pin_mut!(input);

while let Some(message) = input.next().instrument(span.clone()).await.transpose()? {
// Trace the message in the span's scope.
span.in_scope(|| match &message {
// Emit a debug event and record the message type.
match &message {
Message::Chunk(chunk) => {
if chunk.cardinality() > 0 {
if enable_executor_row_count {
actor_ctx
.streaming_metrics
.executor_row_count
.with_label_values(&[&actor_id_str, &fragment_id_str, &info.identity])
.inc_by(chunk.cardinality() as u64);
}
tracing::debug!(
target: "events::stream::message::chunk",
cardinality = chunk.cardinality(),
capacity = chunk.capacity(),
"\n{}\n", chunk.to_pretty_with_schema(&info.schema),
);
if enable_executor_row_count {
actor_ctx
.streaming_metrics
.executor_row_count
.with_label_values(&[&actor_id_str, &fragment_id_str, &info.identity])
.inc_by(chunk.cardinality() as u64);
}
tracing::debug!(
target: "events::stream::message::chunk",
parent: &span,
cardinality = chunk.cardinality(),
capacity = chunk.capacity(),
"\n{}\n", chunk.to_pretty_with_schema(&info.schema),
);
span.record("message", "chunk");
span.record("chunk_size", chunk.cardinality());
}
Message::Watermark(watermark) => {
tracing::debug!(
target: "events::stream::message::watermark",
parent: &span,
value = ?watermark.val,
col_idx = watermark.col_idx,
);
span.record("message", "watermark");
}
Message::Barrier(barrier) => {
tracing::debug!(
target: "events::stream::message::barrier",
parent: &span,
prev_epoch = barrier.epoch.prev,
curr_epoch = barrier.epoch.curr,
kind = ?barrier.kind,
);
span.record("message", "barrier");
}
});
};

// Yield the message and update the span.
match &message {
Message::Chunk(_) | Message::Watermark(_) => yield message,
Message::Barrier(_) => {
// Drop the span as the inner executor has finished processing the barrier (then all
// data from the previous epoch).
let _ = std::mem::replace(&mut span, Span::none());
// Drop the span as the inner executor has yielded a new message.
//
// This is essentially similar to `.instrument(new_span())`, but it allows us to
// emit the debug event and record the message type.
let _ = std::mem::replace(&mut span, Span::none());

yield message;
yield message;

// Create a new span after we're called again. Now we're in a new epoch and the
// parent of the span is updated.
span = new_span();
}
}
// Create a new span after we're called again. The parent span may also have been
// updated.
span = new_span();
}
}

Expand Down

0 comments on commit e7c1d5e

Please sign in to comment.