Skip to content

Commit

Permalink
add log
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 6, 2024
1 parent 783ff1e commit 8fe7325
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/barrier/creating_job_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl CreatingStreamingJobControl {
break;
}
}
error!(
debug!(
epoch,
worker_id,
collected = ?self.collected_barrier.iter().map(|(epoch, _)| *epoch).collect_vec(),
Expand Down
5 changes: 3 additions & 2 deletions src/storage/src/hummock/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ pub(crate) async fn wait_for_epoch(
) -> StorageResult<()> {
let mut receiver = notifier.subscribe();
// avoid unnecessary check in the loop if the value does not change
let max_committed_epoch = *receiver.borrow_and_update();
let mut max_committed_epoch = *receiver.borrow_and_update();
if max_committed_epoch >= wait_epoch {
return Ok(());
}
Expand All @@ -615,6 +615,7 @@ pub(crate) async fn wait_for_epoch(
// See #3845 for more details.
tracing::warn!(
epoch = wait_epoch,
max_committed_epoch,
"wait_epoch timeout when waiting for version update",
);
continue;
Expand All @@ -623,7 +624,7 @@ pub(crate) async fn wait_for_epoch(
return Err(HummockError::wait_epoch("tx dropped").into());
}
Ok(Ok(_)) => {
let max_committed_epoch = *receiver.borrow();
max_committed_epoch = *receiver.borrow();
if max_committed_epoch >= wait_epoch {
return Ok(());
}
Expand Down
16 changes: 14 additions & 2 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,18 @@ impl LocalBarrierWorker {
create_actor_result: StreamResult<CreateActorOutput>,
) {
let result = create_actor_result.map(|output| {
// TODO: should be removed before merged
let actors = output
.actors
.iter()
.map(|actor| actor.actor_context.id)
.collect_vec();
let senders = output
.senders
.iter()
.map(|(actor_id, senders)| (*actor_id, senders.len()))
.collect::<HashMap<_, _>>();
error!(?actors, ?senders, "actor created");
for (actor_id, senders) in output.senders {
self.register_sender(actor_id, senders);
}
Expand Down Expand Up @@ -715,7 +727,7 @@ impl LocalBarrierWorker {

/// Register sender for source actors, used to send barriers.
fn register_sender(&mut self, actor_id: ActorId, senders: Vec<UnboundedSender<Barrier>>) {
tracing::debug!(
tracing::error!(
target: "events::stream::barrier::manager",
actor_id = actor_id,
"register sender"
Expand Down Expand Up @@ -765,7 +777,7 @@ impl LocalBarrierWorker {
.watermark_epoch
.store(barrier.epoch.curr, std::sync::atomic::Ordering::SeqCst);
}
debug!(
error!(
target: "events::stream::barrier::manager::send",
"send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}",
barrier,
Expand Down

0 comments on commit 8fe7325

Please sign in to comment.