Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
xxhZs committed Apr 25, 2024
1 parent da9d45d commit 24a6a38
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 60 deletions.
27 changes: 12 additions & 15 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures_util::pin_mut;
use prometheus::Histogram;
use risingwave_common::array::DataChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnId, Schema};
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
Expand Down Expand Up @@ -91,12 +91,6 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
)?;

let table_desc: &StorageTableDesc = log_store_seq_scan_node.get_table_desc()?;
let op_id = log_store_seq_scan_node
.column_ids
.iter()
.max()
.map(ColumnId::from)
.unwrap();
let column_ids = log_store_seq_scan_node
.column_ids
.iter()
Expand All @@ -116,13 +110,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
let metrics = source.context().batch_metrics();

dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial_inner(
state_store,
column_ids,
vnodes,
table_desc,
vec![op_id],
);
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
Ok(Box::new(LogRowSeqScanExecutor::new(
table,
scan_ranges,
Expand Down Expand Up @@ -162,6 +150,13 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
new_epoch,
} = *self;
let table = std::sync::Arc::new(table);
let mut schema = table.schema().clone();
// Add op column
schema.fields.push(Field::with_name(
risingwave_common::types::DataType::Int16,
"op",
));
let schema = Arc::new(schema);

// Create collector.
let histogram = metrics.as_ref().map(|metrics| {
Expand All @@ -181,6 +176,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
new_epoch.clone(),
chunk_size,
histogram.clone(),
schema.clone(),
);
#[for_await]
for chunk in stream {
Expand All @@ -198,6 +194,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
new_epoch: BatchQueryEpoch,
chunk_size: usize,
histogram: Option<impl Deref<Target = Histogram>>,
schema: Arc<Schema>,
) {
let ScanRange {
pk_prefix,
Expand Down Expand Up @@ -246,7 +243,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
loop {
let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

let chunk = collect_data_chunk(&mut iter, table.schema(), Some(chunk_size))
let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size))
.await
.map_err(BatchError::from)?;
if let Some(timer) = timer {
Expand Down
21 changes: 12 additions & 9 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,8 @@ impl Binder {
old_epoch: u64,
new_epoch: u64,
) -> Result<Relation> {
let resolve_log_table_relation = |log_table_catalog: &std::sync::Arc<TableCatalog>| {
let log_table_catalog = &*log_table_catalog.clone();
let mut log_table_catalog = log_table_catalog.clone();
let next_column_id = log_table_catalog
let resolve_log_table_relation = |table_catalog: &std::sync::Arc<TableCatalog>| {
let next_column_id = table_catalog
.columns
.iter()
.max_by(|a, b| {
Expand All @@ -429,21 +427,26 @@ impl Binder {
.map(|c| c.column_desc.column_id)
.unwrap_or_default()
.next();

log_table_catalog.columns.push(ColumnCatalog {
let op_column = ColumnCatalog {
column_desc: ColumnDesc::named(
"op",
next_column_id,
risingwave_common::types::DataType::Int16,
),
is_hidden: false,
});
};

let log_table_catalog = &*table_catalog.clone();
let mut log_table_catalog = log_table_catalog.clone();
log_table_catalog.columns.push(op_column.clone());

let log_table_catalog = std::sync::Arc::new(log_table_catalog);
let table = BoundLogTable {
table_id: log_table_catalog.id(),
table_catalog: log_table_catalog.clone(),
table_id: table_catalog.id(),
table_catalog: table_catalog.clone(), // We save the original TableCatalog
old_epoch,
new_epoch,
op_column,
};
(
Relation::LogTable(Box::new(table)),
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use either::Either;
use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{is_system_schema, Field};
use risingwave_common::catalog::{is_system_schema, ColumnCatalog, Field};
use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias};
Expand Down Expand Up @@ -48,6 +48,7 @@ pub struct BoundLogTable {
pub table_catalog: Arc<TableCatalog>,
pub new_epoch: u64,
pub old_epoch: u64,
pub op_column: ColumnCatalog,
}

#[derive(Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl TryToBatchPb for BatchLogSeqScan {
table_desc: Some(self.core.table_desc.try_to_protobuf()?),
column_ids: self
.core
.output_column_ids()
.output_column_ids_to_batch()
.iter()
.map(ColumnId::get_id)
.collect(),
Expand Down
29 changes: 22 additions & 7 deletions src/frontend/src/optimizer/plan_node/generic/log_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::rc::Rc;

use educe::Educe;
use pretty_xmlish::Pretty;
use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema, TableDesc};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::sort_util::ColumnOrder;

Expand All @@ -35,6 +35,8 @@ pub struct LogScan {
pub output_col_idx: Vec<usize>,
/// Descriptor of the table
pub table_desc: Rc<TableDesc>,
/// Catalog of the op column
pub op_column: Rc<ColumnCatalog>,
/// Help `RowSeqLogScan` executor use a better chunk size
pub chunk_size: Option<u32>,

Expand All @@ -59,6 +61,14 @@ impl LogScan {
.collect()
}

pub fn output_column_ids_to_batch(&self) -> Vec<ColumnId> {
self.output_col_idx
.iter()
.map(|i| self.get_table_columns()[*i].column_id)
.filter(|i| i != &self.op_column.column_desc.column_id)
.collect()
}

pub fn primary_key(&self) -> &[ColumnOrder] {
&self.table_desc.pk
}
Expand Down Expand Up @@ -116,6 +126,7 @@ impl LogScan {
table_name: String,
output_col_idx: Vec<usize>,
table_desc: Rc<TableDesc>,
op_column: Rc<ColumnCatalog>,
ctx: OptimizerContextRef,
old_epoch: u64,
new_epoch: u64,
Expand All @@ -124,6 +135,7 @@ impl LogScan {
table_name,
output_col_idx,
table_desc,
op_column,
chunk_size: None,
ctx,
old_epoch,
Expand All @@ -145,14 +157,14 @@ impl LogScan {

pub(crate) fn get_id_to_op_idx_mapping(
output_col_idx: &[usize],
table_desc: &Rc<TableDesc>,
columns: Vec<ColumnDesc>,
) -> HashMap<ColumnId, usize> {
let mut id_to_op_idx = HashMap::new();
output_col_idx
.iter()
.enumerate()
.for_each(|(op_idx, tb_idx)| {
let col = &table_desc.columns[*tb_idx];
let col = &columns[*tb_idx];
id_to_op_idx.insert(col.column_id, op_idx);
});
id_to_op_idx
Expand All @@ -173,13 +185,14 @@ impl GenericPlanNode for LogScan {
}

fn stream_key(&self) -> Option<Vec<usize>> {
let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_desc);
let id_to_op_idx =
Self::get_id_to_op_idx_mapping(&self.output_col_idx, self.get_table_columns());
self.table_desc
.stream_key
.iter()
.map(|&c| {
id_to_op_idx
.get(&self.table_desc.columns[c].column_id)
.get(&self.get_table_columns()[c].column_id)
.copied()
})
.collect::<Option<Vec<_>>>()
Expand All @@ -200,8 +213,10 @@ impl GenericPlanNode for LogScan {
}

impl LogScan {
pub fn get_table_columns(&self) -> &[ColumnDesc] {
&self.table_desc.columns
pub fn get_table_columns(&self) -> Vec<ColumnDesc> {
let mut columns = self.table_desc.columns.clone();
columns.push(self.op_column.column_desc.clone());
columns
}

/// Get the descs of the output columns.
Expand Down
9 changes: 6 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_log_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::rc::Rc;

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{ColumnDesc, TableDesc};
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, TableDesc};

use super::batch_log_seq_scan::BatchLogSeqScan;
use super::generic::GenericPlanRef;
Expand Down Expand Up @@ -62,14 +62,16 @@ impl LogicalLogScan {
pub fn create(
table_name: String, // explain-only
table_desc: Rc<TableDesc>,
op_column: Rc<ColumnCatalog>,
ctx: OptimizerContextRef,
old_epoch: u64,
new_epoch: u64,
) -> Self {
generic::LogScan::new(
table_name,
(0..table_desc.columns.len()).collect(),
(0..(table_desc.columns.len() + 1)).collect(),
table_desc,
op_column,
ctx,
old_epoch,
new_epoch,
Expand Down Expand Up @@ -98,6 +100,7 @@ impl LogicalLogScan {
self.table_name().to_string(),
output_col_idx,
self.core.table_desc.clone(),
self.core.op_column.clone(),
self.base.ctx().clone(),
self.core.old_epoch,
self.core.new_epoch,
Expand Down Expand Up @@ -131,7 +134,7 @@ impl Distill for LogicalLogScan {
self.output_col_idx()
.iter()
.map(|i| {
let col_name = &self.log_table_desc().columns[*i].name;
let col_name = &self.column_descs()[*i].name;
Pretty::from(if verbose {
format!("{}.{}", self.table_name(), col_name)
} else {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/planner/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl Planner {
Ok(LogicalLogScan::create(
log_table.table_catalog.name().to_string(),
Rc::new(log_table.table_catalog.table_desc()),
Rc::new(log_table.op_column.clone()),
self.ctx(),
log_table.old_epoch,
log_table.new_epoch,
Expand Down
27 changes: 3 additions & 24 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,6 @@ impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for StorageTableInner<S,

// init
impl<S: StateStore> StorageTableInner<S, EitherSerde> {
pub fn new_partial(
store: S,
output_column_ids: Vec<ColumnId>,
vnodes: Option<Arc<Bitmap>>,
table_desc: &StorageTableDesc,
) -> Self {
Self::new_partial_inner(store, output_column_ids, vnodes, table_desc, vec![])
}

/// Create a [`StorageTableInner`] given a complete set of `columns` and a partial
/// set of `output_column_ids`.
/// When reading from the storage table,
Expand All @@ -129,13 +120,11 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
/// from those supplied to associated executors.
/// These `output_column_ids` may have `pk` appended, since they will be needed to scan from
/// storage. The associated executors may not have these `pk` fields.
pub fn new_partial_inner(
pub fn new_partial(
store: S,
output_column_ids: Vec<ColumnId>,
vnodes: Option<Arc<Bitmap>>,
table_desc: &StorageTableDesc,
// Use for log iter's op
excluded_indices: Vec<ColumnId>,
) -> Self {
let table_id = TableId {
table_id: table_desc.table_id,
Expand Down Expand Up @@ -179,7 +168,6 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
distribution,
table_option,
value_indices,
excluded_indices,
prefix_hint_len,
versioned,
)
Expand All @@ -204,7 +192,6 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
TableDistribution::singleton(),
Default::default(),
value_indices,
vec![],
0,
false,
)
Expand Down Expand Up @@ -241,27 +228,19 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
distribution: TableDistribution,
table_option: TableOption,
value_indices: Vec<usize>,
excluded_column_ids: Vec<ColumnId>,
read_prefix_len_hint: usize,
versioned: bool,
) -> Self {
assert_eq!(order_types.len(), pk_indices.len());

let (output_columns, output_indices) =
find_columns_by_ids(&table_columns, &output_column_ids);
let (excluded_columns, excluded_indices) =
find_columns_by_ids(&table_columns, &excluded_column_ids);
if !excluded_columns.is_empty() {
// Now we only exclude the 'op'
assert_eq!(excluded_columns.first().unwrap().name, "op")
}

let mut value_output_indices = vec![];
let mut key_output_indices = vec![];

for idx in &output_indices {
if excluded_indices.contains(idx) {
continue;
} else if value_indices.contains(idx) {
if value_indices.contains(idx) {
value_output_indices.push(*idx);
} else {
key_output_indices.push(*idx);
Expand Down

0 comments on commit 24a6a38

Please sign in to comment.