fix(frontend): avoid panic when querying log store internal table (#1…
wenym1 authored and Little-Wallace committed Jan 20, 2024
1 parent 96903ef commit 5500c70
Showing 22 changed files with 167 additions and 103 deletions.
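
The change behind this commit is mechanical but wide: `TableDesc::to_protobuf` used to `panic!` when a distribution key was not a subset of the primary key (the condition that, per the commit title, querying a log store internal table could trigger). The commit renames it to `try_to_protobuf`, returns `anyhow::Result`, and threads the error through every caller with `?`. A minimal, self-contained sketch of the panic-to-Result pattern, with hypothetical helper names rather than code from this repository:

use anyhow::{anyhow, Result};

// Before: an out-of-set distribution key aborts the whole process.
fn vnode_col_idx_or_panic(pk_indices: &[usize], di: usize) -> u32 {
    pk_indices
        .iter()
        .position(|&pi| pi == di)
        .unwrap_or_else(|| {
            panic!("distribution key {di} must be a subset of primary key {pk_indices:?}")
        }) as u32
}

// After: the same lookup reports failure as a value the caller can handle.
fn vnode_col_idx(pk_indices: &[usize], di: usize) -> Result<u32> {
    pk_indices
        .iter()
        .position(|&pi| pi == di)
        .map(|d| d as u32)
        .ok_or_else(|| {
            anyhow!("distribution key {di} must be a subset of primary key {pk_indices:?}")
        })
}

fn main() -> Result<()> {
    let pk_indices = [0, 2, 3];
    assert_eq!(vnode_col_idx(&pk_indices, 2)?, 1);
    assert!(vnode_col_idx(&pk_indices, 9).is_err()); // this was a panic before
    Ok(())
}
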
18 changes: 10 additions & 8 deletions src/common/src/catalog/physical_table.rs
@@ -14,6 +14,7 @@
 
 use std::collections::HashMap;
 
+use anyhow::anyhow;
 use fixedbitset::FixedBitSet;
 use itertools::Itertools;
 use risingwave_pb::common::PbColumnOrder;
@@ -79,7 +80,7 @@ impl TableDesc {
         .collect()
     }
 
-    pub fn to_protobuf(&self) -> StorageTableDesc {
+    pub fn try_to_protobuf(&self) -> anyhow::Result<StorageTableDesc> {
         let dist_key_indices: Vec<u32> = self.distribution_key.iter().map(|&k| k as u32).collect();
         let pk_indices: Vec<u32> = self
             .pk
@@ -102,19 +103,20 @@ impl TableDesc {
                     pk_indices
                         .iter()
                         .position(|&pi| di == pi)
-                        .unwrap_or_else(|| {
-                            panic!(
+                        .ok_or_else(|| {
+                            anyhow!(
                                 "distribution key {:?} must be a subset of primary key {:?}",
-                                dist_key_indices, pk_indices
+                                dist_key_indices,
+                                pk_indices
                             )
                         })
+                        .map(|d| d as u32)
                 })
-                .map(|d| d as u32)
-                .collect_vec()
+                .try_collect()?
         } else {
             Vec::new()
         };
-        StorageTableDesc {
+        Ok(StorageTableDesc {
             table_id: self.table_id.into(),
             columns: self.columns.iter().map(Into::into).collect(),
             pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
@@ -125,7 +127,7 @@ impl TableDesc {
             versioned: self.versioned,
             stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
             vnode_col_idx_in_pk,
-        }
+        })
     }
 
     /// Helper function to create a mapping from `column id` to `column index`
13 changes: 8 additions & 5 deletions src/ctl/src/cmd_impl/table/scan.rs
@@ -69,18 +69,21 @@ pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -
         .await
 }
 
-pub fn make_storage_table<S: StateStore>(hummock: S, table: &TableCatalog) -> StorageTable<S> {
+pub fn make_storage_table<S: StateStore>(
+    hummock: S,
+    table: &TableCatalog,
+) -> Result<StorageTable<S>> {
     let output_columns_ids = table
         .columns()
         .iter()
         .map(|x| x.column_desc.column_id)
         .collect();
-    StorageTable::new_partial(
+    Ok(StorageTable::new_partial(
         hummock,
         output_columns_ids,
         Some(TableDistribution::all_vnodes()),
-        &table.table_desc().to_protobuf(),
-    )
+        &table.table_desc().try_to_protobuf()?,
+    ))
 }
 
 pub async fn scan(context: &CtlContext, mv_name: String, data_dir: Option<String>) -> Result<()> {
@@ -106,7 +109,7 @@ async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorag
 
     println!("Rows:");
     let read_epoch = hummock.inner().get_pinned_version().max_committed_epoch();
-    let storage_table = make_storage_table(hummock, &table);
+    let storage_table = make_storage_table(hummock, &table)?;
     let stream = storage_table
         .batch_iter(
             HummockReadEpoch::Committed(read_epoch),
4 changes: 2 additions & 2 deletions src/frontend/planner_test/src/lib.rs
@@ -678,7 +678,7 @@ impl TestCase {
             // Only generate batch_plan_proto if it is specified in test case
             if self.expected_outputs.contains(&TestType::BatchPlanProto) {
                 ret.batch_plan_proto = Some(serde_yaml::to_string(
-                    &batch_plan.to_batch_prost_identity(false),
+                    &batch_plan.to_batch_prost_identity(false)?,
                 )?);
             }
         }
@@ -772,7 +772,7 @@
 
             // Only generate stream_dist_plan if it is specified in test case
             if dist_plan {
-                let graph = build_graph(stream_plan);
+                let graph = build_graph(stream_plan)?;
                 *ret_dist_plan_str = Some(explain_stream_graph(&graph, false));
             }
         }
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_index.rs
@@ -426,7 +426,7 @@ pub async fn handle_create_index(
         include,
         distributed_by,
     )?;
-    let mut graph = build_graph(plan);
+    let mut graph = build_graph(plan)?;
    graph.parallelism =
         session
             .config()
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_mv.rs
@@ -200,7 +200,7 @@ It only indicates the physical clustering of the data, which may improve the per
     }
     let can_run_in_background = plan_has_backfill_leaf_nodes(&plan);
     let context = plan.plan_base().ctx().clone();
-    let mut graph = build_graph(plan);
+    let mut graph = build_graph(plan)?;
    graph.parallelism =
         session
             .config()
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
@@ -323,7 +323,7 @@ pub async fn handle_create_sink(
         );
     }
 
-    let mut graph = build_graph(plan);
+    let mut graph = build_graph(plan)?;
 
    graph.parallelism =
         session
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
@@ -1249,7 +1249,7 @@ pub async fn handle_create_source(
 
     // generate stream graph for cdc source job
     let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
-    let mut graph = build_graph(stream_plan);
+    let mut graph = build_graph(stream_plan)?;
    graph.parallelism =
         session
             .config()
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_table.rs
@@ -1005,7 +1005,7 @@ pub async fn handle_create_table(
         )
         .await?;
 
-    let mut graph = build_graph(plan);
+    let mut graph = build_graph(plan)?;
    graph.parallelism =
         session
             .config()
@@ -1106,7 +1106,7 @@ pub async fn generate_stream_graph_for_table(
             .map(|parallelism| Parallelism {
                 parallelism: parallelism.get(),
             }),
-        ..build_graph(plan)
+        ..build_graph(plan)?
     };
 
     // Fill the original table ID.
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_table_as.rs
@@ -107,7 +107,7 @@ pub async fn handle_create_as(
         append_only,
         Some(col_id_gen.into_version()),
     )?;
-    let mut graph = build_graph(plan);
+    let mut graph = build_graph(plan)?;
    graph.parallelism =
         session
             .config()
2 changes: 1 addition & 1 deletion src/frontend/src/handler/explain.rs
@@ -184,7 +184,7 @@ async fn do_handle_explain(
                 )?);
             }
             Convention::Stream => {
-                let graph = build_graph(plan.clone());
+                let graph = build_graph(plan.clone())?;
                 blocks.push(explain_stream_graph(&graph, explain_verbose));
             }
         }
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
@@ -33,6 +33,7 @@
 #![feature(result_flattening)]
 #![feature(error_generic_member_access)]
 #![feature(round_ties_even)]
+#![feature(iterator_try_collect)]
 #![recursion_limit = "256"]
 
 #[cfg(test)]
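
The feature gate added above is what lets the frontend call `.try_collect()?` directly on an iterator of `Result`s; on stable Rust the equivalent spelling is `.collect::<Result<Vec<_>, _>>()?`. A stand-alone illustration (nightly toolchain assumed; not code from this repository):

#![feature(iterator_try_collect)]

fn main() {
    // `try_collect` short-circuits on the first `Err` it encounters.
    let ok = ["1", "2", "3"].into_iter().map(str::parse::<u32>).try_collect::<Vec<_>>();
    assert_eq!(ok.unwrap(), vec![1, 2, 3]);

    let bad = ["1", "x"].into_iter().map(str::parse::<u32>).try_collect::<Vec<_>>();
    assert!(bad.is_err());
}
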
17 changes: 9 additions & 8 deletions src/frontend/src/optimizer/plan_node/batch_lookup_join.rs
@@ -26,11 +26,12 @@ use crate::expr::{Expr, ExprRewriter, ExprVisitor};
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::plan_node::utils::IndicesDisplay;
 use crate::optimizer::plan_node::{
-    EqJoinPredicate, EqJoinPredicateDisplay, LogicalScan, PlanBase, PlanTreeNodeUnary, ToBatchPb,
-    ToDistributedBatch, ToLocalBatch,
+    EqJoinPredicate, EqJoinPredicateDisplay, LogicalScan, PlanBase, PlanTreeNodeUnary,
+    ToDistributedBatch, ToLocalBatch, TryToBatchPb,
 };
 use crate::optimizer::property::{Distribution, Order, RequiredDist};
 use crate::optimizer::PlanRef;
+use crate::scheduler::SchedulerResult;
 use crate::utils::ColIndexMappingRewriteExt;
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -200,9 +201,9 @@ impl ToDistributedBatch for BatchLookupJoin {
     }
 }
 
-impl ToBatchPb for BatchLookupJoin {
-    fn to_batch_prost_body(&self) -> NodeBody {
-        if self.distributed_lookup {
+impl TryToBatchPb for BatchLookupJoin {
+    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
+        Ok(if self.distributed_lookup {
             NodeBody::DistributedLookupJoin(DistributedLookupJoinNode {
                 join_type: self.core.join_type as i32,
                 condition: self
@@ -222,7 +223,7 @@
                     .into_iter()
                     .map(|a| a as _)
                     .collect(),
-                inner_side_table_desc: Some(self.right_table_desc.to_protobuf()),
+                inner_side_table_desc: Some(self.right_table_desc.try_to_protobuf()?),
                 inner_side_column_ids: self
                     .right_output_column_ids
                     .iter()
@@ -252,7 +253,7 @@
                     .into_iter()
                     .map(|a| a as _)
                     .collect(),
-                inner_side_table_desc: Some(self.right_table_desc.to_protobuf()),
+                inner_side_table_desc: Some(self.right_table_desc.try_to_protobuf()?),
                 inner_side_vnode_mapping: vec![], // To be filled in at local.rs
                 inner_side_column_ids: self
                     .right_output_column_ids
@@ -264,7 +265,7 @@
                 null_safe: self.eq_join_predicate.null_safes(),
                 lookup_prefix_len: self.lookup_prefix_len as u32,
             })
-        }
+        })
     }
 }
 
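
The diff above swaps `ToBatchPb` for a fallible `TryToBatchPb` whose body returns `SchedulerResult<NodeBody>`, so the `?` on `try_to_protobuf` has somewhere to go. One plausible way the two traits coexist (sketched here with stand-in types, not the actual RisingWave definitions) is a blanket impl that lifts every infallible node into the fallible interface:

// Stand-ins for the real protobuf and error types.
type SchedulerResult<T> = Result<T, String>;
enum NodeBody { Values }

// Nodes whose serialization cannot fail keep the simple trait...
trait ToBatchPb {
    fn to_batch_prost_body(&self) -> NodeBody;
}

// ...while nodes like BatchSeqScan and BatchLookupJoin implement the `Try` variant directly.
trait TryToBatchPb {
    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody>;
}

// Assumed blanket impl: every infallible node is trivially fallible.
impl<T: ToBatchPb> TryToBatchPb for T {
    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
        Ok(self.to_batch_prost_body())
    }
}

struct BatchValues;
impl ToBatchPb for BatchValues {
    fn to_batch_prost_body(&self) -> NodeBody {
        NodeBody::Values
    }
}

fn main() -> SchedulerResult<()> {
    // Callers can use the fallible entry point uniformly.
    let _body = BatchValues.try_to_batch_prost_body()?;
    Ok(())
}
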
15 changes: 8 additions & 7 deletions src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
@@ -24,12 +24,13 @@ use risingwave_pb::batch_plan::RowSeqScanNode;
 
 use super::batch::prelude::*;
 use super::utils::{childless_record, Distill};
-use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch};
+use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch};
 use crate::catalog::ColumnId;
 use crate::expr::{ExprRewriter, ExprVisitor};
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
-use crate::optimizer::plan_node::ToLocalBatch;
+use crate::optimizer::plan_node::{ToLocalBatch, TryToBatchPb};
 use crate::optimizer::property::{Distribution, DistributionDisplay, Order};
+use crate::scheduler::SchedulerResult;
 
 /// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -232,10 +233,10 @@ impl ToDistributedBatch for BatchSeqScan {
     }
 }
 
-impl ToBatchPb for BatchSeqScan {
-    fn to_batch_prost_body(&self) -> NodeBody {
-        NodeBody::RowSeqScan(RowSeqScanNode {
-            table_desc: Some(self.core.table_desc.to_protobuf()),
+impl TryToBatchPb for BatchSeqScan {
+    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
+        Ok(NodeBody::RowSeqScan(RowSeqScanNode {
+            table_desc: Some(self.core.table_desc.try_to_protobuf()?),
             column_ids: self
                 .core
                 .output_column_ids()
@@ -247,7 +248,7 @@ impl ToBatchPb for BatchSeqScan {
             vnode_bitmap: None,
             ordered: !self.order().is_any(),
             limit: *self.limit(),
-        })
+        }))
     }
 }
 
26 changes: 15 additions & 11 deletions src/frontend/src/optimizer/plan_node/mod.rs
@@ -678,7 +678,10 @@ impl dyn PlanNode {
     ///
     /// Note that [`StreamTableScan`] has its own implementation of `to_stream_prost`. We have a
     /// hook inside to do some ad-hoc thing for [`StreamTableScan`].
-    pub fn to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> StreamPlanPb {
+    pub fn to_stream_prost(
+        &self,
+        state: &mut BuildFragmentGraphState,
+    ) -> SchedulerResult<StreamPlanPb> {
         use stream::prelude::*;
 
         if let Some(stream_table_scan) = self.as_stream_table_scan() {
@@ -691,14 +694,14 @@
             return stream_share.adhoc_to_stream_prost(state);
         }
 
-        let node = Some(self.to_stream_prost_body(state));
+        let node = Some(self.try_to_stream_prost_body(state)?);
         let input = self
             .inputs()
             .into_iter()
             .map(|plan| plan.to_stream_prost(state))
-            .collect();
+            .try_collect()?;
         // TODO: support pk_indices and operator_id
-        StreamPlanPb {
+        Ok(StreamPlanPb {
             input,
             identity: self.explain_myself_to_string(),
             node_body: node,
@@ -711,32 +714,32 @@
                 .collect(),
             fields: self.schema().to_prost(),
             append_only: self.plan_base().append_only(),
-        }
+        })
     }
 
     /// Serialize the plan node and its children to a batch plan proto.
-    pub fn to_batch_prost(&self) -> BatchPlanPb {
+    pub fn to_batch_prost(&self) -> SchedulerResult<BatchPlanPb> {
        self.to_batch_prost_identity(true)
     }
 
     /// Serialize the plan node and its children to a batch plan proto without the identity field
     /// (for testing).
-    pub fn to_batch_prost_identity(&self, identity: bool) -> BatchPlanPb {
-        let node_body = Some(self.to_batch_prost_body());
+    pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<BatchPlanPb> {
+        let node_body = Some(self.try_to_batch_prost_body()?);
         let children = self
             .inputs()
             .into_iter()
             .map(|plan| plan.to_batch_prost_identity(identity))
-            .collect();
-        BatchPlanPb {
+            .try_collect()?;
+        Ok(BatchPlanPb {
             children,
             identity: if identity {
                 self.explain_myself_to_string()
             } else {
                 "".into()
             },
             node_body,
-        }
+        })
     }
 
     pub fn explain_myself_to_string(&self) -> String {
@@ -956,6 +959,7 @@ use crate::optimizer::optimizer_context::OptimizerContextRef;
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::plan_rewriter::PlanCloner;
 use crate::optimizer::plan_visitor::ExprCorrelatedIdFinder;
+use crate::scheduler::SchedulerResult;
 use crate::stream_fragmenter::BuildFragmentGraphState;
 use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Endo, Layer, Visit};
 
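
Because `to_stream_prost` and `to_batch_prost_identity` now return `SchedulerResult`, a failure at any node propagates up through the recursive serialization of `inputs()` instead of aborting the frontend. A toy model of that propagation (hypothetical types, not the real plan nodes):

#[derive(Debug)]
struct PlanNode {
    serializable: bool,
    inputs: Vec<PlanNode>,
}

#[derive(Debug)]
struct PbNode {
    children: Vec<PbNode>,
}

fn to_pb(node: &PlanNode) -> Result<PbNode, String> {
    // Stable spelling of `.try_collect()?`: one failing child fails the whole tree.
    let children = node.inputs.iter().map(to_pb).collect::<Result<Vec<_>, _>>()?;
    if !node.serializable {
        return Err("cannot serialize this node".into());
    }
    Ok(PbNode { children })
}

fn main() {
    let plan = PlanNode {
        serializable: true,
        inputs: vec![PlanNode { serializable: false, inputs: vec![] }],
    };
    // The error surfaces to the caller instead of panicking mid-walk.
    assert!(to_pb(&plan).is_err());
}
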
10 changes: 7 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -29,6 +29,7 @@ use crate::handler::create_source::debezium_cdc_source_schema;
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
 use crate::optimizer::property::{Distribution, DistributionDisplay};
+use crate::scheduler::SchedulerResult;
 use crate::stream_fragmenter::BuildFragmentGraphState;
 use crate::{Explain, TableCatalog};
 
@@ -131,7 +132,10 @@ impl StreamNode for StreamCdcTableScan {
 }
 
 impl StreamCdcTableScan {
-    pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode {
+    pub fn adhoc_to_stream_prost(
+        &self,
+        state: &mut BuildFragmentGraphState,
+    ) -> SchedulerResult<PbStreamNode> {
         use risingwave_pb::stream_plan::*;
 
         let stream_key = self
@@ -254,15 +258,15 @@ impl StreamCdcTableScan {
         });
 
         // plan: merge -> filter -> exchange(simple) -> stream_scan
-        PbStreamNode {
+        Ok(PbStreamNode {
             fields: self.schema().to_prost(),
             input: vec![exchange_stream_node],
             node_body: Some(stream_scan_body),
             stream_key,
             operator_id: self.base.id().0 as u64,
             identity: self.distill_to_string(),
             append_only: self.append_only(),
-        }
+        })
     }
 
     pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl {
(7 more changed files not shown.)
