Skip to content

Commit

Permalink
add some logs
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Aug 14, 2024
1 parent 7091646 commit 67cda7c
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
3 changes: 2 additions & 1 deletion ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ risingwave_meta::rpc::ddl_controller=debug,\
risingwave_meta::barrier::mod=debug,\
risingwave_simulation=debug,\
risingwave_meta::stream::stream_manager=debug,\
risingwave_meta::barrier::progress=debug"
risingwave_meta::barrier::progress=debug,\
risingwave_stream::executor::approx_percentile=debug"

# Extra logs you can enable if the existing trace does not give enough info.
#risingwave_stream::executor::backfill=trace,
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/executor/approx_percentile/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl<S: StateStore> GlobalApproxPercentileExecutor<S> {
state.apply_chunk(chunk).await?;
}
Message::Barrier(barrier) => {
yield Message::Chunk(state.get_output());
let output = state.get_output();
yield Message::Chunk(output);
state.commit(barrier.epoch).await?;
yield Message::Barrier(barrier);
}
Expand Down
11 changes: 8 additions & 3 deletions src/stream/src/executor/approx_percentile/global_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl<S: StateStore> GlobalApproxPercentileState<S> {
} else {
Some(self.cache.get_output(row_count, self.quantile, self.base))
};
tracing::debug!(?last_output, "recovered last_output");
self.last_output = last_output;
Ok(())
}
Expand Down Expand Up @@ -222,15 +223,16 @@ impl<S: StateStore> GlobalApproxPercentileState<S> {
pub fn get_output(&mut self) -> StreamChunk {
let last_output = mem::take(&mut self.last_output);
let new_output = if !self.output_changed {
tracing::debug!("last_output: {:#?}", last_output);
last_output.clone().flatten()
} else {
let new_output = self
.cache
.get_output(self.row_count, self.quantile, self.base);
self.last_output = Some(new_output.clone());
new_output
};
match last_output {
self.last_output = Some(new_output.clone());
let output_chunk = match last_output {
None => StreamChunk::from_rows(&[(Op::Insert, &[new_output])], &[DataType::Float64]),
Some(last_output) if !self.output_changed => StreamChunk::from_rows(
&[
Expand All @@ -246,7 +248,10 @@ impl<S: StateStore> GlobalApproxPercentileState<S> {
],
&[DataType::Float64],
),
}
};
tracing::debug!("get_output: {:#?}", output_chunk, );
self.output_changed = false;
output_chunk
}
}

Expand Down

0 comments on commit 67cda7c

Please sign in to comment.