Commit

fix: fix again ck ty before cmp
discord9 committed Dec 25, 2024
1 parent 03a1c51 commit 3cf378e
Showing 1 changed file with 39 additions and 28 deletions.
67 changes: 39 additions & 28 deletions src/flow/src/compute/render/reduce.rs
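The fix makes the reduce operator validate each key value's type against the output schema and materialize a NULL key as a typed null array before handing both sides to arrow's comparison kernel, instead of falling back to an all-false mask on type mismatch. Below is a minimal sketch of that pattern at the arrow level; the column, the expected type, and the example values are made up for illustration and are not taken from the project:

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef, Int64Array, Scalar};
use arrow::compute::kernels::cmp::eq;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // A key column from some batch (made-up data) and the type the output
    // schema says this key should have.
    let col: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None, Some(3)]));
    let expected = DataType::Int64;

    // A key value that happens to be NULL: materialize it as a *typed* null
    // scalar so its arrow type matches the column's before calling `eq`.
    let key = Scalar::new(new_null_array(&expected, 1));

    // Both sides now share a data type, so the comparison kernel can run;
    // the result here is an all-null mask, since NULL compares as null.
    let mask = eq(&key, &col)?;
    println!("{mask:?}");
    Ok(())
}
```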
@@ -16,7 +16,7 @@ use std::collections::{BTreeMap, BTreeSet};
 use std::ops::Range;
 use std::sync::Arc;
 
-use arrow::array::BooleanArray;
+use arrow::array::new_null_array;
 use common_telemetry::trace;
 use datatypes::data_type::ConcreteDataType;
 use datatypes::prelude::DataType;
@@ -399,54 +399,65 @@ fn reduce_batch_subgraph(
             }
         }
 
-        // TODO: here reduce numbers of eq to minimal by keeping slicing key/val batch
+        // TODO(discord9): here reduce numbers of eq to minimal by keeping slicing key/val batch
         for key_row in distinct_keys {
             let key_scalar_value = {
                 let mut key_scalar_value = Vec::with_capacity(key_row.len());
-                for key in key_row.iter() {
+                for (key_idx, key) in key_row.iter().enumerate() {
                     let v =
                         key.try_to_scalar_value(&key.data_type())
                             .context(DataTypeSnafu {
                                 msg: "can't convert key values to datafusion value",
                             })?;
-                    let arrow_value =
+
+                    // TODO(discord9): type check!!!!
+                    let key_data_type = output_type
+                        .column_types
+                        .get(key_idx)
+                        .context(InternalSnafu {
+                            reason: format!(
+                                "Key index out of bound, expected at most {} but got {}",
+                                output_type.column_types.len(),
+                                key_idx
+                            ),
+                        })?
+                        .clone()
+                        .scalar_type;
+
+                    if key_data_type.as_arrow_type() != v.data_type() {
+                        crate::expr::error::InternalSnafu {
+                            reason: format!(
+                                "Key data type mismatch, expected {:?} but got {:?}",
+                                key_data_type.as_arrow_type(),
+                                v.data_type()
+                            ),
+                        }
+                        .fail()?
+                    }
+
+                    // handle single null key
+                    let arrow_value = if v.data_type().is_null() {
+                        let arrow_key_ty = key_data_type.as_arrow_type();
+                        let ret = new_null_array(&arrow_key_ty, 1);
+                        arrow::array::Scalar::new(ret)
+                    } else {
                         v.to_scalar().context(crate::expr::error::DatafusionSnafu {
                             context: "can't convert key values to arrow value",
-                        })?;
+                        })?
+                    };
+
                     key_scalar_value.push(arrow_value);
                 }
                 key_scalar_value
             };
 
-            let all_false_array = {
-                let batch_len = key_batch.row_count();
-                let mut builder = BooleanArray::builder(batch_len);
-                builder
-                    .append_values(&vec![false; batch_len], &vec![true; batch_len])
-                    .context(ArrowSnafu {
-                        context: "Failed to build a all false boolean array",
-                    })?;
-                builder.finish()
-            };
-
             // first compute equal from separate columns
             let eq_results = key_scalar_value
                 .into_iter()
                 .zip(key_batch.batch().iter())
                 .map(|(key, col)| {
                     // TODO(discord9): this takes half of the cpu! And this is redundant amount of `eq`!
-                    let lhs_ty = arrow::array::Datum::get(&key).0.data_type();
-                    let rhs = col.to_arrow_array();
-                    let rhs_ty = rhs.data_type();
-                    if lhs_ty != rhs_ty {
-                        // if type not match, return all false(this happens when one of them is null array)
-                        Ok(all_false_array.clone())
-                    } else {
-                        arrow::compute::kernels::cmp::eq(
-                            &key,
-                            &col.to_arrow_array().as_ref() as _,
-                        )
-                    }
+                    arrow::compute::kernels::cmp::eq(&key, &col.to_arrow_array().as_ref() as _)
                 })
                 .try_collect::<_, Vec<_>, _>()
                 .context(ArrowSnafu {

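One note on the behavior change above: the removed code silently mapped any type mismatch (typically a Null-typed key scalar against a typed column) to an all-false mask, while the new code raises an error for genuine mismatches and only special-cases NULL keys by retyping them. The probe below, with made-up data, illustrates the mismatch the old fallback was guarding against; the exact error text, and how a given arrow version treats Null-typed datums, may vary:

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef, Int64Array, Scalar};
use arrow::compute::kernels::cmp::eq;
use arrow::datatypes::DataType;

fn main() {
    let col: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None, Some(3)]));

    // A key scalar typed as DataType::Null, as a plain NULL value would be.
    let untyped_null = Scalar::new(new_null_array(&DataType::Null, 1));
    // With mismatched datum types the kernel is expected to report an error
    // through its Result; print whichever way this arrow version responds.
    match eq(&untyped_null, &col) {
        Ok(mask) => println!("mask: {mask:?}"),
        Err(e) => println!("comparison rejected: {e}"),
    }

    // Retyping the null to the column's type, as the fix does, keeps the
    // comparison well-typed.
    let typed_null = Scalar::new(new_null_array(&DataType::Int64, 1));
    let mask = eq(&typed_null, &col).expect("matching types should compare");
    println!("typed mask: {mask:?}");
}
```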