From a27ece1fac6467378e36c7030b4d09ffc1f85d9b Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 5 Jan 2024 14:14:31 +0800 Subject: [PATCH] Revert "add TableJobType::ArrangementBackfill" This reverts commit b7347bf155e0d388ed89a955454dc3548924a389. --- proto/ddl_service.proto | 2 - src/meta/src/controller/fragment.rs | 5 +- src/meta/src/manager/catalog/fragment.rs | 5 +- src/meta/src/stream/stream_graph/fragment.rs | 58 +------------------- 4 files changed, 3 insertions(+), 67 deletions(-) diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 93507a2acda79..db910930b5bee 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -147,8 +147,6 @@ enum TableJobType { TABLE_JOB_TYPE_GENERAL = 1; // table streaming job sharing a CDC source job TABLE_JOB_TYPE_SHARED_CDC_SOURCE = 2; - // arrangement backfill - TABLE_JOB_TYPE_ARRANGEMENT_BACKFILL = 3; } message CreateTableRequest { diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 50eee44b18eaf..e1849f8407a45 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -1051,10 +1051,7 @@ impl CatalogController { f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 } // MV on MV, and other kinds of table job - None - | Some(PbTableJobType::General) - | Some(PbTableJobType::Unspecified) - | Some(PbTableJobType::ArrangementBackfill) => { + None | Some(PbTableJobType::General) | Some(PbTableJobType::Unspecified) => { f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 } }); diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 4b190faeab624..e5ba6503d79de 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -1364,10 +1364,7 @@ impl FragmentManager { } } // MV on MV, and other kinds of table job - None - | Some(TableJobType::General) - | Some(TableJobType::Unspecified) - | Some(TableJobType::ArrangementBackfill) => { + None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => { if let Some(fragment) = table_fragments.mview_fragment() { fragments.insert(table_id, fragment); } diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index b36809e310750..b19d7a57826b7 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -620,63 +620,7 @@ impl CompleteStreamFragmentGraph { (source_job_id, edge) } - Some(TableJobType::ArrangementBackfill) => { - // handle other kinds of streaming graph, normally MV on MV - let mview_fragment = upstream_root_fragments - .get(&upstream_table_id) - .context("upstream materialized view fragment not found")?; - let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id); - - // Resolve the required output columns from the upstream materialized view. - let (dist_key_indices, output_indices) = { - let nodes = mview_fragment.actors[0].get_nodes().unwrap(); - let mview_node = - nodes.get_node_body().unwrap().as_materialize().unwrap(); - let all_column_ids = mview_node - .get_table() - .unwrap() - .columns - .iter() - .map(|c| c.column_desc.as_ref().unwrap().column_id) - .collect_vec(); - let dist_key_indices = mview_node - .table - .as_ref() - .unwrap() - .distribution_key - .iter() - .map(|i| *i as u32) - .collect(); - - let output_indices = output_columns - .iter() - .map(|c| { - all_column_ids - .iter() - .position(|&id| id == *c) - .map(|i| i as u32) - }) - .collect::>>() - .context( - "column not found in the upstream materialized view", - )?; - (dist_key_indices, output_indices) - }; - let edge = StreamFragmentEdge { - id: EdgeId::UpstreamExternal { - upstream_table_id, - downstream_fragment_id: id, - }, - dispatch_strategy: DispatchStrategy { - r#type: DispatcherType::Hash as _, - dist_key_indices, - output_indices, - }, - }; - - (mview_id, edge) - } - None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => { + _ => { // handle other kinds of streaming graph, normally MV on MV let mview_fragment = upstream_root_fragments .get(&upstream_table_id)