From 67cda7c6734a8a4b7b2e8e33111a92642127e892 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 14 Aug 2024 12:09:10 +0800 Subject: [PATCH] add some logs --- ci/scripts/deterministic-recovery-test.sh | 3 ++- src/stream/src/executor/approx_percentile/global.rs | 3 ++- .../src/executor/approx_percentile/global_state.rs | 11 ++++++++--- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/ci/scripts/deterministic-recovery-test.sh b/ci/scripts/deterministic-recovery-test.sh index 1a400d4ade9e0..627139069f32b 100755 --- a/ci/scripts/deterministic-recovery-test.sh +++ b/ci/scripts/deterministic-recovery-test.sh @@ -15,7 +15,8 @@ risingwave_meta::rpc::ddl_controller=debug,\ risingwave_meta::barrier::mod=debug,\ risingwave_simulation=debug,\ risingwave_meta::stream::stream_manager=debug,\ -risingwave_meta::barrier::progress=debug" +risingwave_meta::barrier::progress=debug,\ +risingwave_stream::executor::approx_percentile=debug" # Extra logs you can enable if the existing trace does not give enough info. #risingwave_stream::executor::backfill=trace, diff --git a/src/stream/src/executor/approx_percentile/global.rs b/src/stream/src/executor/approx_percentile/global.rs index 4fb450062ea75..6815c9240e26d 100644 --- a/src/stream/src/executor/approx_percentile/global.rs +++ b/src/stream/src/executor/approx_percentile/global.rs @@ -64,7 +64,8 @@ impl GlobalApproxPercentileExecutor { state.apply_chunk(chunk).await?; } Message::Barrier(barrier) => { - yield Message::Chunk(state.get_output()); + let output = state.get_output(); + yield Message::Chunk(output); state.commit(barrier.epoch).await?; yield Message::Barrier(barrier); } diff --git a/src/stream/src/executor/approx_percentile/global_state.rs b/src/stream/src/executor/approx_percentile/global_state.rs index e414142d4db31..89a31145eae07 100644 --- a/src/stream/src/executor/approx_percentile/global_state.rs +++ b/src/stream/src/executor/approx_percentile/global_state.rs @@ -77,6 +77,7 @@ impl GlobalApproxPercentileState { } else { Some(self.cache.get_output(row_count, self.quantile, self.base)) }; + tracing::debug!(?last_output, "recovered last_output"); self.last_output = last_output; Ok(()) } @@ -222,15 +223,16 @@ impl GlobalApproxPercentileState { pub fn get_output(&mut self) -> StreamChunk { let last_output = mem::take(&mut self.last_output); let new_output = if !self.output_changed { + tracing::debug!("last_output: {:#?}", last_output); last_output.clone().flatten() } else { let new_output = self .cache .get_output(self.row_count, self.quantile, self.base); - self.last_output = Some(new_output.clone()); new_output }; - match last_output { + self.last_output = Some(new_output.clone()); + let output_chunk = match last_output { None => StreamChunk::from_rows(&[(Op::Insert, &[new_output])], &[DataType::Float64]), Some(last_output) if !self.output_changed => StreamChunk::from_rows( &[ @@ -246,7 +248,10 @@ impl GlobalApproxPercentileState { ], &[DataType::Float64], ), - } + }; + tracing::debug!("get_output: {:#?}", output_chunk, ); + self.output_changed = false; + output_chunk } }