Skip to content

Commit

Permalink
refactor(hash): remove Result in HashKey::build (#15375)
Browse files Browse the repository at this point in the history
Signed-off-by: TennyZhuang <[email protected]>
  • Loading branch information
TennyZhuang authored Mar 1, 2024
1 parent d888f28 commit 8d836d6
Show file tree
Hide file tree
Showing 12 changed files with 31 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
#[for_await]
for chunk in self.child.execute() {
let chunk = Arc::new(chunk?);
let keys = K::build(self.group_key.as_slice(), &chunk)?;
let keys = K::build_many(self.group_key.as_slice(), &chunk);

for (row_id, ((encoded_row, key), visible)) in
encode_chunk(&chunk, &self.column_orders)?
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
#[for_await]
for chunk in self.child.execute() {
let chunk = StreamChunk::from(chunk?);
let keys = K::build(self.group_key_columns.as_slice(), &chunk)?;
let keys = K::build_many(self.group_key_columns.as_slice(), &chunk);
let mut memory_usage_diff = 0;
for (row_id, (key, visible)) in keys
.into_iter()
Expand Down
26 changes: 13 additions & 13 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl<K: HashKey> HashJoinExecutor<K> {

// Build hash map
for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
let build_keys = K::build(&self.build_key_idxs, build_chunk)?;
let build_keys = K::build_many(&self.build_key_idxs, build_chunk);

for (build_row_id, (build_key, visible)) in build_keys
.into_iter()
Expand Down Expand Up @@ -354,7 +354,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -417,7 +417,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -486,7 +486,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -564,7 +564,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -628,7 +628,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -703,7 +703,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -785,7 +785,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -850,7 +850,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -922,7 +922,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_key, visible) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -976,7 +976,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -1049,7 +1049,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down Expand Up @@ -1131,7 +1131,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for probe_chunk in probe_side.execute() {
let probe_chunk = probe_chunk?;
let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?;
let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
for (probe_row_id, (probe_key, visible)) in probe_keys
.iter()
.zip_eq_fast(probe_chunk.visibility().iter())
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/lookup_join_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl<K: HashKey> LookupJoinBase<K> {

// Build hash map
for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
let build_keys = K::build(&hash_join_build_side_key_idxs, build_chunk)?;
let build_keys = K::build_many(&hash_join_build_side_key_idxs, build_chunk);

for (build_row_id, (build_key, visible)) in build_keys
.into_iter()
Expand Down
4 changes: 2 additions & 2 deletions src/common/benches/bench_hash_key_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<K: HashKey> HashKeyBenchCase<K> {
// please reference the `bench_vec_deser` and `bench_deser` method for benchmarking partial
// `col_idxes`
let col_idxes = (0..input_chunk.columns().len()).collect_vec();
let keys = HashKey::build(&col_idxes, &input_chunk).unwrap();
let keys = HashKey::build_many(&col_idxes, &input_chunk);
Self {
id,
input_chunk,
Expand All @@ -94,7 +94,7 @@ impl<K: HashKey> HashKeyBenchCase<K> {
pub fn bench_vec_ser(&self, c: &mut Criterion) {
let vectorize_serialize_id = "vec ser ".to_string() + &self.id;
c.bench_function(&vectorize_serialize_id, |b| {
b.iter(|| K::build(&self.col_idxes, &self.input_chunk).unwrap())
b.iter(|| K::build_many(&self.col_idxes, &self.input_chunk))
});
}

Expand Down
10 changes: 4 additions & 6 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,8 +698,7 @@ mod tests {
100,
PrecomputedBuildHasher,
);
let keys =
K::build(column_indexes.as_slice(), &data).expect("Failed to build hash keys");
let keys = K::build_many(column_indexes.as_slice(), &data);

for (row_id, key) in keys.into_iter().enumerate() {
let row_ids = fast_hash_map.entry(key).or_default();
Expand Down Expand Up @@ -741,7 +740,7 @@ mod tests {
F: FnOnce() -> (DataChunk, Vec<DataType>),
{
let (data, types) = data_gen();
let keys = K::build(column_indexes.as_slice(), &data).expect("Failed to build hash keys");
let keys = K::build_many(column_indexes.as_slice(), &data);

let mut array_builders = column_indexes
.iter()
Expand Down Expand Up @@ -839,15 +838,14 @@ mod tests {
// losslessly.
#[test]
fn test_simple_hash_key_nullable_serde() {
let keys = Key64::build(
let keys = Key64::build_many(
&[0, 1],
&DataChunk::from_pretty(
"i i
1 .
. 2",
),
)
.unwrap();
);

let mut array_builders = [0, 1]
.iter()
Expand Down
8 changes: 3 additions & 5 deletions src/common/src/hash/key_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,8 @@ pub trait HashKey:
// TODO: rename to `NullBitmap` and note that high bit represents null!
type Bitmap: NullBitmap;

// TODO: remove result and rename to `build_many`
/// Build hash keys from the given `data_chunk` with `column_indices` in a batch.
fn build(column_indices: &[usize], data_chunk: &DataChunk) -> ArrayResult<Vec<Self>>;
fn build_many(column_indices: &[usize], data_chunk: &DataChunk) -> Vec<Self>;

/// Deserializes the hash key into a row.
fn deserialize(&self, data_types: &[DataType]) -> ArrayResult<OwnedRow>;
Expand Down Expand Up @@ -288,7 +287,7 @@ impl<S: KeyStorage, N: NullBitmap> EstimateSize for HashKeyImpl<S, N> {
impl<S: KeyStorage, N: NullBitmap> HashKey for HashKeyImpl<S, N> {
type Bitmap = N;

fn build(column_indices: &[usize], data_chunk: &DataChunk) -> ArrayResult<Vec<Self>> {
fn build_many(column_indices: &[usize], data_chunk: &DataChunk) -> Vec<Self> {
let hash_codes = data_chunk.get_hash_values(column_indices, XxHash64Builder);

let mut serializers = {
Expand Down Expand Up @@ -331,8 +330,7 @@ impl<S: KeyStorage, N: NullBitmap> HashKey for HashKeyImpl<S, N> {
});
}

let hash_keys = serializers.into_iter().map(|s| s.finish()).collect();
Ok(hash_keys)
serializers.into_iter().map(|s| s.finish()).collect()
}

fn deserialize(&self, data_types: &[DataType]) -> ArrayResult<OwnedRow> {
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
chunk: StreamChunk,
) -> StreamExecutorResult<()> {
// Find groups in this chunk and generate visibility for each group key.
let keys = K::build(&this.group_key_indices, chunk.data_chunk())?;
let keys = K::build_many(&this.group_key_indices, chunk.data_chunk());
let group_visibilities = Self::get_group_visibilities(keys, chunk.visibility());

// Ensure all `AggGroup`s are in `dirty_groups`.
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
&side_update.ht.table_id().to_string(),
]);

let keys = K::build(&side_update.join_key_indices, chunk.data_chunk())?;
let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
let Some((op, row)) = r else {
continue;
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl<K: HashKey, S: StateStore> TemporalSide<K, S> {
right_stream_key_indices: &[usize],
) -> StreamExecutorResult<()> {
for chunk in chunks {
let keys = K::build(join_keys, chunk.data_chunk())?;
let keys = K::build_many(join_keys, chunk.data_chunk());
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
let Some((op, row)) = r else {
continue;
Expand Down Expand Up @@ -436,7 +436,7 @@ mod phase1 {
chunk: StreamChunk,
) {
let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
let keys = K::build(left_join_keys, chunk.data_chunk())?;
let keys = K::build_many(left_join_keys, chunk.data_chunk());
let to_fetch_keys = chunk
.visibility()
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/top_n/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ where
async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult<StreamChunk> {
let mut res_ops = Vec::with_capacity(self.limit);
let mut res_rows = Vec::with_capacity(self.limit);
let keys = K::build(&self.group_by, chunk.data_chunk())?;
let keys = K::build_many(&self.group_by, chunk.data_chunk());
let table_id_str = self.managed_state.table().table_id().to_string();
let actor_id_str = self.ctx.id.to_string();
let fragment_id_str = self.ctx.fragment_id.to_string();
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/top_n/group_top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where
async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult<StreamChunk> {
let mut res_ops = Vec::with_capacity(self.limit);
let mut res_rows = Vec::with_capacity(self.limit);
let keys = K::build(&self.group_by, chunk.data_chunk())?;
let keys = K::build_many(&self.group_by, chunk.data_chunk());

let data_types = self.schema.data_types();
let row_deserializer = RowDeserializer::new(data_types.clone());
Expand Down

0 comments on commit 8d836d6

Please sign in to comment.