Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
fmt

fmt

fmt
  • Loading branch information
xxhZs committed Apr 25, 2024
1 parent dcddd3d commit da9d45d
Show file tree
Hide file tree
Showing 31 changed files with 417 additions and 493 deletions.
8 changes: 3 additions & 5 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,12 @@ message FilterNode {
message LogRowSeqScanNode{
plan_common.StorageTableDesc table_desc = 1;
repeated int32 column_ids = 2;
// All the ranges need to be read. i.e., they are OR'ed.
//
// Empty `scan_ranges` means full table scan.
repeated ScanRange scan_ranges = 3;
// The partition to read for scan tasks.
//
// Will be filled by the scheduler.
common.Buffer vnode_bitmap = 4;
common.Buffer vnode_bitmap = 3;
common.BatchQueryEpoch old_epoch = 4;
common.BatchQueryEpoch new_epoch = 5;
}

message InsertNode {
Expand Down
1 change: 0 additions & 1 deletion src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
source: &ExecutorBuilder<'_, C>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
println!("333");
ensure!(
inputs.is_empty(),
"Exchange executor should not have children!"
Expand Down
123 changes: 38 additions & 85 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{Bound, Deref, RangeBounds};
use std::ops::{Bound, Deref};
use std::sync::Arc;

use futures::prelude::stream::StreamExt;
Expand All @@ -23,9 +23,6 @@ use risingwave_common::array::DataChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnId, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::epoch::{MAX_EPOCH, Epoch};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
Expand All @@ -34,8 +31,7 @@ use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::{dispatch_state_store, StateStore};

use super::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
RowSeqScanExecutor, ScanRange,
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, ScanRange,
};
use crate::error::{BatchError, Result};
use crate::monitor::BatchMetricsWithTaskLabels;
Expand All @@ -51,14 +47,16 @@ pub struct LogRowSeqScanExecutor<S: StateStore> {

table: StorageTable<S>,
scan_ranges: Vec<ScanRange>,
epoch: BatchQueryEpoch,
old_epoch: BatchQueryEpoch,
new_epoch: BatchQueryEpoch,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
pub fn new(
table: StorageTable<S>,
scan_ranges: Vec<ScanRange>,
epoch: BatchQueryEpoch,
old_epoch: BatchQueryEpoch,
new_epoch: BatchQueryEpoch,
chunk_size: usize,
identity: String,
metrics: Option<BatchMetricsWithTaskLabels>,
Expand All @@ -69,7 +67,8 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
metrics,
table,
scan_ranges,
epoch,
old_epoch,
new_epoch,
}
}
}
Expand All @@ -82,7 +81,6 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
source: &ExecutorBuilder<'_, C>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
println!("111");
ensure!(
inputs.is_empty(),
"LogStore row sequential scan should not have input executor!"
Expand All @@ -93,6 +91,12 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
)?;

let table_desc: &StorageTableDesc = log_store_seq_scan_node.get_table_desc()?;
let op_id = log_store_seq_scan_node
.column_ids
.iter()
.max()
.map(ColumnId::from)
.unwrap();
let column_ids = log_store_seq_scan_node
.column_ids
.iter()
Expand All @@ -106,39 +110,24 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
None => Some(TableDistribution::all_vnodes()),
};
let scan_ranges = vec![ScanRange::full()];

let scan_ranges = {
let scan_ranges = &log_store_seq_scan_node.scan_ranges;
if scan_ranges.is_empty() {
vec![ScanRange::full()]
} else {
scan_ranges
.iter()
.map(|scan_range| {
let pk_types = table_desc.pk.iter().map(|order| {
DataType::from(
table_desc.columns[order.column_index as usize]
.column_type
.as_ref()
.unwrap(),
)
});
ScanRange::new(scan_range.clone(), pk_types)
})
.try_collect()?
}
};

let epoch = source.epoch.clone();
let chunk_size = source.context.get_config().developer.chunk_size as u32;
let metrics = source.context().batch_metrics();

dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
let table = StorageTable::new_partial_inner(
state_store,
column_ids,
vnodes,
table_desc,
vec![op_id],
);
Ok(Box::new(LogRowSeqScanExecutor::new(
table,
scan_ranges,
epoch,
log_store_seq_scan_node.old_epoch.clone().unwrap(),
log_store_seq_scan_node.new_epoch.clone().unwrap(),
chunk_size as usize,
source.plan_node().get_identity().clone(),
metrics,
Expand Down Expand Up @@ -169,7 +158,8 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
metrics,
table,
scan_ranges,
epoch,
old_epoch,
new_epoch,
} = *self;
let table = std::sync::Arc::new(table);

Expand All @@ -180,44 +170,22 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
.row_seq_scan_next_duration
.with_guarded_label_values(&metrics.executor_labels(&identity))
});

// if ordered {
// // Currently we execute range-scans concurrently so the order is not guaranteed if
// // there're multiple ranges.
// // TODO: reserve the order for multiple ranges.
// assert_eq!(scan_ranges.len(), 1);
// }

// the number of rows have been returned as execute result
let mut returned = 0;

let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);

// Range Scan
// WARN: DO NOT use `select` to execute range scans concurrently
// it can consume too much memory if there're too many ranges.
for range in scan_ranges {
println!("test");
let stream = Self::execute_range(
table.clone(),
range,
// ordered,
epoch.clone(),
old_epoch.clone(),
new_epoch.clone(),
chunk_size,
// limit,
histogram.clone(),
);
#[for_await]
for chunk in stream {
println!("{:?}", chunk);
let chunk = chunk?;
returned += chunk.cardinality() as u64;
yield chunk;
// if let Some(limit) = &limit
// && returned >= *limit
// {
// return Ok(());
// }
}
}
}
Expand All @@ -226,63 +194,49 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
async fn execute_range(
table: Arc<StorageTable<S>>,
scan_range: ScanRange,
// ordered: bool,
epoch: BatchQueryEpoch,
old_epoch: BatchQueryEpoch,
new_epoch: BatchQueryEpoch,
chunk_size: usize,
// limit: Option<u64>,
histogram: Option<impl Deref<Target = Histogram>>,
) {
let epoch1 = BatchQueryEpoch{ epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(65536))};
let epoch2 = BatchQueryEpoch{ epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(MAX_EPOCH - 1))};

let ScanRange {
pk_prefix,
next_col_bounds,
} = scan_range;

let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
let (start_bound, end_bound) = if order_type.is_ascending() {
(next_col_bounds.0, next_col_bounds.1)
} else {
(next_col_bounds.1, next_col_bounds.0)
};

let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);

// Range Scan.
let (start_bound, end_bound) = (next_col_bounds.0, next_col_bounds.1);
assert!(pk_prefix.len() < table.pk_indices().len());
// Range Scan.
let iter = table
.batch_iter_log_with_pk_bounds(
epoch1.into(),
epoch2.into(),
old_epoch.into(),
new_epoch.into(),
&pk_prefix,
(
match start_bound {
Bound::Unbounded => {
if end_bound_is_bounded && order_type.nulls_are_first() {
if order_type.nulls_are_first() {
// `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
}
}
Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
_ => unimplemented!("Log iter range need full"),
},
match end_bound {
Bound::Unbounded => {
if start_bound_is_bounded && order_type.nulls_are_last() {
if order_type.nulls_are_last() {
// `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
}
}
Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
_ => unimplemented!("Log iter range need full"),
},
),
)
Expand All @@ -298,7 +252,6 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
if let Some(timer) = timer {
timer.observe_duration()
}
println!("chunk...{:?}", chunk);

if let Some(chunk) = chunk {
yield chunk
Expand Down
1 change: 0 additions & 1 deletion src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
#[async_recursion]
async fn try_build(&self) -> Result<BoxedExecutor> {
let mut inputs = Vec::with_capacity(self.plan_node.children.len());
println!("input_node: {:?}", self.plan_node);
for input_node in &self.plan_node.children {
let input = self.clone_for_plan(input_node).build().await?;
inputs.push(input);
Expand Down
2 changes: 0 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
source: &ExecutorBuilder<'_, C>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
println!("222");
ensure!(
inputs.is_empty(),
"Row sequential scan should not have input executor!"
Expand All @@ -178,7 +177,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.copied()
.map(ColumnId::from)
.collect();

let vnodes = match &seq_scan_node.vnode_bitmap {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::types::DataType;

/// Column ID is the unique identifier of a column in a table. Different from table ID, column ID is
/// not globally unique.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct ColumnId(i32);

impl std::fmt::Debug for ColumnId {
Expand Down
Loading

0 comments on commit da9d45d

Please sign in to comment.