Skip to content

Commit

Permalink
Revert "add TableJobType::ArrangementBackfill"
Browse files Browse the repository at this point in the history
This reverts commit b7347bf.
  • Loading branch information
kwannoel committed Jan 5, 2024
1 parent 2a627b4 commit a27ece1
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 67 deletions.
2 changes: 0 additions & 2 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ enum TableJobType {
TABLE_JOB_TYPE_GENERAL = 1;
// table streaming job sharing a CDC source job
TABLE_JOB_TYPE_SHARED_CDC_SOURCE = 2;
// arrangement backfill
TABLE_JOB_TYPE_ARRANGEMENT_BACKFILL = 3;
}

message CreateTableRequest {
Expand Down
5 changes: 1 addition & 4 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1051,10 +1051,7 @@ impl CatalogController {
f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0
}
// MV on MV, and other kinds of table job
None
| Some(PbTableJobType::General)
| Some(PbTableJobType::Unspecified)
| Some(PbTableJobType::ArrangementBackfill) => {
None | Some(PbTableJobType::General) | Some(PbTableJobType::Unspecified) => {
f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0
}
});
Expand Down
5 changes: 1 addition & 4 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,10 +1364,7 @@ impl FragmentManager {
}
}
// MV on MV, and other kinds of table job
None
| Some(TableJobType::General)
| Some(TableJobType::Unspecified)
| Some(TableJobType::ArrangementBackfill) => {
None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => {
if let Some(fragment) = table_fragments.mview_fragment() {
fragments.insert(table_id, fragment);
}
Expand Down
58 changes: 1 addition & 57 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,63 +620,7 @@ impl CompleteStreamFragmentGraph {

(source_job_id, edge)
}
Some(TableJobType::ArrangementBackfill) => {
// handle other kinds of streaming graph, normally MV on MV
let mview_fragment = upstream_root_fragments
.get(&upstream_table_id)
.context("upstream materialized view fragment not found")?;
let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id);

// Resolve the required output columns from the upstream materialized view.
let (dist_key_indices, output_indices) = {
let nodes = mview_fragment.actors[0].get_nodes().unwrap();
let mview_node =
nodes.get_node_body().unwrap().as_materialize().unwrap();
let all_column_ids = mview_node
.get_table()
.unwrap()
.columns
.iter()
.map(|c| c.column_desc.as_ref().unwrap().column_id)
.collect_vec();
let dist_key_indices = mview_node
.table
.as_ref()
.unwrap()
.distribution_key
.iter()
.map(|i| *i as u32)
.collect();

let output_indices = output_columns
.iter()
.map(|c| {
all_column_ids
.iter()
.position(|&id| id == *c)
.map(|i| i as u32)
})
.collect::<Option<Vec<_>>>()
.context(
"column not found in the upstream materialized view",
)?;
(dist_key_indices, output_indices)
};
let edge = StreamFragmentEdge {
id: EdgeId::UpstreamExternal {
upstream_table_id,
downstream_fragment_id: id,
},
dispatch_strategy: DispatchStrategy {
r#type: DispatcherType::Hash as _,
dist_key_indices,
output_indices,
},
};

(mview_id, edge)
}
None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => {
_ => {
// handle other kinds of streaming graph, normally MV on MV
let mview_fragment = upstream_root_fragments
.get(&upstream_table_id)
Expand Down

0 comments on commit a27ece1

Please sign in to comment.