feat: support alter shared source
- table with connector: filled when creating job catalog https://github.com/risingwavelabs/risingwave/blob/193e93fd8d9f9dbae717fe6a5b411e7f33382f27/src/meta/src/controller/streaming_job.rs#L247-L251
- stream node: filled in fill_job

Signed-off-by: xxchan <[email protected]>

feat: support alter shared source

.

Signed-off-by: xxchan <[email protected]>
xxchan committed Dec 3, 2024
1 parent d5f5d05 commit 3698c6d
Showing 15 changed files with 309 additions and 265 deletions.
14 changes: 14 additions & 0 deletions proto/ddl_service.proto
@@ -74,6 +74,8 @@ message DropSourceResponse {

message AlterSourceRequest {
catalog.Source source = 1;
// for shared source, we need to replace the streaming job
optional ReplaceStreamingJobPlan plan = 2;
}

message AlterSourceResponse {
@@ -368,6 +370,18 @@ message ReplaceTablePlan {
TableJobType job_type = 5;
}

// Replace a streaming job, but not a table. e.g., alter a shared source.
message ReplaceStreamingJobPlan {
// The new materialization plan, where all schemas are updated.
stream_plan.StreamFragmentGraph fragment_graph = 2;
// The mapping from the old columns to the new columns of the table.
// If no column modifications occur (such as for sinking into table), this will be None.
catalog.ColIndexMapping table_col_index_mapping = 3;
// Source catalog of table's associated source
catalog.Source source = 4;
TableJobType job_type = 5;
}

message ReplaceTablePlanRequest {
ReplaceTablePlan plan = 1;
}
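
The new `plan` field turns `ALTER SOURCE` on a shared source into a replace-streaming-job operation, mirroring `ReplaceTablePlan` above. A hedged sketch of how a frontend might assemble the request from the prost-generated types (the planner step that produces the new `StreamFragmentGraph` is not part of this diff and is assumed):

```rust
use risingwave_pb::catalog::{ColIndexMapping, Source};
use risingwave_pb::ddl_service::{AlterSourceRequest, ReplaceStreamingJobPlan};
use risingwave_pb::stream_plan::StreamFragmentGraph;

/// Sketch only: build an ALTER SOURCE request for a *shared* source, where the
/// optional replace plan is populated; for a non-shared source `plan` stays None.
fn build_alter_shared_source_request(
    new_source: Source,
    fragment_graph: StreamFragmentGraph,
    col_index_mapping: Option<ColIndexMapping>,
) -> AlterSourceRequest {
    AlterSourceRequest {
        source: Some(new_source.clone()),
        plan: Some(ReplaceStreamingJobPlan {
            // The re-planned fragment graph of the shared source's streaming job.
            fragment_graph: Some(fragment_graph),
            // None when no columns were reordered or dropped.
            table_col_index_mapping: col_index_mapping,
            source: Some(new_source),
            // `job_type` is left at its default; which variant applies to a
            // shared source is not visible in this diff.
            ..Default::default()
        }),
    }
}
```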
5 changes: 3 additions & 2 deletions proto/stream_plan.proto
@@ -116,19 +116,20 @@ message BarrierMutation {
// Stop a set of actors, used for dropping materialized views. Empty dispatchers will be
// automatically removed.
StopMutation stop = 4;
// Update outputs and hash mappings for some dispatchers, used for scaling.
// Update outputs and hash mappings for some dispatchers, used for scaling and replace table.
UpdateMutation update = 5;
// Change the split of some sources.
SourceChangeSplitMutation splits = 6;
// Pause the dataflow of the whole streaming graph, only used for scaling.
PauseMutation pause = 7;
// Resume the dataflow of the whole streaming graph, only used for scaling.
ResumeMutation resume = 8;
// Throttle specific source exec or chain exec.
// Throttle specific source exec or backfill exec.
ThrottleMutation throttle = 10;
// Drop subscription on mv
DropSubscriptionsMutation drop_subscriptions = 12;
// Combined mutation.
// Currently, it can only be Add & Update, which is for sink into table.
CombinedMutation combined = 100;
}
}
2 changes: 2 additions & 0 deletions src/meta/model/src/prelude.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub use {Source as SourceModel, Table as TableModel};

pub use super::actor::Entity as Actor;
pub use super::actor_dispatcher::Entity as ActorDispatcher;
pub use super::catalog_version::Entity as CatalogVersion;
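
The two new aliases presumably exist because `Source` and `Table` are also the names of the protobuf catalog types, so files that touch both the catalog and the meta-store models need a second name for the sea-orm entities. A minimal usage sketch (assuming the standard sea-orm `EntityTrait` API):

```rust
use risingwave_meta_model::prelude::SourceModel;
use risingwave_meta_model::SourceId;
use sea_orm::{DatabaseConnection, DbErr, EntityTrait};

// `Source` stays free for the catalog type (`PbSource`) in the same scope, while
// the alias still points at the sea-orm entity for meta-store lookups.
async fn source_exists(db: &DatabaseConnection, id: SourceId) -> Result<bool, DbErr> {
    Ok(SourceModel::find_by_id(id).one(db).await?.is_some())
}
```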
6 changes: 3 additions & 3 deletions src/meta/service/src/ddl_service.rs
@@ -247,7 +247,7 @@ impl DdlService for DdlServiceImpl {
None => {
let version = self
.ddl_controller
.run_command(DdlCommand::CreateSourceWithoutStreamingJob(source))
.run_command(DdlCommand::CreateNonSharedSource(source))
.await?;
Ok(Response::new(CreateSourceResponse {
status: None,
@@ -699,10 +699,10 @@ impl DdlService for DdlServiceImpl {
&self,
request: Request<AlterSourceRequest>,
) -> Result<Response<AlterSourceResponse>, Status> {
let AlterSourceRequest { source } = request.into_inner();
let AlterSourceRequest { source, plan } = request.into_inner();
let version = self
.ddl_controller
.run_command(DdlCommand::AlterSourceColumn(source.unwrap()))
.run_command(DdlCommand::AlterNonSharedSource(source.unwrap(), plan))
.await?;
Ok(Response::new(AlterSourceResponse {
status: None,
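
The handler above only destructures the request and forwards `(source, plan)` as a single `DdlCommand::AlterNonSharedSource`, so the actual split between the shared and non-shared paths presumably happens inside the DDL controller. A hedged sketch of that branch (method and field names here are approximations, and `replace_source_streaming_job` is an invented placeholder for whatever drives the replace flow):

```rust
impl DdlController {
    /// Sketch only: dispatch an ALTER SOURCE depending on whether a replace plan
    /// was attached to the request.
    async fn handle_alter_source(
        &self,
        source: PbSource,
        plan: Option<ReplaceStreamingJobPlan>,
    ) -> MetaResult<NotificationVersion> {
        match plan {
            // Non-shared source: nothing is running for it, so only the catalog
            // needs updating (see `alter_non_shared_source` in catalog.rs below).
            None => {
                self.metadata_manager
                    .catalog_controller
                    .alter_non_shared_source(source)
                    .await
            }
            // Shared source: the source itself is a streaming job, so its fragment
            // graph has to be replaced as well, mirroring the replace-table flow.
            Some(plan) => self.replace_source_streaming_job(source, plan).await,
        }
    }
}
```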
6 changes: 3 additions & 3 deletions src/meta/src/barrier/command.rs
@@ -213,8 +213,8 @@ pub enum CreateStreamingJobType {
}

/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send, and may do different stuffs after the barrier is
/// collected.
/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different things after the barrier is collected](CommandContext::post_collect).
#[derive(Debug, strum::Display)]
pub enum Command {
/// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
@@ -595,6 +595,7 @@ impl Command {

Command::Pause(_) => {
// Only pause when the cluster is not already paused.
// XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
if current_paused_reason.is_none() {
Some(Mutation::Pause(PauseMutation {}))
} else {
@@ -699,7 +700,6 @@ impl Command {
..
}) = job_type
{
// TODO: support in v2.
let update = Self::generate_update_mutation_for_replace_table(
old_fragments,
merge_updates,
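
The `XXX` comment added to the `Pause` arm asks what happens when pause/resume requests from two different reasons interleave. With a single `Option<PausedReason>` the second pause is silently dropped and the first matching resume lifts the pause for everyone, which this standalone toy model (not the meta-node code) makes concrete:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Reason {
    R1,
    R2,
}

fn main() {
    let mut current_paused_reason: Option<Reason> = None;

    // pause(r1): not paused yet, so the pause takes effect and r1 is recorded.
    if current_paused_reason.is_none() {
        current_paused_reason = Some(Reason::R1);
    }
    // pause(r2): already paused, so nothing is recorded for r2.
    if current_paused_reason.is_none() {
        current_paused_reason = Some(Reason::R2);
    }
    // resume(r1): matches the recorded reason and un-pauses the graph,
    // even though r2 has not asked to resume yet.
    if current_paused_reason == Some(Reason::R1) {
        current_paused_reason = None;
    }

    // r2's pause has effectively been lost.
    assert_eq!(current_paused_reason, None);
}
```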
5 changes: 4 additions & 1 deletion src/meta/src/controller/catalog.rs
@@ -2730,7 +2730,10 @@ impl CatalogController {
.collect())
}

pub async fn alter_source(&self, pb_source: PbSource) -> MetaResult<NotificationVersion> {
pub async fn alter_non_shared_source(
&self,
pb_source: PbSource,
) -> MetaResult<NotificationVersion> {
let source_id = pb_source.id as SourceId;
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
113 changes: 44 additions & 69 deletions src/meta/src/controller/fragment.rs
@@ -24,7 +24,7 @@ use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob};
use risingwave_meta_model::prelude::{Actor, Fragment, Sink, StreamingJob};
use risingwave_meta_model::{
actor, actor_dispatcher, fragment, object, sink, source, streaming_job, table, ActorId,
ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus,
@@ -46,7 +46,7 @@ use risingwave_pb::meta::{
use risingwave_pb::source::PbConnectorSplits;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
DispatchStrategy, PbDispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext,
DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext,
};
use sea_orm::sea_query::Expr;
use sea_orm::ActiveValue::Set;
@@ -1346,37 +1346,49 @@ impl CatalogController {
Ok(actors)
}

pub async fn get_upstream_root_fragments(
/// Get and filter the "**root**" fragments of the specified jobs.
/// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
///
/// The root fragment connects to downstream jobs.
///
/// ## What can be the root fragment
/// - For MV, it should have one `MView` fragment.
/// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
/// - For source, it should have one `Source` fragment.
///
/// In other words, it's the `MView` fragment if it exists, otherwise it's the `Source` fragment.
pub async fn get_root_fragments(
&self,
upstream_job_ids: Vec<ObjectId>,
job_ids: Vec<ObjectId>,
) -> MetaResult<(HashMap<ObjectId, PbFragment>, Vec<(ActorId, WorkerId)>)> {
let inner = self.inner.read().await;

let all_upstream_fragments = Fragment::find()
.filter(fragment::Column::JobId.is_in(upstream_job_ids))
.filter(fragment::Column::JobId.is_in(job_ids))
.all(&inner.db)
.await?;
// job_id -> fragment
let mut fragments = HashMap::<ObjectId, fragment::Model>::new();
let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
for fragment in all_upstream_fragments {
if fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 {
_ = fragments.insert(fragment.job_id, fragment);
_ = root_fragments.insert(fragment.job_id, fragment);
} else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
// look for Source fragment if there's no MView fragment
_ = fragments.try_insert(fragment.job_id, fragment);
// look for Source fragment only if there's no MView fragment
// (notice try_insert here vs insert above)
_ = root_fragments.try_insert(fragment.job_id, fragment);
}
}

let mut root_fragments = HashMap::new();
for (_, fragment) in fragments {
let mut root_fragments_pb = HashMap::new();
for (_, fragment) in root_fragments {
let actors = fragment.find_related(Actor).all(&inner.db).await?;
let actor_dispatchers = get_actor_dispatchers(
&inner.db,
actors.iter().map(|actor| actor.actor_id).collect(),
)
.await?;

root_fragments.insert(
root_fragments_pb.insert(
fragment.job_id,
Self::compose_fragment(fragment, actors, actor_dispatchers)?.0,
);
@@ -1389,35 +1401,34 @@ impl CatalogController {
.all(&inner.db)
.await?;

Ok((root_fragments, actors))
Ok((root_fragments_pb, actors))
}
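
The `insert`-vs-`try_insert` asymmetry called out in the comment is what makes the `MView` fragment win as the root regardless of the order in which fragments are scanned. A standalone toy version of the rule, using `entry().or_insert()` as the stable-Rust analogue of the unstable `HashMap::try_insert`:

```rust
use std::collections::HashMap;

fn main() {
    // job id -> kind of fragment currently recorded as the root.
    let mut root: HashMap<&str, &str> = HashMap::new();

    // A Source fragment is seen first: record it only because nothing is there yet.
    root.entry("job_1").or_insert("Source");
    // The MView fragment shows up later and unconditionally overwrites it...
    root.insert("job_1", "MView");
    // ...while another Source fragment cannot displace the recorded MView.
    root.entry("job_1").or_insert("Source");

    // Whatever the scan order, the MView fragment ends up as the root.
    assert_eq!(root["job_1"], "MView");
}
```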

/// Get the downstream `Chain` fragments of the specified table.
pub async fn get_downstream_chain_fragments(
pub async fn get_root_fragment(
&self,
job_id: ObjectId,
) -> MetaResult<(PbFragment, Vec<(ActorId, WorkerId)>)> {
let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
let root_fragment = root_fragments
.remove(&job_id)
.context(format!("root fragment for job {} not found", job_id))?;
Ok((root_fragment, actors))
}

/// Get the downstream fragments connected to the specified job.
pub async fn get_downstream_fragments(
&self,
job_id: ObjectId,
) -> MetaResult<(
Vec<(DispatchStrategy, PbFragment)>,
Vec<(ActorId, WorkerId)>,
)> {
let mview_fragment = self.get_mview_fragment(job_id).await?;
let downstream_dispatches: HashMap<_, _> = mview_fragment.actors[0]
.dispatcher
.iter()
.map(|d| {
let fragment_id = d.dispatcher_id as FragmentId;
let strategy = PbDispatchStrategy {
r#type: d.r#type,
dist_key_indices: d.dist_key_indices.clone(),
output_indices: d.output_indices.clone(),
};
(fragment_id, strategy)
})
.collect();
let (root_fragment, actors) = self.get_root_fragment(job_id).await?;
let dispatches = root_fragment.dispatches();

let inner = self.inner.read().await;
let mut chain_fragments = vec![];
for (fragment_id, dispatch_strategy) in downstream_dispatches {
let mut downstream_fragments = vec![];
for (fragment_id, dispatch_strategy) in dispatches {
let mut fragment_actors = Fragment::find_by_id(fragment_id)
.find_with_related(Actor)
.all(&inner.db)
Expand All @@ -1433,17 +1444,10 @@ impl CatalogController {
)
.await?;
let fragment = Self::compose_fragment(fragment, actors, actor_dispatchers)?.0;
chain_fragments.push((dispatch_strategy, fragment));
downstream_fragments.push((dispatch_strategy, fragment));
}

let actors: Vec<(ActorId, WorkerId)> = Actor::find()
.select_only()
.columns([actor::Column::ActorId, actor::Column::WorkerId])
.into_tuple()
.all(&inner.db)
.await?;

Ok((chain_fragments, actors))
Ok((downstream_fragments, actors))
}
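
The inline dispatcher-collection code that used to live here is now behind a `dispatches()` helper on the root fragment. Based on the code it replaces, the helper presumably boils down to the following (a reconstruction, not the actual implementation; `PbFragment`, `PbDispatchStrategy` and `FragmentId` are the types already imported in this file):

```rust
use std::collections::HashMap;

/// Sketch: the dispatch strategy of each downstream fragment, keyed by fragment id,
/// read off the root fragment's first actor exactly as the removed inline code did.
fn dispatches(root_fragment: &PbFragment) -> HashMap<FragmentId, PbDispatchStrategy> {
    root_fragment.actors[0]
        .dispatcher
        .iter()
        .map(|d| {
            let fragment_id = d.dispatcher_id as FragmentId;
            let strategy = PbDispatchStrategy {
                r#type: d.r#type,
                dist_key_indices: d.dist_key_indices.clone(),
                output_indices: d.output_indices.clone(),
            };
            (fragment_id, strategy)
        })
        .collect()
}
```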

pub async fn load_source_fragment_ids(
@@ -1516,35 +1520,6 @@ impl CatalogController {
Ok(splits.into_iter().collect())
}

/// Get the `Materialize` fragment of the specified table.
pub async fn get_mview_fragment(&self, job_id: ObjectId) -> MetaResult<PbFragment> {
let inner = self.inner.read().await;
let mut fragments = Fragment::find()
.filter(fragment::Column::JobId.eq(job_id))
.all(&inner.db)
.await?;
fragments.retain(|f| f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0);
if fragments.is_empty() {
bail!("No mview fragment found for job {}", job_id);
}
assert_eq!(fragments.len(), 1);

let fragment = fragments.pop().unwrap();
let actor_with_dispatchers = fragment
.find_related(Actor)
.find_with_related(ActorDispatcher)
.all(&inner.db)
.await?;
let mut actors = vec![];
let mut actor_dispatchers = HashMap::new();
for (actor, dispatchers) in actor_with_dispatchers {
actor_dispatchers.insert(actor.actor_id, dispatchers);
actors.push(actor);
}

Ok(Self::compose_fragment(fragment, actors, actor_dispatchers)?.0)
}

/// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
pub async fn get_actual_job_fragment_parallelism(
&self,
27 changes: 23 additions & 4 deletions src/meta/src/controller/streaming_job.rs
@@ -416,6 +416,7 @@ impl CatalogController {
// TODO: In this function, we also update the `Table` model in the meta store.
// Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
// making them the source of truth and performing a full replacement for those in the meta store?
/// Insert fragments and actors into the meta store. Used both for creating new jobs and for replacing jobs.
pub async fn prepare_streaming_job(
&self,
stream_job_fragments: &StreamJobFragments,
@@ -481,7 +482,7 @@ impl CatalogController {
}

if !for_replace {
// // Update dml fragment id.
// Update dml fragment id.
if let StreamingJob::Table(_, table, ..) = streaming_job {
Table::update(table::ActiveModel {
table_id: Set(table.id as _),
@@ -1008,7 +1009,7 @@ impl CatalogController {
Ok(version)
}

/// TODO: make it general for other streaming jobs.
/// TODO(alter-source): make it general for other streaming jobs.
/// Currently only for replacing table.
pub async fn finish_replace_streaming_job_inner(
tmp_id: ObjectId,
@@ -1025,6 +1026,7 @@ impl CatalogController {
let original_job_id = streaming_job.id() as ObjectId;
let job_type = streaming_job.job_type();

// Update catalog
match streaming_job {
StreamingJob::Table(_source, table, _table_job_type) => {
// The source catalog should remain unchanged
@@ -1063,7 +1065,11 @@ impl CatalogController {
table.incoming_sinks = Set(incoming_sinks.into());
table.update(txn).await?;
}
// TODO: support other streaming jobs
StreamingJob::Source(source) => {
// Update the source catalog with the new one.
let mut source = source::ActiveModel::from(source);
source.update(txn).await?;
}
_ => unreachable!(
"invalid streaming job type: {:?}",
streaming_job.job_type_str()
@@ -1221,8 +1227,21 @@ impl CatalogController {
)),
})
}
_ => unreachable!("invalid streaming job type: {:?}", job_type),
StreamingJobType::Source => {
let (source, source_obj) = Source::find_by_id(original_job_id)
.find_also_related(Object)
.one(txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
relations.push(PbRelation {
relation_info: Some(PbRelationInfo::Source(
ObjectModel(source, source_obj.unwrap()).into(),
)),
})
}
_ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
}
// TODO(alter-source) what is this
if let Some(table_col_index_mapping) = table_col_index_mapping {
let expr_rewriter = ReplaceTableExprRewriter {
table_col_index_mapping,
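
The `table_col_index_mapping` being applied here (and questioned by the `TODO(alter-source)` above) exists because downstream expressions refer to columns by position, so those positions have to be rewritten when the replaced job's columns change. A standalone, simplified model of the idea (not RisingWave's `ColIndexMapping`/`ReplaceTableExprRewriter` API):

```rust
/// Remap positional column references according to an old-index -> new-index table.
fn remap_input_refs(expr_cols: &mut [usize], old_to_new: &[Option<usize>]) {
    for col in expr_cols.iter_mut() {
        *col = old_to_new[*col].expect("a column referenced downstream was dropped");
    }
}

fn main() {
    // Old schema: [id, payload, ts]; new schema after the replace: [id, ts, extra].
    // `payload` (old index 1) is gone and `ts` moves from index 2 to index 1.
    let old_to_new = [Some(0), None, Some(1)];
    let mut expr = vec![0, 2]; // an expression referencing `id` and `ts`
    remap_input_refs(&mut expr, &old_to_new);
    assert_eq!(expr, vec![0, 1]);
}
```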