Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Apr 28, 2024
1 parent 6388cfb commit 080b2ec
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 207 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ message FilterNode {

message LogRowSeqScanNode{
plan_common.StorageTableDesc table_desc = 1;
// This records the mandatory column_ids of the original table, excluding op
repeated int32 column_ids = 2;
// The partition to read for scan tasks.
//
Expand Down
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ arrow-schema = { workspace = true }
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
bytes = { version = "1", features = ["serde"] }
either = "1"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand Down
37 changes: 27 additions & 10 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@
use std::ops::{Bound, Deref};
use std::sync::Arc;

use bytes::Bytes;
use futures::prelude::stream::StreamExt;
use futures_async_stream::try_stream;
use futures_util::pin_mut;
use itertools::Itertools;
use prometheus::Histogram;
use risingwave_common::array::DataChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::ScalarImpl;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::table::{collect_data_chunk, KeyedRow, TableDistribution};
use risingwave_storage::{dispatch_state_store, StateStore};

use super::{
Expand All @@ -40,6 +43,8 @@ use crate::task::BatchTaskContext;
pub struct LogRowSeqScanExecutor<S: StateStore> {
chunk_size: usize,
identity: String,
// It is table schema + op column
schema: Schema,

/// Batch metrics.
/// None: Local mode don't record mertics.
Expand All @@ -61,9 +66,15 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
identity: String,
metrics: Option<BatchMetricsWithTaskLabels>,
) -> Self {
let mut schema = table.schema().clone();
schema.fields.push(Field::with_name(
risingwave_common::types::DataType::Int16,
"op",
));
Self {
chunk_size,
identity,
schema,
metrics,
table,
scan_ranges,
Expand Down Expand Up @@ -125,7 +136,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
}
impl<S: StateStore> Executor for LogRowSeqScanExecutor<S> {
fn schema(&self) -> &Schema {
self.table.schema()
&self.schema
}

fn identity(&self) -> &str {
Expand All @@ -148,15 +159,9 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
scan_ranges,
old_epoch,
new_epoch,
schema,
} = *self;
let table = std::sync::Arc::new(table);
let mut schema = table.schema().clone();
// Add op column
schema.fields.push(Field::with_name(
risingwave_common::types::DataType::Int16,
"op",
));
let schema = Arc::new(schema);

// Create collector.
let histogram = metrics.as_ref().map(|metrics| {
Expand All @@ -176,7 +181,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
new_epoch.clone(),
chunk_size,
histogram.clone(),
schema.clone(),
Arc::new(schema.clone()),
);
#[for_await]
for chunk in stream {
Expand Down Expand Up @@ -243,6 +248,18 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
loop {
let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

let mut iter = iter.as_mut().map(|r| match r {
Ok((op, value)) => {
let (k, row) = value.into_owned_row_key();
let full_row = row
.into_iter()
.chain(vec![Some(ScalarImpl::Int16(op))])
.collect_vec();
let row = OwnedRow::new(full_row);
Ok(KeyedRow::<Bytes>::new(k, row))
}
Err(e) => Err(e),
});
let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size))
.await
.map_err(BatchError::from)?;
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::types::DataType;

/// Column ID is the unique identifier of a column in a table. Different from table ID, column ID is
/// not globally unique.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ColumnId(i32);

impl std::fmt::Debug for ColumnId {
Expand Down
11 changes: 1 addition & 10 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use either::Either;
use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{is_system_schema, ColumnCatalog, Field};
use risingwave_common::catalog::{is_system_schema, Field};
use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias};
Expand All @@ -42,15 +42,6 @@ pub struct BoundBaseTable {
pub as_of: Option<AsOf>,
}

#[derive(Debug, Clone)]
pub struct BoundLogTable {
pub table_id: TableId,
pub table_catalog: Arc<TableCatalog>,
pub new_epoch: u64,
pub old_epoch: u64,
pub op_column: ColumnCatalog,
}

#[derive(Debug, Clone)]
pub struct BoundSystemTable {
pub table_id: TableId,
Expand Down
28 changes: 4 additions & 24 deletions src/frontend/src/handler/declare_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::rc::Rc;
use fixedbitset::FixedBitSet;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::session_config::QueryMode;
use risingwave_common::util::epoch::Epoch;
use risingwave_sqlparser::ast::{DeclareCursorStatement, ObjectName, Query, Since, Statement};
Expand Down Expand Up @@ -148,42 +147,23 @@ pub fn create_batch_plan_for_cursor(
new_epoch: u64,
) -> Result<BatchQueryPlanResult> {
let context = OptimizerContext::from_handler_args(handle_args.clone());
let mut out_col_idx = table_catalog
let out_col_idx = table_catalog
.columns
.iter()
.enumerate()
.filter(|(_, v)| !v.is_hidden)
.map(|(i, _)| i)
.collect::<Vec<_>>();
out_col_idx.push(out_col_idx.len() + 1);
let next_column_id = table_catalog
.columns
.iter()
.max_by(|a, b| {
a.column_desc
.column_id
.get_id()
.cmp(&b.column_desc.column_id.get_id())
})
.map(|c| c.column_desc.column_id)
.unwrap_or_default()
.next();
let op_column = ColumnDesc::named(
"op",
next_column_id,
risingwave_common::types::DataType::Int16,
);
let core = generic::LogScan::new(
table_catalog.name.clone(),
out_col_idx,
Rc::new(table_catalog.table_desc()),
Rc::new(op_column),
Rc::new(context),
old_epoch,
new_epoch,
);
let batch_log_seq_scan = BatchLogSeqScan::new(core);
let out_fields = FixedBitSet::from_iter(0..batch_log_seq_scan.schema().len());
let out_fields = FixedBitSet::from_iter(0..batch_log_seq_scan.core().schema().len());
let out_names = batch_log_seq_scan.core().column_names();
// Here we just need a plan_root to call the method, only out_fields and out_names will be used
let mut plan_root = PlanRoot::new(
Expand All @@ -193,11 +173,11 @@ pub fn create_batch_plan_for_cursor(
out_fields,
out_names,
);
let schema = batch_log_seq_scan.schema().clone();
let schema = batch_log_seq_scan.core().schema().clone();
let (batch_log_seq_scan, query_mode) = match handle_args.session.config().query_mode() {
QueryMode::Auto => (
plan_root.gen_batch_distributed_plan(PlanRef::from(batch_log_seq_scan))?,
QueryMode::Distributed,
QueryMode::Local,
),
QueryMode::Local => (
plan_root.gen_batch_local_plan(PlanRef::from(batch_log_seq_scan))?,
Expand Down
30 changes: 5 additions & 25 deletions src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::Schema;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::LogRowSeqScanNode;
use risingwave_pb::common::BatchQueryEpoch;
Expand All @@ -23,7 +22,6 @@ use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch, TryToBatchPb};
use crate::catalog::ColumnId;
use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, DistributionDisplay, Order};
Expand All @@ -39,15 +37,11 @@ pub struct BatchLogSeqScan {
impl BatchLogSeqScan {
fn new_inner(core: generic::LogScan, dist: Distribution) -> Self {
let order = Order::any();
let base = PlanBase::new_batch_with_core(&core, dist, order);
let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order);

Self { base, core }
}

pub fn schema(&self) -> &Schema {
self.base.schema()
}

pub fn new(core: generic::LogScan) -> Self {
// Use `Single` by default, will be updated later with `clone_with_dist`.
Self::new_inner(core, Distribution::Single)
Expand Down Expand Up @@ -84,7 +78,7 @@ impl_plan_tree_node_for_leaf! { BatchLogSeqScan }
impl Distill for BatchLogSeqScan {
fn distill<'a>(&self) -> XmlNode<'a> {
let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(4);
let mut vec = Vec::with_capacity(3);
vec.push(("table", Pretty::from(self.core.table_name.clone())));
vec.push(("columns", self.core.columns_pretty(verbose)));

Expand Down Expand Up @@ -112,7 +106,7 @@ impl TryToBatchPb for BatchLogSeqScan {
table_desc: Some(self.core.table_desc.try_to_protobuf()?),
column_ids: self
.core
.output_column_ids_to_batch()
.output_column_ids()
.iter()
.map(ColumnId::get_id)
.collect(),
Expand Down Expand Up @@ -144,20 +138,6 @@ impl ToLocalBatch for BatchLogSeqScan {
}
}

impl ExprRewritable for BatchLogSeqScan {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
let core = self.core.clone();
core.rewrite_exprs(r);
Self::new(core).into()
}
}
impl ExprRewritable for BatchLogSeqScan {}

impl ExprVisitable for BatchLogSeqScan {
fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
self.core.visit_exprs(v);
}
}
impl ExprVisitable for BatchLogSeqScan {}
Loading

0 comments on commit 080b2ec

Please sign in to comment.