Skip to content

Commit

Permalink
chore(stream): remove actor_id from log and executor span (#15517)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Mar 12, 2024
1 parent 72d37ef commit 2e2a865
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 20 deletions.
6 changes: 3 additions & 3 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,12 @@ impl<S: StateStore> FsSourceExecutor<S> {
.collect_vec();

if !incompleted.is_empty() {
tracing::debug!(actor_id = self.actor_ctx.id, incompleted = ?incompleted, "take snapshot");
tracing::debug!(incompleted = ?incompleted, "take snapshot");
core.split_state_store.set_states(incompleted).await?
}

if !completed.is_empty() {
tracing::debug!(actor_id = self.actor_ctx.id, completed = ?completed, "take snapshot");
tracing::debug!(completed = ?completed, "take snapshot");
core.split_state_store.set_all_complete(completed).await?
}
// commit anyway, even if no message saved
Expand Down Expand Up @@ -335,7 +335,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
// init in-memory split states with persisted state if any
self.stream_source_core.init_split_state(boot_state.clone());
let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
tracing::debug!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state");
tracing::debug!(state = ?recover_state, "start with state");

let source_chunk_reader = self
.build_stream_source_reader(&source_desc, recover_state)
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl<S: StateStore> SourceExecutor<S> {
.collect_vec();

if !cache.is_empty() {
tracing::debug!(actor_id = self.actor_ctx.id, state = ?cache, "take snapshot");
tracing::debug!(state = ?cache, "take snapshot");
core.split_state_store.set_states(cache).await?;
}

Expand Down Expand Up @@ -406,7 +406,7 @@ impl<S: StateStore> SourceExecutor<S> {
self.stream_source_core = Some(core);

let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
tracing::debug!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state");
tracing::debug!(state = ?recover_state, "start with state");
let source_chunk_reader = self
.build_stream_source_reader(&source_desc, recover_state)
.instrument_await("source_build_reader")
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl WrapperExecutor {
// -- Shared wrappers --

// Await tree
let stream = trace::instrument_await_tree(info.clone(), actor_ctx.id, stream);
let stream = trace::instrument_await_tree(info.clone(), stream);

// Schema check
let stream = schema_check::schema_check(info.clone(), stream);
Expand Down
17 changes: 3 additions & 14 deletions src/stream/src/executor/wrapper/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use tracing::{Instrument, Span};

use crate::executor::error::StreamExecutorError;
use crate::executor::{ActorContextRef, ExecutorInfo, Message, MessageStream};
use crate::task::ActorId;

/// Streams wrapped by `trace` will be traced with `tracing` spans and reported to `opentelemetry`.
#[try_stream(ok = Message, error = StreamExecutorError)]
Expand All @@ -34,12 +33,10 @@ pub async fn trace(
let actor_id_str = actor_ctx.id.to_string();
let fragment_id_str = actor_ctx.fragment_id.to_string();

let span_name = pretty_identity(&info.identity, actor_ctx.id);

let new_span = || {
tracing::info_span!(
"executor",
"otel.name" = span_name,
"otel.name" = info.identity,
"message" = tracing::field::Empty, // record later
"chunk_size" = tracing::field::Empty, // record later
)
Expand Down Expand Up @@ -104,21 +101,13 @@ pub async fn trace(
}
}

fn pretty_identity(identity: &str, actor_id: ActorId) -> String {
format!("{} (actor {})", identity, actor_id)
}

/// Streams wrapped by `instrument_await_tree` will be able to print the spans of the
/// executors in the stack trace through `await-tree`.
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn instrument_await_tree(
info: Arc<ExecutorInfo>,
actor_id: ActorId,
input: impl MessageStream,
) {
pub async fn instrument_await_tree(info: Arc<ExecutorInfo>, input: impl MessageStream) {
pin_mut!(input);

let span: await_tree::Span = pretty_identity(&info.identity, actor_id).into();
let span: await_tree::Span = info.identity.clone().into();

while let Some(message) = input
.next()
Expand Down

0 comments on commit 2e2a865

Please sign in to comment.