From df87a4765795c4de16a0dd9d05f702cdecbfb13f Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 7 Mar 2024 17:56:18 +0800 Subject: [PATCH 1/3] chore(stream): remove actor_id from log --- src/stream/src/executor/source/fs_source_executor.rs | 6 +++--- src/stream/src/executor/source/source_executor.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index d896a1224f6bf..cd93448205bd6 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -236,12 +236,12 @@ impl FsSourceExecutor { .collect_vec(); if !incompleted.is_empty() { - tracing::debug!(actor_id = self.actor_ctx.id, incompleted = ?incompleted, "take snapshot"); + tracing::debug!(incompleted = ?incompleted, "take snapshot"); core.split_state_store.set_states(incompleted).await? } if !completed.is_empty() { - tracing::debug!(actor_id = self.actor_ctx.id, completed = ?completed, "take snapshot"); + tracing::debug!(completed = ?completed, "take snapshot"); core.split_state_store.set_all_complete(completed).await? } // commit anyway, even if no message saved @@ -336,7 +336,7 @@ impl FsSourceExecutor { // init in-memory split states with persisted state if any self.stream_source_core.init_split_state(boot_state.clone()); let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); - tracing::debug!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state"); + tracing::debug!(state = ?recover_state, "start with state"); let source_chunk_reader = self .build_stream_source_reader(&source_desc, recover_state) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 5a62319daa7e3..d6f020b7e45f8 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -316,7 +316,7 @@ impl SourceExecutor { .collect_vec(); if !cache.is_empty() { - tracing::debug!(actor_id = self.actor_ctx.id, state = ?cache, "take snapshot"); + tracing::debug!(state = ?cache, "take snapshot"); core.split_state_store.set_states(cache).await?; } @@ -407,7 +407,7 @@ impl SourceExecutor { self.stream_source_core = Some(core); let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); - tracing::debug!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state"); + tracing::debug!(state = ?recover_state, "start with state"); let source_chunk_reader = self .build_stream_source_reader(&source_desc, recover_state) .instrument_await("source_build_reader") From 36f3d798dc43b1c39c61721dfd754f7d359555a3 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 8 Mar 2024 11:28:24 +0800 Subject: [PATCH 2/3] remove actor id from executor span --- src/stream/src/executor/wrapper/trace.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/stream/src/executor/wrapper/trace.rs b/src/stream/src/executor/wrapper/trace.rs index df594194966be..88c0b652c653a 100644 --- a/src/stream/src/executor/wrapper/trace.rs +++ b/src/stream/src/executor/wrapper/trace.rs @@ -34,12 +34,10 @@ pub async fn trace( let actor_id_str = actor_ctx.id.to_string(); let fragment_id_str = actor_ctx.fragment_id.to_string(); - let span_name = pretty_identity(&info.identity, actor_ctx.id); - let new_span = || { tracing::info_span!( "executor", - "otel.name" = span_name, + "otel.name" = info.identity, "message" = tracing::field::Empty, // record later "chunk_size" = tracing::field::Empty, // record later ) @@ -104,10 +102,6 @@ pub async fn trace( } } -fn pretty_identity(identity: &str, actor_id: ActorId) -> String { - format!("{} (actor {})", identity, actor_id) -} - /// Streams wrapped by `instrument_await_tree` will be able to print the spans of the /// executors in the stack trace through `await-tree`. #[try_stream(ok = Message, error = StreamExecutorError)] @@ -118,7 +112,7 @@ pub async fn instrument_await_tree( ) { pin_mut!(input); - let span: await_tree::Span = pretty_identity(&info.identity, actor_id).into(); + let span: await_tree::Span = info.identity.into(); while let Some(message) = input .next() From c1f5386807d8abb13739f02d30c3b8f42a776085 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 12 Mar 2024 10:11:05 +0800 Subject: [PATCH 3/3] fix --- src/stream/src/executor/wrapper.rs | 2 +- src/stream/src/executor/wrapper/trace.rs | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/stream/src/executor/wrapper.rs b/src/stream/src/executor/wrapper.rs index dddd94da5ab73..74923928eaf6d 100644 --- a/src/stream/src/executor/wrapper.rs +++ b/src/stream/src/executor/wrapper.rs @@ -66,7 +66,7 @@ impl WrapperExecutor { // -- Shared wrappers -- // Await tree - let stream = trace::instrument_await_tree(info.clone(), actor_ctx.id, stream); + let stream = trace::instrument_await_tree(info.clone(), stream); // Schema check let stream = schema_check::schema_check(info.clone(), stream); diff --git a/src/stream/src/executor/wrapper/trace.rs b/src/stream/src/executor/wrapper/trace.rs index 88c0b652c653a..c95809f534728 100644 --- a/src/stream/src/executor/wrapper/trace.rs +++ b/src/stream/src/executor/wrapper/trace.rs @@ -21,7 +21,6 @@ use tracing::{Instrument, Span}; use crate::executor::error::StreamExecutorError; use crate::executor::{ActorContextRef, ExecutorInfo, Message, MessageStream}; -use crate::task::ActorId; /// Streams wrapped by `trace` will be traced with `tracing` spans and reported to `opentelemetry`. #[try_stream(ok = Message, error = StreamExecutorError)] @@ -105,14 +104,10 @@ pub async fn trace( /// Streams wrapped by `instrument_await_tree` will be able to print the spans of the /// executors in the stack trace through `await-tree`. #[try_stream(ok = Message, error = StreamExecutorError)] -pub async fn instrument_await_tree( - info: Arc, - actor_id: ActorId, - input: impl MessageStream, -) { +pub async fn instrument_await_tree(info: Arc, input: impl MessageStream) { pin_mut!(input); - let span: await_tree::Span = info.identity.into(); + let span: await_tree::Span = info.identity.clone().into(); while let Some(message) = input .next()