Skip to content

Commit

Permalink
refactor(meta): simplify stream job table/source id assignment (#19171)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
  • Loading branch information
xxchan and BugenZhao authored Nov 4, 2024
1 parent 7690f67 commit a9f8945
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 124 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ c583e2c6c054764249acf484438c7bf7197765f4

# chore: replace all ProstXxx with PbXxx (#8621)
6fd8821f2e053957b183d648bea9c95b6703941f

# chore: cleanup v2 naming for sql metastore (#18941)
9a6a7f9052d5679165ff57cc01417c742c95351c
2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ message StreamSourceInfo {
}

message Source {
// For shared source, this is the same as the job id.
// For non-shared source and table with connector, this is a different oid.
uint32 id = 1;
uint32 schema_id = 2;
uint32 database_id = 3;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use risingwave_sqlparser::ast::{
use risingwave_sqlparser::parser::Parser;

use super::create_source::get_json_schema_location;
use super::create_table::{generate_stream_graph_for_table, ColumnIdGenerator};
use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
Expand Down Expand Up @@ -192,7 +192,7 @@ pub async fn get_replace_table_plan(
panic!("unexpected statement type: {:?}", definition);
};

let (mut graph, table, source, job_type) = generate_stream_graph_for_table(
let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
session,
table_name,
original_catalog,
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{rewrite_now_to_proctime, ExprImpl, InputRef};
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator};
use crate::handler::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
use crate::handler::privilege::resolve_query_privileges;
use crate::handler::util::SourceSchemaCompatExt;
use crate::handler::HandlerArgs;
Expand Down Expand Up @@ -672,7 +672,7 @@ pub(crate) async fn reparse_table_for_sink(
panic!("unexpected statement type: {:?}", definition);
};

let (graph, table, source, _) = generate_stream_graph_for_table(
let (graph, table, source, _) = generate_stream_graph_for_replace_table(
session,
table_name,
table_catalog,
Expand Down
18 changes: 12 additions & 6 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use risingwave_connector::source::cdc::external::{
ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{source, WithOptionsSecResolved};
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, Table, WatermarkDesc};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
Expand Down Expand Up @@ -1322,7 +1323,7 @@ pub fn check_create_table_with_source(
}

#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_table(
pub async fn generate_stream_graph_for_replace_table(
_session: &Arc<SessionImpl>,
table_name: ObjectName,
original_catalog: &Arc<TableCatalog>,
Expand All @@ -1341,7 +1342,7 @@ pub async fn generate_stream_graph_for_table(
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;

let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
(Some(format_encode), None) => (
gen_create_table_plan_with_source(
handler_args,
Expand Down Expand Up @@ -1441,13 +1442,18 @@ pub async fn generate_stream_graph_for_table(
let graph = build_graph(plan)?;

// Fill the original table ID.
let table = Table {
let mut table = Table {
id: original_catalog.id().table_id(),
optional_associated_source_id: original_catalog
.associated_source_id()
.map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
..table
};
if let Some(source_id) = original_catalog.associated_source_id() {
table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(
source_id.table_id,
));
source.as_mut().unwrap().id = source_id.table_id;
source.as_mut().unwrap().optional_associated_table_id =
Some(OptionalAssociatedTableId::AssociatedTableId(table.id))
}

Ok((graph, table, source, job_type))
}
Expand Down
37 changes: 18 additions & 19 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{Comment, CreateType, Secret, Table};
use risingwave_pb::common::worker_node::State;
use risingwave_pb::common::WorkerType;
Expand Down Expand Up @@ -84,27 +82,28 @@ impl DdlServiceImpl {
}
}

fn extract_replace_table_info(change: ReplaceTablePlan) -> ReplaceTableInfo {
let job_type = change.get_job_type().unwrap_or_default();
let mut source = change.source;
let mut fragment_graph = change.fragment_graph.unwrap();
let mut table = change.table.unwrap();
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) =
table.optional_associated_source_id
{
source.as_mut().unwrap().id = source_id;
fill_table_stream_graph_info(&mut source, &mut table, job_type, &mut fragment_graph);
}
let table_col_index_mapping = change
.table_col_index_mapping
fn extract_replace_table_info(
ReplaceTablePlan {
table,
fragment_graph,
table_col_index_mapping,
source,
job_type,
}: ReplaceTablePlan,
) -> ReplaceTableInfo {
let table = table.unwrap();
let col_index_mapping = table_col_index_mapping
.as_ref()
.map(ColIndexMapping::from_protobuf);

let stream_job = StreamingJob::Table(source, table, job_type);
ReplaceTableInfo {
streaming_job: stream_job,
fragment_graph,
col_index_mapping: table_col_index_mapping,
streaming_job: StreamingJob::Table(
source,
table,
TableJobType::try_from(job_type).unwrap(),
),
fragment_graph: fragment_graph.unwrap(),
col_index_mapping,
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,4 @@ impl StreamingJob {
StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
}
}

pub fn is_source_job(&self) -> bool {
matches!(self, StreamingJob::Source(_))
}
}
76 changes: 4 additions & 72 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_common::secret::SecretEncryption;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::{
visit_fragment, visit_stream_node, visit_stream_node_cont_mut,
visit_stream_node, visit_stream_node_cont_mut,
};
use risingwave_common::{bail, hash, must_match};
use risingwave_connector::error::ConnectorError;
Expand All @@ -40,11 +40,9 @@ use risingwave_meta_model::{
ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId,
SubscriptionId, TableId, UserId, ViewId,
};
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, Schema, Secret,
Sink, Source, Subscription, Table, View,
Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
Subscription, Table, View,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
Expand Down Expand Up @@ -906,7 +904,7 @@ impl DdlController {
pub async fn create_streaming_job(
&self,
mut streaming_job: StreamingJob,
mut fragment_graph: StreamFragmentGraphProto,
fragment_graph: StreamFragmentGraphProto,
affected_table_replace_info: Option<ReplaceTableInfo>,
) -> MetaResult<NotificationVersion> {
let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
Expand All @@ -921,24 +919,6 @@ impl DdlController {
.await?;
let job_id = streaming_job.id();

match &mut streaming_job {
StreamingJob::Table(src, table, job_type) => {
// If we're creating a table with connector, we should additionally fill its ID first.
fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
}
StreamingJob::Source(src) => {
// set the inner source id of source node.
for fragment in fragment_graph.fragments.values_mut() {
visit_fragment(fragment, |node_body| {
if let NodeBody::Source(source_node) = node_body {
source_node.source_inner.as_mut().unwrap().source_id = src.id;
}
});
}
}
_ => {}
}

tracing::debug!(
id = job_id,
definition = streaming_job.definition(),
Expand Down Expand Up @@ -2003,51 +1983,3 @@ impl DdlController {
.await
}
}

/// Fill in necessary information for `Table` stream graph.
/// e.g., fill source id for table with connector, fill external table id for CDC table.
pub fn fill_table_stream_graph_info(
source: &mut Option<PbSource>,
table: &mut PbTable,
table_job_type: TableJobType,
fragment_graph: &mut PbStreamFragmentGraph,
) {
let mut source_count = 0;
for fragment in fragment_graph.fragments.values_mut() {
visit_fragment(fragment, |node_body| {
if let NodeBody::Source(source_node) = node_body {
if source_node.source_inner.is_none() {
// skip empty source for dml node
return;
}

// If we're creating a table with connector, we should additionally fill its ID first.
if let Some(source) = source {
source_node.source_inner.as_mut().unwrap().source_id = source.id;
source_count += 1;

assert_eq!(
source_count, 1,
"require exactly 1 external stream source when creating table with a connector"
);

// Fill in the correct table id for source.
source.optional_associated_table_id =
Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
// Fill in the correct source id for mview.
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));
}
}

// fill table id for cdc backfill
if let NodeBody::StreamCdcScan(node) = node_body
&& table_job_type == TableJobType::SharedCdcSource
{
if let Some(table_desc) = node.cdc_table_desc.as_mut() {
table_desc.table_id = table.id;
}
}
});
}
}
1 change: 1 addition & 0 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,7 @@ impl SourceManager {

/// create and register connector worker for source.
pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
tracing::debug!("register_source: {}", source.get_id());
let mut core = self.core.lock().await;
if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) {
let handle = create_source_worker_handle(source, self.metrics.clone())
Expand Down
Loading

0 comments on commit a9f8945

Please sign in to comment.