Skip to content

Commit

Permalink
change more
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Nov 21, 2024
1 parent ed2944a commit b1c0736
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 29 deletions.
9 changes: 6 additions & 3 deletions src/meta/src/barrier/context/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,17 @@ impl GlobalBarrierWorkerContextImpl {
.catalog_controller
.get_job_fragments_by_id(mview.table_id)
.await?;
let table_fragments = StreamJobFragments::from_protobuf(table_fragments);
if table_fragments.tracking_progress_actor_ids().is_empty() {
let stream_job_fragments = StreamJobFragments::from_protobuf(table_fragments);
if stream_job_fragments
.tracking_progress_actor_ids()
.is_empty()
{
// If there's no tracking actor in the mview, we can finish the job directly.
mgr.catalog_controller
.finish_streaming_job(mview.table_id, None)
.await?;
} else {
mview_map.insert(table_id, (mview.definition.clone(), table_fragments));
mview_map.insert(table_id, (mview.definition.clone(), stream_job_fragments));
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ impl From<TableParallelism> for PbTableParallelism {
}

/// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
/// (It was previously called `TableFragments` due to historical reasons.)
///
/// We store whole fragments in a single column family as follow:
/// `table_id` => `TableFragments`.
/// `stream_job_id` => `StreamJobFragments`.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
/// The table id.
Expand Down
40 changes: 20 additions & 20 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,13 +668,13 @@ impl DdlController {
streaming_job: &StreamingJob,
fragment_graph: StreamFragmentGraph,
) -> MetaResult<(ReplaceTableContext, StreamJobFragments)> {
let (mut replace_table_ctx, mut table_fragments) = self
let (mut replace_table_ctx, mut stream_job_fragments) = self
.build_replace_table(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _)
.await?;

let mut union_fragment_id = None;

for (fragment_id, fragment) in &mut table_fragments.fragments {
for (fragment_id, fragment) in &mut stream_job_fragments.fragments {
for actor in &mut fragment.actors {
if let Some(node) = &mut actor.nodes {
visit_stream_node(node, |body| {
Expand Down Expand Up @@ -704,7 +704,7 @@ impl DdlController {
&sink_fragment,
target_table,
&mut replace_table_ctx,
&mut table_fragments,
&mut stream_job_fragments,
target_fragment_id,
None,
);
Expand Down Expand Up @@ -743,15 +743,15 @@ impl DdlController {
&sink_fragment,
target_table,
&mut replace_table_ctx,
&mut table_fragments,
&mut stream_job_fragments,
target_fragment_id,
Some(&sink.unique_identity()),
);
}
}

// check if the union fragment is fully assigned.
for fragment in table_fragments.fragments.values_mut() {
for fragment in stream_job_fragments.fragments.values_mut() {
for actor in &mut fragment.actors {
if let Some(node) = &mut actor.nodes {
visit_stream_node(node, |node| {
Expand All @@ -763,15 +763,15 @@ impl DdlController {
}
}

Ok((replace_table_ctx, table_fragments))
Ok((replace_table_ctx, stream_job_fragments))
}

pub(crate) fn inject_replace_table_plan_for_sink(
sink_id: Option<u32>,
sink_fragment: &PbFragment,
table: &Table,
replace_table_ctx: &mut ReplaceTableContext,
table_fragments: &mut StreamJobFragments,
stream_job_fragments: &mut StreamJobFragments,
target_fragment_id: FragmentId,
unique_identity: Option<&str>,
) {
Expand All @@ -781,7 +781,7 @@ impl DdlController {
.map(|a| a.actor_id)
.collect_vec();

let union_fragment = table_fragments
let union_fragment = stream_job_fragments
.fragments
.get_mut(&target_fragment_id)
.unwrap();
Expand Down Expand Up @@ -1350,7 +1350,7 @@ impl DdlController {
let mut updated_sink_catalogs = vec![];

let result: MetaResult<Vec<PbMergeUpdate>> = try {
let (mut ctx, mut table_fragments) = self
let (mut ctx, mut stream_job_fragments) = self
.build_replace_table(
ctx,
&streaming_job,
Expand All @@ -1362,7 +1362,7 @@ impl DdlController {

let mut union_fragment_id = None;

for (fragment_id, fragment) in &mut table_fragments.fragments {
for (fragment_id, fragment) in &mut stream_job_fragments.fragments {
for actor in &mut fragment.actors {
if let Some(node) = &mut actor.nodes {
visit_stream_node(node, |body| {
Expand Down Expand Up @@ -1402,7 +1402,7 @@ impl DdlController {
&sink_fragment,
table,
&mut ctx,
&mut table_fragments,
&mut stream_job_fragments,
target_fragment_id,
Some(&sink.unique_identity()),
);
Expand All @@ -1416,11 +1416,11 @@ impl DdlController {

self.metadata_manager
.catalog_controller
.prepare_streaming_job(&table_fragments, &streaming_job, true)
.prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
.await?;

self.stream_manager
.replace_table(table_fragments, ctx)
.replace_table(stream_job_fragments, ctx)
.await?;
merge_updates
};
Expand Down Expand Up @@ -1618,17 +1618,17 @@ impl DdlController {
_ => TableParallelism::Fixed(parallelism.get()),
};

let table_fragments = StreamJobFragments::new(
let stream_job_fragments = StreamJobFragments::new(
id.into(),
graph,
&building_locations.actor_locations,
stream_ctx.clone(),
table_parallelism,
max_parallelism.get(),
);
let internal_tables = table_fragments.internal_tables();
let internal_tables = stream_job_fragments.internal_tables();

if let Some(mview_fragment) = table_fragments.mview_fragment() {
if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
stream_job.set_table_vnode_count(mview_fragment.vnode_count());
}

Expand Down Expand Up @@ -1663,7 +1663,7 @@ impl DdlController {
&self.metadata_manager,
stream_ctx,
Some(s),
Some(&table_fragments),
Some(&stream_job_fragments),
None,
&streaming_job,
fragment_graph,
Expand Down Expand Up @@ -1698,7 +1698,7 @@ impl DdlController {
snapshot_backfill_info,
};

Ok((ctx, table_fragments))
Ok((ctx, stream_job_fragments))
}

/// `build_replace_table` builds a table replacement and returns the context and new table
Expand Down Expand Up @@ -1821,7 +1821,7 @@ impl DdlController {
// 3. Build the table fragments structure that will be persisted in the stream manager, and
// the context that contains all information needed for building the actors on the compute
// nodes.
let table_fragments = StreamJobFragments::new(
let stream_job_fragments = StreamJobFragments::new(
(tmp_table_id as u32).into(),
graph,
&building_locations.actor_locations,
Expand All @@ -1843,7 +1843,7 @@ impl DdlController {
tmp_id: tmp_table_id as _,
};

Ok((ctx, table_fragments))
Ok((ctx, stream_job_fragments))
}

async fn alter_name(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ impl StreamFragmentGraph {
/// `fragment_id`, `vnode_count`. They will be all filled after a `TableFragments` is built.
/// Be careful when using the returned values.
///
/// See also [`crate::model::TableFragments::internal_tables`].
/// See also [`crate::model::StreamJobFragments::internal_tables`].
pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
let mut tables = BTreeMap::new();
for fragment in self.fragments.values() {
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/stream/test_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,15 +459,15 @@ async fn test_graph_builder() -> MetaResult<()> {
let ActorGraphBuildResult { graph, .. } =
actor_graph_builder.generate_graph(&env, &job, expr_context)?;

let table_fragments = StreamJobFragments::for_test(TableId::default(), graph);
let actors = table_fragments.actors();
let mview_actor_ids = table_fragments.mview_actor_ids();
let stream_job_fragments = StreamJobFragments::for_test(TableId::default(), graph);
let actors = stream_job_fragments.actors();
let mview_actor_ids = stream_job_fragments.mview_actor_ids();

assert_eq!(actors.len(), 9);
assert_eq!(mview_actor_ids, vec![1]);
assert_eq!(internal_tables.len(), 3);

let fragment_upstreams: HashMap<_, _> = table_fragments
let fragment_upstreams: HashMap<_, _> = stream_job_fragments
.fragments
.iter()
.map(|(fragment_id, fragment)| (*fragment_id, fragment.upstream_fragment_ids.clone()))
Expand Down

0 comments on commit b1c0736

Please sign in to comment.