Commit

Refactor catalog streaming job handling and minor streaming_job.rs changes

shanicky committed Apr 15, 2024
1 parent 931244d commit 97a5857
Showing 2 changed files with 93 additions and 46 deletions.
44 changes: 32 additions & 12 deletions src/meta/src/controller/catalog.rs
@@ -959,29 +959,49 @@ impl CatalogController {

let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;

if let Some((streaming_job, merge_updates, table_id)) = replace_table_job_info {
self.finish_replace_streaming_job(
table_id as ObjectId,
streaming_job.clone(),
merge_updates.clone(),
None,
Some(job_id as _),
None,
)
.await?;
}
let replace_table_mapping_update = match replace_table_job_info {
Some((streaming_job, merge_updates, dummy_id)) => {
let incoming_sink_id = job_id;

let (relations, fragment_mapping) = Self::finish_replace_streaming_job_inner(
dummy_id as ObjectId,
merge_updates,
None,
Some(incoming_sink_id as _),
None,
&txn,
streaming_job,
)
.await?;

Some((relations, fragment_mapping))
}
None => None,
};

txn.commit().await?;

self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
.await;
let version = self

let mut version = self
.notify_frontend(
NotificationOperation::Add,
NotificationInfo::RelationGroup(PbRelationGroup { relations }),
)
.await;

if let Some((relations, fragment_mapping)) = replace_table_mapping_update {
self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
.await;
version = self
.notify_frontend(
NotificationOperation::Update,
NotificationInfo::RelationGroup(PbRelationGroup { relations }),
)
.await;
}

Ok(version)
}

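The catalog.rs hunk above changes finish_streaming_job so that, for sink-into-table jobs, the replace-table work now runs inside the same transaction (via the new finish_replace_streaming_job_inner) and its notifications are deferred until after the commit. Below is a minimal, self-contained sketch of that control flow, not the real meta-service API; Op, StagedUpdate, compute_update, notify, and finish are all hypothetical stand-ins.

    // Hypothetical stand-in for the frontend notification operation.
    enum Op {
        Add,
        Update,
    }

    // Stand-in for the (relations, fragment_mapping) pair produced by
    // `finish_replace_streaming_job_inner`.
    struct StagedUpdate(u32);

    async fn compute_update(dummy_id: u32) -> StagedUpdate {
        StagedUpdate(dummy_id)
    }

    // Stand-in for `notify_frontend`, which returns a notification version.
    async fn notify(_op: Op) -> u64 {
        0
    }

    async fn finish(replace_table_info: Option<u32>) -> u64 {
        // 1. Stage the optional replace-table work while the transaction is
        //    still open, instead of committing it separately as before.
        let staged = match replace_table_info {
            Some(dummy_id) => Some(compute_update(dummy_id).await),
            None => None,
        };

        // 2. `txn.commit()` happens here, before any notification is sent.

        // 3. Always announce the newly created relations ...
        let mut version = notify(Op::Add).await;

        // 4. ... and, only when a table was replaced, additionally announce
        //    the updated table; the later notification's version is returned.
        if let Some(_update) = staged {
            version = notify(Op::Update).await;
        }
        version
    }

The design point is ordering: no frontend notification is emitted before the commit succeeds, and when both notifications fire, the later (Update) version number is the one returned to the caller.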
95 changes: 61 additions & 34 deletions src/meta/src/controller/streaming_job.rs
@@ -43,7 +43,8 @@ use risingwave_pb::meta::subscribe_response::{
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::{
FragmentParallelUnitMapping, PbRelation, PbRelationGroup, PbTableFragments,
FragmentParallelUnitMapping, PbFragmentParallelUnitMapping, PbRelation, PbRelationGroup,
PbTableFragments, Relation,
};
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
@@ -597,13 +598,53 @@ impl CatalogController {
creating_sink_id: Option<SinkId>,
dropping_sink_id: Option<SinkId>,
) -> MetaResult<NotificationVersion> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

let (relations, fragment_mapping) = Self::finish_replace_streaming_job_inner(
dummy_id,
merge_updates,
table_col_index_mapping,
creating_sink_id,
dropping_sink_id,
&txn,
streaming_job,
)
.await?;

txn.commit().await?;

// FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
// catalog and need to access the old fragment. Let frontend nodes delete the old fragment
// when they receive table catalog change.
// self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings)
// .await;
self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
.await;
let version = self
.notify_frontend(
NotificationOperation::Update,
NotificationInfo::RelationGroup(PbRelationGroup { relations }),
)
.await;

Ok(version)
}

pub async fn finish_replace_streaming_job_inner(
dummy_id: ObjectId,
merge_updates: Vec<PbMergeUpdate>,
table_col_index_mapping: Option<ColIndexMapping>,
creating_sink_id: Option<SinkId>,
dropping_sink_id: Option<SinkId>,
txn: &DatabaseTransaction,
streaming_job: StreamingJob,
) -> MetaResult<(Vec<Relation>, Vec<PbFragmentParallelUnitMapping>)> {
// Question: should the source catalog remain unchanged?
let StreamingJob::Table(_, table, ..) = streaming_job else {
unreachable!("unexpected job: {streaming_job:?}")
};

let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let job_id = table.id as ObjectId;

let mut table = table::ActiveModel::from(table);
@@ -621,7 +662,7 @@ impl CatalogController {
}

table.incoming_sinks = Set(incoming_sinks.into());
let table = table.update(&txn).await?;
let table = table.update(txn).await?;

// Update state table fragment id.
let fragment_table_ids: Vec<(FragmentId, I32Array)> = Fragment::find()
@@ -632,7 +673,7 @@ impl CatalogController {
])
.filter(fragment::Column::JobId.eq(dummy_id))
.into_tuple()
.all(&txn)
.all(txn)
.await?;
for (fragment_id, state_table_ids) in fragment_table_ids {
for state_table_id in state_table_ids.into_inner() {
@@ -641,7 +682,7 @@ impl CatalogController {
fragment_id: Set(Some(fragment_id)),
..Default::default()
}
.update(&txn)
.update(txn)
.await?;
}
}
@@ -650,12 +691,12 @@ impl CatalogController {
// 1. replace old fragments/actors with new ones.
Fragment::delete_many()
.filter(fragment::Column::JobId.eq(job_id))
.exec(&txn)
.exec(txn)
.await?;
Fragment::update_many()
.col_expr(fragment::Column::JobId, SimpleExpr::from(job_id))
.filter(fragment::Column::JobId.eq(dummy_id))
.exec(&txn)
.exec(txn)
.await?;

// 2. update merges.
@@ -686,7 +727,7 @@ impl CatalogController {
actor::Column::UpstreamActorIds,
])
.into_tuple::<(ActorId, FragmentId, ActorUpstreamActors)>()
.one(&txn)
.one(txn)
.await?
.ok_or_else(|| {
MetaError::catalog_id_not_found("actor", merge_update.actor_id)
@@ -709,7 +750,7 @@ impl CatalogController {
upstream_actor_ids: Set(upstream_actors),
..Default::default()
}
.update(&txn)
.update(txn)
.await?;

to_update_fragment_ids.insert(fragment_id);
@@ -724,7 +765,7 @@ impl CatalogController {
fragment::Column::UpstreamFragmentId,
])
.into_tuple::<(FragmentId, StreamNode, I32Array)>()
.one(&txn)
.one(txn)
.await?
.map(|(id, node, upstream)| (id, node.to_protobuf(), upstream))
.ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
@@ -748,18 +789,18 @@ impl CatalogController {
upstream_fragment_id: Set(upstream_fragment_id),
..Default::default()
}
.update(&txn)
.update(txn)
.await?;
}

// 3. remove dummy object.
Object::delete_by_id(dummy_id).exec(&txn).await?;
Object::delete_by_id(dummy_id).exec(txn).await?;

// 4. update catalogs and notify.
let mut relations = vec![];
let table_obj = table
.find_related(Object)
.one(&txn)
.one(txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("object", table.table_id))?;
relations.push(PbRelation {
@@ -775,7 +816,7 @@ impl CatalogController {
.columns([index::Column::IndexId, index::Column::IndexItems])
.filter(index::Column::PrimaryTableId.eq(job_id))
.into_tuple()
.all(&txn)
.all(txn)
.await?;
for (index_id, nodes) in index_items {
let mut pb_nodes = nodes.to_protobuf();
@@ -787,11 +828,11 @@ impl CatalogController {
index_items: Set(pb_nodes.into()),
..Default::default()
}
.update(&txn)
.update(txn)
.await?;
let index_obj = index
.find_related(Object)
.one(&txn)
.one(txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
relations.push(PbRelation {
@@ -801,25 +842,11 @@ impl CatalogController {
});
}
}
let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;

txn.commit().await?;
let fragment_mapping: Vec<PbFragmentParallelUnitMapping> =
get_fragment_mappings(txn, job_id as _).await?;

// FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
// catalog and need to access the old fragment. Let frontend nodes delete the old fragment
// when they receive table catalog change.
// self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings)
// .await;
self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
.await;
let version = self
.notify_frontend(
NotificationOperation::Update,
NotificationInfo::RelationGroup(PbRelationGroup { relations }),
)
.await;

Ok(version)
Ok((relations, fragment_mapping))
}

/// `try_abort_replacing_streaming_job` aborts the replacing streaming job; the input `job_id` is the dummy job id.
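The streaming_job.rs change is a transaction-threading refactor: finish_replace_streaming_job no longer begins and commits a transaction deep inside the work; the extracted finish_replace_streaming_job_inner borrows a caller-supplied &DatabaseTransaction, which is why every `&txn` in the body becomes `txn` and why finish_streaming_job in catalog.rs can now run the same work inside its own transaction. A minimal sketch of the pattern, assuming sea-orm's TransactionTrait and DatabaseTransaction; the do_work/do_work_inner names and WorkResult type are hypothetical.

    use sea_orm::{DatabaseConnection, DatabaseTransaction, DbErr, TransactionTrait};

    struct WorkResult;

    // Public entry point: keeps the old signature and semantics by opening
    // and committing its own transaction around the inner worker.
    async fn do_work(db: &DatabaseConnection) -> Result<WorkResult, DbErr> {
        let txn = db.begin().await?;
        let result = do_work_inner(&txn).await?;
        txn.commit().await?;
        Ok(result)
    }

    // Extracted worker: borrows the transaction instead of owning it, so a
    // caller that already holds a transaction (as finish_streaming_job does)
    // can compose this work with its own writes in a single atomic commit.
    async fn do_work_inner(txn: &DatabaseTransaction) -> Result<WorkResult, DbErr> {
        // ... run updates via `txn`, e.g. `model.update(txn).await?` ...
        let _ = txn;
        Ok(WorkResult)
    }

Splitting the function this way also moves the commit and all notifications to the outermost caller, so side effects visible to other nodes only happen once the whole transaction has succeeded.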
