From 8fe7325561c9880cac332245048be640fb7540db Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 6 Aug 2024 16:12:35 +0800 Subject: [PATCH] add log --- src/meta/src/barrier/creating_job_control.rs | 2 +- src/storage/src/hummock/utils.rs | 5 +++-- src/stream/src/task/barrier_manager.rs | 16 ++++++++++++++-- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/meta/src/barrier/creating_job_control.rs b/src/meta/src/barrier/creating_job_control.rs index 89a00c6be5d54..08ac840df5765 100644 --- a/src/meta/src/barrier/creating_job_control.rs +++ b/src/meta/src/barrier/creating_job_control.rs @@ -380,7 +380,7 @@ impl CreatingStreamingJobControl { break; } } - error!( + debug!( epoch, worker_id, collected = ?self.collected_barrier.iter().map(|(epoch, _)| *epoch).collect_vec(), diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 1e1b230964020..883bae08cb00c 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -597,7 +597,7 @@ pub(crate) async fn wait_for_epoch( ) -> StorageResult<()> { let mut receiver = notifier.subscribe(); // avoid unnecessary check in the loop if the value does not change - let max_committed_epoch = *receiver.borrow_and_update(); + let mut max_committed_epoch = *receiver.borrow_and_update(); if max_committed_epoch >= wait_epoch { return Ok(()); } @@ -615,6 +615,7 @@ pub(crate) async fn wait_for_epoch( // See #3845 for more details. tracing::warn!( epoch = wait_epoch, + max_committed_epoch, "wait_epoch timeout when waiting for version update", ); continue; @@ -623,7 +624,7 @@ pub(crate) async fn wait_for_epoch( return Err(HummockError::wait_epoch("tx dropped").into()); } Ok(Ok(_)) => { - let max_committed_epoch = *receiver.borrow(); + max_committed_epoch = *receiver.borrow(); if max_committed_epoch >= wait_epoch { return Ok(()); } diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 0c0ef21a74ac1..28bb450d3c07d 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -521,6 +521,18 @@ impl LocalBarrierWorker { create_actor_result: StreamResult, ) { let result = create_actor_result.map(|output| { + // TODO: should be removed before merged + let actors = output + .actors + .iter() + .map(|actor| actor.actor_context.id) + .collect_vec(); + let senders = output + .senders + .iter() + .map(|(actor_id, senders)| (*actor_id, senders.len())) + .collect::>(); + error!(?actors, ?senders, "actor created"); for (actor_id, senders) in output.senders { self.register_sender(actor_id, senders); } @@ -715,7 +727,7 @@ impl LocalBarrierWorker { /// Register sender for source actors, used to send barriers. fn register_sender(&mut self, actor_id: ActorId, senders: Vec>) { - tracing::debug!( + tracing::error!( target: "events::stream::barrier::manager", actor_id = actor_id, "register sender" @@ -765,7 +777,7 @@ impl LocalBarrierWorker { .watermark_epoch .store(barrier.epoch.curr, std::sync::atomic::Ordering::SeqCst); } - debug!( + error!( target: "events::stream::barrier::manager::send", "send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}", barrier,