diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 959c39f9724d5..94576d6a4c459 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -235,12 +235,12 @@ impl FsSourceExecutor { .collect_vec(); if !incompleted.is_empty() { - tracing::debug!(actor_id = self.actor_ctx.id, incompleted = ?incompleted, "take snapshot"); + tracing::debug!(incompleted = ?incompleted, "take snapshot"); core.split_state_store.set_states(incompleted).await? } if !completed.is_empty() { - tracing::debug!(actor_id = self.actor_ctx.id, completed = ?completed, "take snapshot"); + tracing::debug!(completed = ?completed, "take snapshot"); core.split_state_store.set_all_complete(completed).await? } // commit anyway, even if no message saved @@ -335,7 +335,7 @@ impl FsSourceExecutor { // init in-memory split states with persisted state if any self.stream_source_core.init_split_state(boot_state.clone()); let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); - tracing::debug!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state"); + tracing::debug!(state = ?recover_state, "start with state"); let source_chunk_reader = self .build_stream_source_reader(&source_desc, recover_state) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index be9372efa70fc..36358bdcd372e 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -315,7 +315,7 @@ impl SourceExecutor { .collect_vec(); if !cache.is_empty() { - tracing::debug!(actor_id = self.actor_ctx.id, state = ?cache, "take snapshot"); + tracing::debug!(state = ?cache, "take snapshot"); core.split_state_store.set_states(cache).await?; } @@ -406,7 +406,7 @@ impl SourceExecutor { self.stream_source_core = Some(core); let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); - tracing::debug!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state"); + tracing::debug!(state = ?recover_state, "start with state"); let source_chunk_reader = self .build_stream_source_reader(&source_desc, recover_state) .instrument_await("source_build_reader") diff --git a/src/stream/src/executor/wrapper.rs b/src/stream/src/executor/wrapper.rs index dddd94da5ab73..74923928eaf6d 100644 --- a/src/stream/src/executor/wrapper.rs +++ b/src/stream/src/executor/wrapper.rs @@ -66,7 +66,7 @@ impl WrapperExecutor { // -- Shared wrappers -- // Await tree - let stream = trace::instrument_await_tree(info.clone(), actor_ctx.id, stream); + let stream = trace::instrument_await_tree(info.clone(), stream); // Schema check let stream = schema_check::schema_check(info.clone(), stream); diff --git a/src/stream/src/executor/wrapper/trace.rs b/src/stream/src/executor/wrapper/trace.rs index df594194966be..c95809f534728 100644 --- a/src/stream/src/executor/wrapper/trace.rs +++ b/src/stream/src/executor/wrapper/trace.rs @@ -21,7 +21,6 @@ use tracing::{Instrument, Span}; use crate::executor::error::StreamExecutorError; use crate::executor::{ActorContextRef, ExecutorInfo, Message, MessageStream}; -use crate::task::ActorId; /// Streams wrapped by `trace` will be traced with `tracing` spans and reported to `opentelemetry`. #[try_stream(ok = Message, error = StreamExecutorError)] @@ -34,12 +33,10 @@ pub async fn trace( let actor_id_str = actor_ctx.id.to_string(); let fragment_id_str = actor_ctx.fragment_id.to_string(); - let span_name = pretty_identity(&info.identity, actor_ctx.id); - let new_span = || { tracing::info_span!( "executor", - "otel.name" = span_name, + "otel.name" = info.identity, "message" = tracing::field::Empty, // record later "chunk_size" = tracing::field::Empty, // record later ) @@ -104,21 +101,13 @@ pub async fn trace( } } -fn pretty_identity(identity: &str, actor_id: ActorId) -> String { - format!("{} (actor {})", identity, actor_id) -} - /// Streams wrapped by `instrument_await_tree` will be able to print the spans of the /// executors in the stack trace through `await-tree`. #[try_stream(ok = Message, error = StreamExecutorError)] -pub async fn instrument_await_tree( - info: Arc, - actor_id: ActorId, - input: impl MessageStream, -) { +pub async fn instrument_await_tree(info: Arc, input: impl MessageStream) { pin_mut!(input); - let span: await_tree::Span = pretty_identity(&info.identity, actor_id).into(); + let span: await_tree::Span = info.identity.clone().into(); while let Some(message) = input .next()