Skip to content

Commit

Permalink
refactor: per review
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Dec 19, 2024
1 parent 1bc84a9 commit 201b683
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ impl FlowWorkerManager {
let default_interval = Duration::from_secs(1);
let mut avg_spd = 0; // rows/sec
let mut since_last_run = tokio::time::Instant::now();
let run_per_trace = 5;
let run_per_trace = 10;
let mut run_cnt = 0;
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
Expand Down
1 change: 0 additions & 1 deletion src/flow/src/compute/render/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ impl Context<'_, '_> {
reduce_plan: &ReducePlan,
output_type: &RelationType,
) -> Result<CollectionBundle<Batch>, Error> {
common_telemetry::debug!("render reduce batch");
let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan {
if !accum_plan.distinct_aggrs.is_empty() {
NotImplementedSnafu {
Expand Down
7 changes: 2 additions & 5 deletions src/flow/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,8 @@ impl Batch {
dts.push(datatypes::prelude::ConcreteDataType::null_datatype())
}
}
if self.batch.is_empty() {
other.batch.iter().map(|v| v.data_type()).collect_vec()
} else {
self.batch.iter().map(|v| v.data_type()).collect_vec()
}

dts
};

let batch_builders = dts
Expand Down

0 comments on commit 201b683

Please sign in to comment.