Skip to content

Commit

Permalink
refactor: cleanup deprecated methods of Row (risingwavelabs#6577)
Browse files Browse the repository at this point in the history
* data chunk builder

Signed-off-by: Bugen Zhao <[email protected]>

* fewer datum

Signed-off-by: Bugen Zhao <[email protected]>

* add repeat_n

Signed-off-by: Bugen Zhao <[email protected]>

* more row

Signed-off-by: Bugen Zhao <[email protected]>

* hash datum ref

Signed-off-by: Bugen Zhao <[email protected]>

* fewer and fewer &Datum

Signed-off-by: Bugen Zhao <[email protected]>

* remove more row interfaces

Signed-off-by: Bugen Zhao <[email protected]>

* remove concat

Signed-off-by: Bugen Zhao <[email protected]>

* remove size

Signed-off-by: Bugen Zhao <[email protected]>

* remove values

Signed-off-by: Bugen Zhao <[email protected]>

* remove serialize

Signed-off-by: Bugen Zhao <[email protected]>

* clippy

Signed-off-by: Bugen Zhao <[email protected]>

* restore ord

Signed-off-by: Bugen Zhao <[email protected]>

* add header

Signed-off-by: Bugen Zhao <[email protected]>

* remove from

Signed-off-by: Bugen Zhao <[email protected]>

* make row inner private

Signed-off-by: Bugen Zhao <[email protected]>

* minor fixes

Signed-off-by: Bugen Zhao <[email protected]>

* remove to datum ref

Signed-off-by: Bugen Zhao <[email protected]>

* fix compiling

Signed-off-by: Bugen Zhao <[email protected]>

Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
BugenZhao and mergify[bot] authored Nov 26, 2022
1 parent 1c3cce7 commit 0128ff1
Show file tree
Hide file tree
Showing 85 changed files with 714 additions and 841 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
for (_, heap) in groups {
for HeapElem { chunk, row_id, .. } in heap.dump() {
if let Some(spilled) =
- chunk_builder.append_one_row_ref(chunk.row_at_unchecked_vis(row_id))
+ chunk_builder.append_one_row(chunk.row_at_unchecked_vis(row_id))
{
yield spilled
}
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
swap(&mut new_row_list, &mut self.row_list);

for row in new_row_list {
- if let Some(chunk) = data_chunk_builder.append_one_row_from_datums(row.values()) {
+ if let Some(chunk) = data_chunk_builder.append_one_row(row) {
chunk_list.push(chunk);
}
}
Expand Down
15 changes: 5 additions & 10 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ use std::sync::Arc;

use fixedbitset::FixedBitSet;
use futures_async_stream::try_stream;
- use itertools::{repeat_n, Itertools};
+ use itertools::Itertools;
use risingwave_common::array::{Array, DataChunk, RowRef};
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::Schema;
use risingwave_common::error::{Result, RwError};
use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
- use risingwave_common::types::DataType;
+ use risingwave_common::row::{repeat_n, RowExt};
+ use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression};
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand Down Expand Up @@ -1376,21 +1377,15 @@ impl<K: HashKey> HashJoinExecutor<K> {
probe_row_ref: RowRef<'_>,
build_column_count: usize,
) -> Option<DataChunk> {
- chunk_builder.append_one_row_from_datum_refs(
- probe_row_ref
- .values()
- .chain(repeat_n(None, build_column_count)),
- )
+ chunk_builder.append_one_row(probe_row_ref.chain(repeat_n(Datum::None, build_column_count)))
}

fn append_one_row_with_null_probe_side(
chunk_builder: &mut DataChunkBuilder,
build_row_ref: RowRef<'_>,
probe_column_count: usize,
) -> Option<DataChunk> {
- chunk_builder.append_one_row_from_datum_refs(
- repeat_n(None, probe_column_count).chain(build_row_ref.values()),
- )
+ chunk_builder.append_one_row(repeat_n(Datum::None, probe_column_count).chain(build_row_ref))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ fn convert_datum_refs_to_chunk(
.collect();
for _i in 0..num_tuples {
for (builder, datum_ref) in output_array_builders.iter_mut().zip_eq(datum_refs) {
- builder.append_datum_ref(*datum_ref);
+ builder.append_datum(*datum_ref);
}
}

Expand Down
27 changes: 12 additions & 15 deletions src/batch/src/executor/join/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

use futures::TryStreamExt;
use futures_async_stream::try_stream;
- use itertools::{repeat_n, Itertools};
+ use itertools::Itertools;
use risingwave_common::array::data_chunk_iter::RowRef;
use risingwave_common::array::{Array, DataChunk};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::Schema;
use risingwave_common::error::{Result, RwError};
- use risingwave_common::types::DataType;
+ use risingwave_common::row::{repeat_n, RowExt};
+ use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_expr::expr::{
build_from_prost as expr_build_from_prost, BoxedExpression, Expression,
Expand Down Expand Up @@ -281,10 +282,8 @@ impl NestedLoopJoinExecutor {
.zip_eq(matched.finish().iter())
.filter(|(_, matched)| !*matched)
{
- let datum_refs = left_row
- .values()
- .chain(repeat_n(None, right_data_types.len()));
- if let Some(chunk) = chunk_builder.append_one_row_from_datum_refs(datum_refs) {
+ let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
+ if let Some(chunk) = chunk_builder.append_one_row(row) {
yield chunk
}
}
Expand Down Expand Up @@ -323,7 +322,7 @@ impl NestedLoopJoinExecutor {
.zip_eq(matched.finish().iter())
.filter(|(_, matched)| if ANTI_JOIN { !*matched } else { *matched })
{
- if let Some(chunk) = chunk_builder.append_one_row_ref(left_row) {
+ if let Some(chunk) = chunk_builder.append_one_row(left_row) {
yield chunk
}
}
Expand Down Expand Up @@ -363,8 +362,8 @@ impl NestedLoopJoinExecutor {
.zip_eq(matched.iter())
.filter(|(_, matched)| !*matched)
{
- let datum_refs = repeat_n(None, left_data_types.len()).chain(right_row.values());
- if let Some(chunk) = chunk_builder.append_one_row_from_datum_refs(datum_refs) {
+ let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
+ if let Some(chunk) = chunk_builder.append_one_row(row) {
yield chunk
}
}
Expand Down Expand Up @@ -445,8 +444,8 @@ impl NestedLoopJoinExecutor {
.zip_eq(right_matched.iter())
.filter(|(_, matched)| !*matched)
{
- let datum_refs = repeat_n(None, left_data_types.len()).chain(right_row.values());
- if let Some(chunk) = chunk_builder.append_one_row_from_datum_refs(datum_refs) {
+ let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
+ if let Some(chunk) = chunk_builder.append_one_row(row) {
yield chunk
}
}
Expand All @@ -458,10 +457,8 @@ impl NestedLoopJoinExecutor {
.zip_eq(left_matched.finish().iter())
.filter(|(_, matched)| !*matched)
{
- let datum_refs = left_row
- .values()
- .chain(repeat_n(None, right_data_types.len()));
- if let Some(chunk) = chunk_builder.append_one_row_from_datum_refs(datum_refs) {
+ let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
+ if let Some(chunk) = chunk_builder.append_one_row(row) {
yield chunk
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/batch/src/executor/join/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use futures_async_stream::try_stream;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::RwError;
+ use risingwave_common::row::RowExt;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand Down Expand Up @@ -135,7 +136,7 @@ impl SortMergeJoinExecutor {
if let Some(last_probe_key) = &last_probe_key && *last_probe_key == probe_key {
for (chunk, row_idx) in &last_matched_build_rows {
let build_row = chunk.row_at_unchecked_vis(*row_idx);
- if let Some(spilled) = chunk_builder.append_one_row_from_datum_refs(probe_row.values().chain(build_row.values())) {
+ if let Some(spilled) = chunk_builder.append_one_row((&probe_row).chain(build_row)) {
yield spilled
}
}
Expand All @@ -152,7 +153,7 @@ impl SortMergeJoinExecutor {
// [`ScalarPartialOrd`].
if probe_key == build_key {
last_matched_build_rows.push((build_chunk.clone(), next_build_row_idx));
- if let Some(spilled) = chunk_builder.append_one_row_from_datum_refs(probe_row.values().chain(build_row.values())) {
+ if let Some(spilled) = chunk_builder.append_one_row((&probe_row).chain(build_row)) {
yield spilled
}
} else if ASCENDING && probe_key < build_key || !ASCENDING && probe_key > build_key {
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl SortExecutor {
encoded_rows.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));

for (row, _) in encoded_rows {
- if let Some(spilled) = chunk_builder.append_one_row_ref(row) {
+ if let Some(spilled) = chunk_builder.append_one_row(row) {
yield spilled
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl ProjectSetExecutor {
}
Either::Right(datum_ref) => {
for _ in 0..max_tf_len {
- builder.append_datum_ref(datum_ref);
+ builder.append_datum(datum_ref);
}
}
}
Expand Down
28 changes: 15 additions & 13 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::array::DataChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::error::{Result, RwError};
- use risingwave_common::row::Row;
+ use risingwave_common::row::{Row, Row2};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::select_all;
Expand Down Expand Up @@ -76,14 +76,16 @@ impl ScanRange {
scan_range: ProstScanRange,
mut pk_types: impl Iterator<Item = DataType>,
) -> Result<Self> {
- let pk_prefix = Row(scan_range
- .eq_conds
- .iter()
- .map(|v| {
- let ty = pk_types.next().unwrap();
- deserialize_datum(v.as_slice(), &ty)
- })
- .try_collect()?);
+ let pk_prefix = Row::new(
+ scan_range
+ .eq_conds
+ .iter()
+ .map(|v| {
+ let ty = pk_types.next().unwrap();
+ deserialize_datum(v.as_slice(), &ty)
+ })
+ .try_collect()?,
+ );
if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() {
return Ok(Self {
pk_prefix,
Expand Down Expand Up @@ -307,15 +309,15 @@ impl<S: StateStore> RowSeqScanExecutor<S> {

let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
.into_iter()
- .partition(|x| x.pk_prefix.size() == table.pk_indices().len());
+ .partition(|x| x.pk_prefix.len() == table.pk_indices().len());

let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
// Point Get
for point_get in point_gets {
let table = table.clone();
let histogram = histogram.clone();
if let Some(row) = Self::execute_point_get(table, point_get, epoch, histogram).await? {
- if let Some(chunk) = data_chunk_builder.append_one_row_from_datums(row.values()) {
+ if let Some(chunk) = data_chunk_builder.append_one_row(row) {
yield chunk;
}
}
Expand Down Expand Up @@ -345,7 +347,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
histogram: Option<Histogram>,
) -> Result<Option<Row>> {
let pk_prefix = scan_range.pk_prefix;
- assert!(pk_prefix.size() == table.pk_indices().len());
+ assert!(pk_prefix.len() == table.pk_indices().len());

let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

Expand Down Expand Up @@ -375,7 +377,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
} = scan_range;

// Range Scan.
- assert!(pk_prefix.size() < table.pk_indices().len());
+ assert!(pk_prefix.len() < table.pk_indices().len());
let iter = table
.batch_iter_with_pk_bounds(
HummockReadEpoch::Committed(epoch),
Expand Down
5 changes: 3 additions & 2 deletions src/batch/src/executor/sys_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, SysCatalogReaderRef, TableId};
use risingwave_common::error::{Result, RwError};
- use risingwave_common::row::Row;
+ use risingwave_common::row::{Row, Row2};
+ use risingwave_common::types::ToOwnedDatum;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::executor::{
Expand Down Expand Up @@ -113,7 +114,7 @@ impl SysRowSeqScanExecutor {
let datums = self
.column_ids
.iter()
- .map(|column_id| row.0.get(column_id.get_id() as usize).cloned().unwrap())
+ .map(|column_id| row.datum_at(column_id.get_id() as usize).to_owned_datum())
.collect_vec();
Row::new(datums)
})
Expand Down
3 changes: 2 additions & 1 deletion src/batch/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::array::{DataChunk, DataChunkTestExt};
use risingwave_common::catalog::Schema;
use risingwave_common::error::{Result, RwError};
use risingwave_common::field_generator::FieldGeneratorImpl;
+ use risingwave_common::row::Row2;
use risingwave_common::types::{DataType, Datum, ToOwnedDatum};
use risingwave_expr::expr::BoxedExpression;
use risingwave_pb::batch_plan::ExchangeSource as ProstExchangeSource;
Expand Down Expand Up @@ -326,7 +327,7 @@ impl LookupExecutorBuilder for FakeInnerSideExecutorBuilder {
let probe_row = base_data_chunk.row_at_unchecked_vis(idx);
for datum in &self.datums {
if datum[0] == probe_row.value_at(0).to_owned_datum() {
- let owned_row = probe_row.to_owned_row();
+ let owned_row = probe_row.into_owned_row();
let chunk =
DataChunk::from_rows(&[owned_row], &[DataType::Int32, DataType::Float32]);
mock_executor.add(chunk);
Expand Down
3 changes: 1 addition & 2 deletions src/batch/src/executor/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ impl TopNExecutor {

let mut chunk_builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
for HeapElem { chunk, row_id, .. } in heap.dump() {
- if let Some(spilled) =
- chunk_builder.append_one_row_ref(chunk.row_at_unchecked_vis(row_id))
+ if let Some(spilled) = chunk_builder.append_one_row(chunk.row_at_unchecked_vis(row_id))
{
yield spilled
}
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl UpdateExecutor {
.flat_map(|(a, b)| [a, b])
{
for (datum_ref, builder) in row.values().zip_eq(builders.iter_mut()) {
- builder.append_datum_ref(datum_ref);
+ builder.append_datum(datum_ref);
}
}
let columns = builders.into_iter().map(|b| b.finish().into()).collect();
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/consistent_hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn generate_hash_values(
chunk: &DataChunk,
consistent_hash_info: &ConsistentHashInfo,
) -> BatchResult<Vec<usize>> {
- let hasher_builder = Crc32FastBuilder {};
+ let hasher_builder = Crc32FastBuilder;

let hash_values = chunk
.get_hash_values(
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct HashShuffleReceiver {
fn generate_hash_values(chunk: &DataChunk, hash_info: &HashInfo) -> BatchResult<Vec<usize>> {
let output_count = hash_info.output_count as usize;

- let hasher_builder = Crc32FastBuilder {};
+ let hasher_builder = Crc32FastBuilder;

let hash_values = chunk
.get_hash_values(
Expand Down
14 changes: 7 additions & 7 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use crate::array::data_chunk_iter::RowRef;
use crate::array::{ArrayBuilderImpl, StructValue};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::hash::HashCode;
- use crate::row::Row;
+ use crate::row::{Row, Row2};
use crate::types::struct_type::StructType;
use crate::types::to_text::ToText;
use crate::types::{DataType, Datum, NaiveDateTimeWrapper, ToOwnedDatum};
use crate::util::hash_util::finalize_hashers;
- use crate::util::value_encoding::serialize_datum_ref;
+ use crate::util::value_encoding::serialize_datum;

/// `DataChunk` is a collection of arrays with visibility mask.
#[derive(Clone, PartialEq)]
Expand Down Expand Up @@ -72,7 +72,7 @@ impl DataChunk {
.collect::<Vec<_>>();

for row in rows {
- for (datum, builder) in row.0.iter().zip_eq(array_builders.iter_mut()) {
+ for (datum, builder) in row.iter().zip_eq(array_builders.iter_mut()) {
builder.append_datum(datum);
}
}
Expand Down Expand Up @@ -259,7 +259,7 @@ impl DataChunk {
.array_ref()
.create_builder(end_row_idx - start_row_idx + 1);
for row_idx in start_row_idx..=end_row_idx {
- array_builder.append_datum_ref(column.array_ref().value_at(row_idx));
+ array_builder.append_datum(column.array_ref().value_at(row_idx));
}
builder.append_array(&array_builder.finish());
});
Expand Down Expand Up @@ -385,7 +385,7 @@ impl DataChunk {
.collect();
for &i in indexes {
for (builder, col) in array_builders.iter_mut().zip_eq(&self.columns) {
- builder.append_datum_ref(col.array_ref().value_at(i));
+ builder.append_datum(col.array_ref().value_at(i));
}
}
let columns = array_builders
Expand All @@ -411,7 +411,7 @@ impl DataChunk {
// SAFETY(value_at_unchecked): the idx is always in bound.
unsafe {
if vis.is_set_unchecked(i) {
- serialize_datum_ref(&c.value_at_unchecked(i), buffer);
+ serialize_datum(c.value_at_unchecked(i), buffer);
}
}
}
Expand All @@ -426,7 +426,7 @@ impl DataChunk {
for (i, buffer) in buffers.iter_mut().enumerate() {
// SAFETY(value_at_unchecked): the idx is always in bound.
unsafe {
- serialize_datum_ref(&c.value_at_unchecked(i), buffer);
+ serialize_datum(c.value_at_unchecked(i), buffer);
}
}
}
Expand Down
Loading

0 comments on commit 0128ff1

Please sign in to comment.