diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index b7716c769ddc6..77707d3be5932 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -81,6 +81,8 @@ pub type ActorId = i32; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum JobStatus { + #[sea_orm(string_value = "INITIAL")] + Initial, #[sea_orm(string_value = "CREATING")] Creating, #[sea_orm(string_value = "CREATED")] @@ -90,6 +92,7 @@ pub enum JobStatus { impl From for PbStreamJobStatus { fn from(job_status: JobStatus) -> Self { match job_status { + JobStatus::Initial => Self::Unspecified, JobStatus::Creating => Self::Creating, JobStatus::Created => Self::Created, } @@ -100,6 +103,7 @@ impl From for PbStreamJobStatus { impl From for PbStreamJobState { fn from(status: JobStatus) -> Self { match status { + JobStatus::Initial => PbStreamJobState::Initial, JobStatus::Creating => PbStreamJobState::Creating, JobStatus::Created => PbStreamJobState::Created, } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index ac1ffb392cef8..61fc59c49c3ae 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -935,8 +935,15 @@ impl CommandContext { .await?; } } - MetadataManager::V2(_) => { - unimplemented!("support post collect in v2"); + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .post_collect_table_fragments( + table_fragments.table_id().table_id as _, + table_fragments.actor_ids(), + dispatchers.clone(), + init_split_assignment, + ) + .await?; } } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index e8b3878ee1034..afaf1667b2d59 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -113,10 +113,7 @@ impl GlobalBarrierManagerContext { state_table_ids, source_ids, .. - } = mgr - .catalog_controller - .clean_foreground_creating_jobs() - .await?; + } = mgr.catalog_controller.clean_dirty_creating_jobs().await?; // unregister compaction group for cleaned state tables. self.hummock_manager diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index d94a9cab006ae..70fc1c52a0fce 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use anyhow::anyhow; use itertools::Itertools; -use risingwave_common::bail; use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; +use risingwave_common::{bail, current_cluster_version}; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::table::TableType; @@ -27,7 +27,7 @@ use risingwave_meta_model_v2::{ connection, database, function, index, object, object_dependency, schema, sink, source, streaming_job, table, user_privilege, view, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FunctionId, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId, - SourceId, TableId, UserId, + SourceId, StreamSourceInfo, TableId, UserId, }; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ @@ -39,7 +39,7 @@ use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, }; -use risingwave_pb::meta::{PbRelation, PbRelationGroup, PbTableFragments, Relation, RelationGroup}; +use risingwave_pb::meta::{PbRelation, PbRelationGroup}; use risingwave_pb::user::PbUserInfo; use sea_orm::sea_query::{Expr, SimpleExpr}; use sea_orm::ActiveValue::Set; @@ -145,8 +145,8 @@ impl CatalogController { database_id: Set(database_id), initialized_at: Default::default(), created_at: Default::default(), - initialized_at_cluster_version: Default::default(), - created_at_cluster_version: Default::default(), + initialized_at_cluster_version: Set(Some(current_cluster_version())), + created_at_cluster_version: Set(Some(current_cluster_version())), }; Ok(active_db.insert(txn).await?) } @@ -407,16 +407,19 @@ impl CatalogController { Ok(count > 0) } - pub async fn clean_foreground_creating_jobs(&self) -> MetaResult { + /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status. + pub async fn clean_dirty_creating_jobs(&self) -> MetaResult { let inner = self.inner.write().await; let txn = inner.db.begin().await?; let creating_job_ids: Vec = streaming_job::Entity::find() .select_only() .column(streaming_job::Column::JobId) .filter( - streaming_job::Column::CreateType - .eq(CreateType::Foreground) - .and(streaming_job::Column::JobStatus.eq(JobStatus::Creating)), + streaming_job::Column::JobStatus.eq(JobStatus::Initial).or( + streaming_job::Column::JobStatus + .eq(JobStatus::Creating) + .and(streaming_job::Column::CreateType.eq(CreateType::Foreground)), + ), ) .into_tuple() .all(&txn) @@ -449,13 +452,6 @@ impl CatalogController { .all(&txn) .await?; - // get all fragment mappings. - let mut fragment_mappings = vec![]; - for job_id in &creating_job_ids { - let mappings = get_fragment_mappings(&txn, *job_id).await?; - fragment_mappings.extend(mappings); - } - let res = Object::delete_many() .filter(object::Column::Oid.is_in(creating_job_ids.clone())) .exec(&txn) @@ -463,10 +459,6 @@ impl CatalogController { assert!(res.rows_affected > 0); txn.commit().await?; - // notify delete of fragment mappings. - self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings) - .await; - Ok(ReleaseContext { streaming_jobs: creating_job_ids, state_table_ids, @@ -488,9 +480,13 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?; - // update `created_at` as now(). + // update `created_at` as now() and `created_at_cluster_version` as current cluster version. let res = Object::update_many() .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into()) + .col_expr( + object::Column::CreatedAtClusterVersion, + current_cluster_version().into(), + ) .filter(object::Column::Oid.eq(job_id)) .exec(&txn) .await?; @@ -587,8 +583,11 @@ impl CatalogController { _ => unreachable!("invalid job type: {:?}", job_type), } + let fragment_mapping = get_fragment_mappings(&txn, job_id).await?; txn.commit().await?; + self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping) + .await; let version = self .notify_frontend( NotificationOperation::Add, @@ -599,15 +598,6 @@ impl CatalogController { Ok(version) } - pub fn create_stream_job( - &self, - _stream_job: &StreamingJob, - _table_fragments: &PbTableFragments, - _internal_tables: Vec, - ) -> MetaResult<()> { - todo!() - } - pub async fn create_source( &self, mut pb_source: PbSource, @@ -1320,10 +1310,10 @@ impl CatalogController { let version = self .notify_frontend( Operation::Update, - Info::RelationGroup(RelationGroup { + Info::RelationGroup(PbRelationGroup { relations: relations .into_iter() - .map(|relation_info| Relation { + .map(|relation_info| PbRelation { relation_info: Some(relation_info), }) .collect_vec(), @@ -1431,8 +1421,24 @@ impl CatalogController { .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink) .map(|obj| obj.oid) .collect_vec(); + + // cdc source streaming job. + if object_type == ObjectType::Source { + let source_info: Option = Source::find_by_id(object_id) + .select_only() + .column(source::Column::SourceInfo) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?; + if let Some(source_info) = source_info + && source_info.into_inner().cdc_source_job + { + to_drop_streaming_jobs.push(object_id); + } + } + let mut to_drop_state_table_ids = to_drop_table_ids.clone().collect_vec(); - // todo: record index dependency info in the object dependency table. let to_drop_index_ids = to_drop_objects .iter() .filter(|obj| obj.obj_type == ObjectType::Index) diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index e1849f8407a45..d0c39694692b1 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -22,9 +22,9 @@ use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; use risingwave_meta_model_v2::{ - actor, actor_dispatcher, fragment, sink, streaming_job, ActorId, ConnectorSplits, ExprContext, - FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId, StreamNode, - TableId, VnodeBitmap, WorkerId, + actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits, + ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId, + StreamNode, TableId, VnodeBitmap, WorkerId, }; use risingwave_pb::common::PbParallelUnit; use risingwave_pb::ddl_service::PbTableJobType; @@ -60,13 +60,16 @@ use crate::stream::SplitAssignment; use crate::MetaResult; impl CatalogControllerInner { - /// List all fragment vnode mapping info + /// List all fragment vnode mapping info for all CREATED streaming jobs. pub async fn all_running_fragment_mappings( &self, ) -> MetaResult + '_> { let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find() + .join(JoinType::InnerJoin, fragment::Relation::Object.def()) + .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) .select_only() .columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping]) + .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) .into_tuple() .all(&self.db) .await?; diff --git a/src/meta/src/controller/id.rs b/src/meta/src/controller/id.rs new file mode 100644 index 0000000000000..0f41569908d46 --- /dev/null +++ b/src/meta/src/controller/id.rs @@ -0,0 +1,103 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use risingwave_meta_model_v2::prelude::{Actor, Fragment}; +use risingwave_meta_model_v2::{actor, fragment}; +use sea_orm::sea_query::{Expr, Func}; +use sea_orm::{DatabaseConnection, EntityTrait, QuerySelect}; + +use crate::manager::{IdCategory, IdCategoryType}; +use crate::MetaResult; + +pub struct IdGenerator(AtomicU64); + +impl IdGenerator { + pub async fn new(conn: &DatabaseConnection) -> MetaResult { + let id: i32 = match TYPE { + IdCategory::Table => { + // Since we are using object pk to generate id for tables, here we just implement a dummy + // id generator and refill it later when inserting the table. + 0 + } + IdCategory::Fragment => Fragment::find() + .select_only() + .expr(Func::if_null( + Expr::col(fragment::Column::FragmentId).max().add(1), + 0, + )) + .into_tuple() + .one(conn) + .await? + .unwrap_or_default(), + IdCategory::Actor => Actor::find() + .select_only() + .expr(Func::if_null( + Expr::col(actor::Column::ActorId).max().add(1), + 0, + )) + .into_tuple() + .one(conn) + .await? + .unwrap_or_default(), + _ => unreachable!("IdGeneratorV2 only supports Table, Fragment, and Actor"), + }; + + Ok(Self(AtomicU64::new(id as u64))) + } + + pub fn generate_interval(&self, interval: u64) -> u64 { + self.0.fetch_add(interval, Ordering::Relaxed) + } +} + +pub type IdGeneratorManagerRef = Arc; + +/// `IdGeneratorManager` is a manager for three id generators: `tables`, `fragments`, and `actors`. Note that this is just a +/// workaround for the current implementation of `IdGenerator`. We should refactor it later. +pub struct IdGeneratorManager { + pub tables: Arc>, + pub fragments: Arc>, + pub actors: Arc>, +} + +impl IdGeneratorManager { + pub async fn new(conn: &DatabaseConnection) -> MetaResult { + Ok(Self { + tables: Arc::new(IdGenerator::new(conn).await?), + fragments: Arc::new(IdGenerator::new(conn).await?), + actors: Arc::new(IdGenerator::new(conn).await?), + }) + } + + pub fn generate(&self) -> u64 { + match C { + IdCategory::Table => self.tables.generate_interval(1), + IdCategory::Fragment => self.fragments.generate_interval(1), + IdCategory::Actor => self.actors.generate_interval(1), + _ => unreachable!("IdGeneratorV2 only supports Table, Fragment, and Actor"), + } + } + + pub fn generate_interval(&self, interval: u64) -> u64 { + match C { + IdCategory::Table => self.tables.generate_interval(interval), + IdCategory::Fragment => self.fragments.generate_interval(interval), + IdCategory::Actor => self.actors.generate_interval(interval), + _ => unreachable!("IdGeneratorV2 only supports Table, Fragment, and Actor"), + } + } +} diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 037f9e3417163..8128981b12283 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -31,7 +31,9 @@ use crate::MetaError; pub mod catalog; pub mod cluster; pub mod fragment; +pub mod id; pub mod rename; +pub mod streaming_job; pub mod system_param; pub mod user; pub mod utils; diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs new file mode 100644 index 0000000000000..56f86aab8d85f --- /dev/null +++ b/src/meta/src/controller/streaming_job.rs @@ -0,0 +1,401 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use itertools::Itertools; +use risingwave_meta_model_v2::actor::ActorStatus; +use risingwave_meta_model_v2::object::ObjectType; +use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Object, ObjectDependency, Table}; +use risingwave_meta_model_v2::{ + actor, actor_dispatcher, index, object_dependency, sink, source, streaming_job, table, ActorId, + CreateType, DatabaseId, JobStatus, ObjectId, SchemaId, UserId, +}; +use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; +use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId; +use risingwave_pb::catalog::{PbCreateType, PbTable}; +use risingwave_pb::meta::PbTableFragments; +use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; +use risingwave_pb::stream_plan::Dispatcher; +use sea_orm::sea_query::SimpleExpr; +use sea_orm::ActiveValue::Set; +use sea_orm::{ + ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, + NotSet, QueryFilter, TransactionTrait, +}; + +use crate::controller::catalog::CatalogController; +use crate::controller::utils::{check_relation_name_duplicate, ensure_object_id, ensure_user_id}; +use crate::manager::StreamingJob; +use crate::model::StreamContext; +use crate::stream::SplitAssignment; +use crate::MetaResult; + +impl CatalogController { + pub async fn create_streaming_job_obj( + txn: &DatabaseTransaction, + obj_type: ObjectType, + owner_id: UserId, + database_id: Option, + schema_id: Option, + create_type: PbCreateType, + ctx: &StreamContext, + ) -> MetaResult { + let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?; + let job = streaming_job::ActiveModel { + job_id: Set(obj.oid), + job_status: Set(JobStatus::Initial), + create_type: Set(create_type.into()), + timezone: Set(ctx.timezone.clone()), + }; + job.insert(txn).await?; + + Ok(obj.oid) + } + + pub async fn create_job_catalog( + &self, + streaming_job: &mut StreamingJob, + ctx: &StreamContext, + ) -> MetaResult<()> { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let create_type = streaming_job.create_type(); + + ensure_user_id(streaming_job.owner() as _, &txn).await?; + ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?; + ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?; + check_relation_name_duplicate( + &streaming_job.name(), + streaming_job.database_id() as _, + streaming_job.schema_id() as _, + &txn, + ) + .await?; + + match streaming_job { + StreamingJob::MaterializedView(table) => { + let job_id = Self::create_streaming_job_obj( + &txn, + ObjectType::Table, + table.owner as _, + Some(table.database_id as _), + Some(table.schema_id as _), + create_type, + ctx, + ) + .await?; + table.id = job_id as _; + let table: table::ActiveModel = table.clone().into(); + table.insert(&txn).await?; + } + StreamingJob::Sink(sink, _) => { + let job_id = Self::create_streaming_job_obj( + &txn, + ObjectType::Sink, + sink.owner as _, + Some(sink.database_id as _), + Some(sink.schema_id as _), + create_type, + ctx, + ) + .await?; + sink.id = job_id as _; + let sink: sink::ActiveModel = sink.clone().into(); + sink.insert(&txn).await?; + } + StreamingJob::Table(src, table, _) => { + let job_id = Self::create_streaming_job_obj( + &txn, + ObjectType::Table, + table.owner as _, + Some(table.database_id as _), + Some(table.schema_id as _), + create_type, + ctx, + ) + .await?; + table.id = job_id as _; + if let Some(src) = src { + let src_obj = Self::create_object( + &txn, + ObjectType::Source, + src.owner as _, + Some(src.database_id as _), + Some(src.schema_id as _), + ) + .await?; + src.id = src_obj.oid as _; + src.optional_associated_table_id = + Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _)); + table.optional_associated_source_id = Some( + PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _), + ); + let source: source::ActiveModel = src.clone().into(); + source.insert(&txn).await?; + } + let table: table::ActiveModel = table.clone().into(); + table.insert(&txn).await?; + } + StreamingJob::Index(index, table) => { + ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?; + let job_id = Self::create_streaming_job_obj( + &txn, + ObjectType::Index, + index.owner as _, + Some(index.database_id as _), + Some(index.schema_id as _), + create_type, + ctx, + ) + .await?; + // to be compatible with old implementation. + index.id = job_id as _; + index.index_table_id = job_id as _; + table.id = job_id as _; + + object_dependency::ActiveModel { + oid: Set(index.primary_table_id as _), + used_by: Set(table.id as _), + ..Default::default() + } + .insert(&txn) + .await?; + + let table: table::ActiveModel = table.clone().into(); + table.insert(&txn).await?; + let index: index::ActiveModel = index.clone().into(); + index.insert(&txn).await?; + } + StreamingJob::Source(src) => { + let job_id = Self::create_streaming_job_obj( + &txn, + ObjectType::Source, + src.owner as _, + Some(src.database_id as _), + Some(src.schema_id as _), + create_type, + ctx, + ) + .await?; + src.id = job_id as _; + let source: source::ActiveModel = src.clone().into(); + source.insert(&txn).await?; + } + } + + // record object dependency. + let dependent_relations = streaming_job.dependent_relations(); + if !dependent_relations.is_empty() { + ObjectDependency::insert_many(dependent_relations.into_iter().map(|id| { + object_dependency::ActiveModel { + oid: Set(id as _), + used_by: Set(streaming_job.id() as _), + ..Default::default() + } + })) + .exec(&txn) + .await?; + } + + txn.commit().await?; + + Ok(()) + } + + pub async fn create_internal_table_catalog( + &self, + job_id: ObjectId, + internal_tables: Vec, + ) -> MetaResult> { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let mut table_id_map = HashMap::new(); + for table in internal_tables { + let table_id = Self::create_object( + &txn, + ObjectType::Table, + table.owner as _, + Some(table.database_id as _), + Some(table.schema_id as _), + ) + .await? + .oid; + table_id_map.insert(table.id, table_id as u32); + let mut table: table::ActiveModel = table.into(); + table.table_id = Set(table_id as _); + table.belongs_to_job_id = Set(Some(job_id as _)); + table.insert(&txn).await?; + } + txn.commit().await?; + + Ok(table_id_map) + } + + pub async fn prepare_streaming_job( + &self, + table_fragment: PbTableFragments, + streaming_job: &StreamingJob, + ) -> MetaResult<()> { + let fragment_actors = + Self::extract_fragment_and_actors_from_table_fragments(table_fragment)?; + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + + // Add fragments, actors and actor dispatchers. + for (fragment, actors, actor_dispatchers) in fragment_actors { + let fragment = fragment.into_active_model(); + fragment.insert(&txn).await?; + for actor in actors { + let actor = actor.into_active_model(); + actor.insert(&txn).await?; + } + for (_, actor_dispatchers) in actor_dispatchers { + for actor_dispatcher in actor_dispatchers { + let mut actor_dispatcher = actor_dispatcher.into_active_model(); + actor_dispatcher.id = NotSet; + actor_dispatcher.insert(&txn).await?; + } + } + } + + // Update fragment id and dml fragment id. + match streaming_job { + StreamingJob::MaterializedView(table) + | StreamingJob::Index(_, table) + | StreamingJob::Table(_, table, ..) => { + Table::update(table::ActiveModel { + table_id: Set(table.id as _), + fragment_id: Set(Some(table.fragment_id as _)), + ..Default::default() + }) + .exec(&txn) + .await?; + } + _ => {} + } + if let StreamingJob::Table(_, table, ..) = streaming_job { + Table::update(table::ActiveModel { + table_id: Set(table.id as _), + dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)), + ..Default::default() + }) + .exec(&txn) + .await?; + } + + txn.commit().await?; + + Ok(()) + } + + /// `try_abort_creating_streaming_job` is used to abort the job that is under initial status or in `FOREGROUND` mode. + /// It returns true if the job is not found or aborted. + pub async fn try_abort_creating_streaming_job(&self, job_id: ObjectId) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + + let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?; + let Some(streaming_job) = streaming_job else { + tracing::warn!( + id = job_id, + "streaming job not found when aborting creating, might be cleaned by recovery" + ); + return Ok(true); + }; + + assert_ne!(streaming_job.job_status, JobStatus::Created); + if streaming_job.create_type == CreateType::Background + && streaming_job.job_status == JobStatus::Creating + { + // If the job is created in background and still in creating status, we should not abort it and let recovery to handle it. + tracing::warn!( + id = job_id, + "streaming job is created in background and still in creating status" + ); + return Ok(false); + } + + Object::delete_by_id(job_id).exec(&txn).await?; + txn.commit().await?; + + Ok(true) + } + + pub async fn post_collect_table_fragments( + &self, + job_id: ObjectId, + actor_ids: Vec, + new_actor_dispatchers: HashMap>, + split_assignment: &SplitAssignment, + ) -> MetaResult<()> { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + + Actor::update_many() + .col_expr( + actor::Column::Status, + SimpleExpr::from(ActorStatus::Running.into_value()), + ) + .filter( + actor::Column::ActorId + .is_in(actor_ids.into_iter().map(|id| id as ActorId).collect_vec()), + ) + .exec(&txn) + .await?; + + for splits in split_assignment.values() { + for (actor_id, splits) in splits { + let splits = splits.iter().map(PbConnectorSplit::from).collect_vec(); + let connector_splits = PbConnectorSplits { splits }; + actor::ActiveModel { + actor_id: Set(*actor_id as _), + splits: Set(Some(connector_splits.into())), + ..Default::default() + } + .update(&txn) + .await?; + } + } + + let mut actor_dispatchers = vec![]; + for (actor_id, dispatchers) in new_actor_dispatchers { + for dispatcher in dispatchers { + let mut actor_dispatcher = + actor_dispatcher::Model::from((actor_id, dispatcher)).into_active_model(); + actor_dispatcher.id = NotSet; + actor_dispatchers.push(actor_dispatcher); + } + } + + if !actor_dispatchers.is_empty() { + ActorDispatcher::insert_many(actor_dispatchers) + .exec(&txn) + .await?; + } + + // Mark job as CREATING. + streaming_job::ActiveModel { + job_id: Set(job_id), + job_status: Set(JobStatus::Creating), + ..Default::default() + } + .update(&txn) + .await?; + + txn.commit().await?; + + Ok(()) + } +} diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 171597c871668..41bad85b3f4ca 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use anyhow::anyhow; -use itertools::Itertools; use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::fragment::DistributionType; @@ -657,12 +656,14 @@ where .all(db) .await?; - Ok(actor_dispatchers - .into_iter() - .group_by(|actor_dispatcher| actor_dispatcher.actor_id) - .into_iter() - .map(|(actor_id, actor_dispatcher)| (actor_id, actor_dispatcher.collect())) - .collect()) + let mut actor_dispatchers_map = HashMap::new(); + for actor_dispatcher in actor_dispatchers { + actor_dispatchers_map + .entry(actor_dispatcher.actor_id) + .or_insert_with(Vec::new) + .push(actor_dispatcher); + } + Ok(actor_dispatchers_map) } /// `get_fragment_parallel_unit_mappings` returns the fragment vnode mappings of the given job. diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 601f3fa016031..dd25c39307905 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -23,6 +23,9 @@ use risingwave_rpc_client::{ConnectorClient, StreamClientPool, StreamClientPoolR use sea_orm::EntityTrait; use super::{SystemParamsManager, SystemParamsManagerRef}; +use crate::controller::id::{ + IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef, +}; use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef}; use crate::controller::SqlMetaStore; use crate::hummock::sequence::SequenceGenerator; @@ -44,6 +47,9 @@ pub struct MetaSrvEnv { /// id generator manager. id_gen_manager: IdGeneratorManagerRef, + /// sql id generator manager. + sql_id_gen_manager: Option, + /// meta store. meta_store: MetaStoreRef, @@ -317,8 +323,15 @@ impl MetaSrvEnv { .clone() .map(|m| Arc::new(SequenceGenerator::new(m.conn))); + let sql_id_gen_manager = if let Some(store) = &meta_store_sql { + Some(Arc::new(SqlIdGeneratorManager::new(&store.conn).await?)) + } else { + None + }; + Ok(Self { id_gen_manager, + sql_id_gen_manager, meta_store, meta_store_sql, notification_manager, @@ -355,6 +368,10 @@ impl MetaSrvEnv { self.id_gen_manager.deref() } + pub fn sql_id_gen_manager_ref(&self) -> Option { + self.sql_id_gen_manager.clone() + } + pub fn notification_manager_ref(&self) -> NotificationManagerRef { self.notification_manager.clone() } @@ -466,12 +483,20 @@ impl MetaSrvEnv { }; let event_log_manager = Arc::new(EventLogManger::for_test()); + let sql_id_gen_manager = if let Some(store) = &meta_store_sql { + Some(Arc::new( + SqlIdGeneratorManager::new(&store.conn).await.unwrap(), + )) + } else { + None + }; let hummock_seq = meta_store_sql .clone() .map(|m| Arc::new(SequenceGenerator::new(m.conn))); Self { id_gen_manager, + sql_id_gen_manager, meta_store, meta_store_sql, notification_manager, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index c613385735125..3bff278d32e01 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -636,7 +636,9 @@ impl DdlController { affected_table_replace_info: Option, ) -> MetaResult { let MetadataManager::V1(mgr) = &self.metadata_manager else { - unimplemented!("support create streaming job in v2"); + return self + .create_streaming_job_v2(stream_job, fragment_graph) + .await; }; let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; stream_job.set_id(id); diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs new file mode 100644 index 0000000000000..852d9776d61db --- /dev/null +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -0,0 +1,194 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::util::stream_graph_visitor::visit_fragment; +use risingwave_pb::catalog::CreateType; +use risingwave_pb::ddl_service::TableJobType; +use risingwave_pb::stream_plan::stream_node::NodeBody; +use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; +use thiserror_ext::AsReport; + +use crate::manager::{ + MetadataManager, MetadataManagerV2, NotificationVersion, StreamingJob, + IGNORED_NOTIFICATION_VERSION, +}; +use crate::model::{MetadataModel, StreamContext}; +use crate::rpc::ddl_controller::{fill_table_stream_graph_info, DdlController}; +use crate::stream::{validate_sink, StreamFragmentGraph}; +use crate::MetaResult; + +impl DdlController { + pub async fn create_streaming_job_v2( + &self, + mut streaming_job: StreamingJob, + mut fragment_graph: StreamFragmentGraphProto, + ) -> MetaResult { + let MetadataManager::V2(mgr) = &self.metadata_manager else { + unreachable!("MetadataManager should be V2") + }; + + let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); + mgr.catalog_controller + .create_job_catalog(&mut streaming_job, &ctx) + .await?; + let job_id = streaming_job.id(); + + match &mut streaming_job { + StreamingJob::Table(Some(src), table, job_type) => { + // If we're creating a table with connector, we should additionally fill its ID first. + fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph); + } + StreamingJob::Source(src) => { + // set the inner source id of source node. + for fragment in fragment_graph.fragments.values_mut() { + visit_fragment(fragment, |node_body| { + if let NodeBody::Source(source_node) = node_body { + source_node.source_inner.as_mut().unwrap().source_id = src.id; + } + }); + } + } + _ => {} + } + + tracing::debug!( + id = job_id, + definition = streaming_job.definition(), + "starting streaming job", + ); + let _permit = self + .creating_streaming_job_permits + .semaphore + .acquire() + .await + .unwrap(); + let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + + // create streaming job. + match self + .create_streaming_job_inner_v2(mgr, ctx, &mut streaming_job, fragment_graph) + .await + { + Ok(version) => Ok(version), + Err(err) => { + tracing::error!(id = job_id, error = ?err.as_report(), "failed to create streaming job"); + let aborted = mgr + .catalog_controller + .try_abort_creating_streaming_job(job_id as _) + .await?; + if aborted { + tracing::warn!(id = job_id, "aborted streaming job"); + match &streaming_job { + StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => { + self.source_manager.unregister_sources(vec![src.id]).await; + } + _ => {} + } + } + Err(err) + } + } + } + + async fn create_streaming_job_inner_v2( + &self, + mgr: &MetadataManagerV2, + ctx: StreamContext, + streaming_job: &mut StreamingJob, + fragment_graph: StreamFragmentGraphProto, + ) -> MetaResult { + let mut fragment_graph = + StreamFragmentGraph::new(&self.env, fragment_graph, streaming_job).await?; + streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + + // create internal table catalogs and refill table id. + let internal_tables = fragment_graph.internal_tables().into_values().collect_vec(); + let table_id_map = mgr + .catalog_controller + .create_internal_table_catalog(streaming_job.id() as _, internal_tables) + .await?; + fragment_graph.refill_internal_table_ids(table_id_map); + + // create fragment and actor catalogs. + tracing::debug!(id = streaming_job.id(), "building streaming job"); + let (ctx, table_fragments) = self + .build_stream_job(ctx, streaming_job, fragment_graph, None) + .await?; + + match streaming_job { + StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => { + Self::validate_cdc_table(table, &table_fragments).await?; + } + StreamingJob::Table(Some(source), ..) => { + // Register the source on the connector node. + self.source_manager.register_source(source).await?; + } + StreamingJob::Sink(sink, target_table) => { + if target_table.is_some() { + unimplemented!("support create sink into table in v2"); + } + // Validate the sink on the connector node. + validate_sink(sink).await?; + } + StreamingJob::Source(source) => { + // Register the source on the connector node. + self.source_manager.register_source(source).await?; + } + _ => {} + } + + mgr.catalog_controller + .prepare_streaming_job(table_fragments.to_protobuf(), streaming_job) + .await?; + + // create streaming jobs. + let stream_job_id = streaming_job.id(); + match streaming_job.create_type() { + CreateType::Unspecified | CreateType::Foreground => { + self.stream_manager + .create_streaming_job(table_fragments, ctx) + .await?; + let version = mgr + .catalog_controller + .finish_streaming_job(stream_job_id as _) + .await?; + Ok(version) + } + CreateType::Background => { + let ctrl = self.clone(); + let mgr = mgr.clone(); + let fut = async move { + let result = ctrl + .stream_manager + .create_streaming_job(table_fragments, ctx) + .await.inspect_err(|err| { + tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job"); + }); + if result.is_ok() { + let _ = mgr + .catalog_controller + .finish_streaming_job(stream_job_id as _) + .await.inspect_err(|err| { + tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to finish background streaming job"); + }); + } + }; + tokio::spawn(fut); + Ok(IGNORED_NOTIFICATION_VERSION) + } + } + } +} diff --git a/src/meta/src/rpc/mod.rs b/src/meta/src/rpc/mod.rs index 7eb347c9e3e85..ebf9af1a8a78c 100644 --- a/src/meta/src/rpc/mod.rs +++ b/src/meta/src/rpc/mod.rs @@ -14,6 +14,7 @@ pub mod cloud_provider; pub mod ddl_controller; +mod ddl_controller_v2; pub mod election; pub mod intercept; pub mod metrics; diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 801be6a4ab628..c9eccf29d4734 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -716,9 +716,11 @@ impl ActorGraphBuilder { .values() .map(|d| d.parallelism()) .sum::() as u64; - - // TODO: use sql_id_gen that is not implemented yet. - let id_gen = GlobalActorIdGen::new(env.id_gen_manager(), actor_len).await?; + let id_gen = if let Some(sql_id_gen) = env.sql_id_gen_manager_ref() { + GlobalActorIdGen::new_v2(&sql_id_gen, actor_len) + } else { + GlobalActorIdGen::new(env.id_gen_manager(), actor_len).await? + }; // Build the actor graph and get the final state. let ActorGraphBuildStateInner { diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index e4b2b03004fe8..79926a359060b 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -288,11 +288,19 @@ impl StreamFragmentGraph { proto: StreamFragmentGraphProto, job: &StreamingJob, ) -> MetaResult { - // TODO: use sql_id_gen that is not implemented yet. - let fragment_id_gen = - GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64).await?; - let table_id_gen = - GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64).await?; + let (fragment_id_gen, table_id_gen) = if let Some(sql_id_gen) = env.sql_id_gen_manager_ref() + { + ( + GlobalFragmentIdGen::new_v2(&sql_id_gen, proto.fragments.len() as u64), + GlobalTableIdGen::new_v2(&sql_id_gen, proto.table_ids_cnt as u64), + ) + } else { + ( + GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64) + .await?, + GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64).await?, + ) + }; // Create nodes. let fragments: HashMap<_, _> = proto diff --git a/src/meta/src/stream/stream_graph/id.rs b/src/meta/src/stream/stream_graph/id.rs index f835440fd801a..9027a0c6c3e1b 100644 --- a/src/meta/src/stream/stream_graph/id.rs +++ b/src/meta/src/stream/stream_graph/id.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::controller::id::IdGeneratorManager as SqlIdGeneratorManager; use crate::manager::{IdCategory, IdCategoryType, IdGeneratorManager}; use crate::MetaResult; @@ -55,6 +56,14 @@ impl GlobalIdGen { }) } + pub fn new_v2(id_gen: &SqlIdGeneratorManager, len: u64) -> Self { + let offset = id_gen.generate_interval::(len); + Self { + offset: offset as u32, + len: len as u32, + } + } + /// Convert local id to global id. Panics if `id >= len`. pub fn to_global_id(self, local_id: u32) -> GlobalId { assert!( diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 3382aae93ce9b..6c09eec492f19 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -450,6 +450,7 @@ fn make_cluster_info() -> StreamingClusterInfo { } #[tokio::test] +#[cfg(not(madsim))] async fn test_graph_builder() -> MetaResult<()> { let env = MetaSrvEnv::for_test().await; let parallel_degree = 4; @@ -475,11 +476,11 @@ async fn test_graph_builder() -> MetaResult<()> { let table_fragments = TableFragments::for_test(TableId::default(), graph); let actors = table_fragments.actors(); let barrier_inject_actor_ids = table_fragments.barrier_inject_actor_ids(); - let sink_actor_ids = table_fragments.mview_actor_ids(); + let mview_actor_ids = table_fragments.mview_actor_ids(); assert_eq!(actors.len(), 9); - assert_eq!(barrier_inject_actor_ids, vec![6, 7, 8, 9]); - assert_eq!(sink_actor_ids, vec![1]); + assert_eq!(barrier_inject_actor_ids, vec![5, 6, 7, 8]); + assert_eq!(mview_actor_ids, vec![0]); assert_eq!(internal_tables.len(), 3); let fragment_upstreams: HashMap<_, _> = table_fragments @@ -488,34 +489,33 @@ async fn test_graph_builder() -> MetaResult<()> { .map(|(fragment_id, fragment)| (*fragment_id, fragment.upstream_fragment_ids.clone())) .collect(); + assert_eq!(fragment_upstreams.get(&0).unwrap(), &vec![1]); assert_eq!(fragment_upstreams.get(&1).unwrap(), &vec![2]); - assert_eq!(fragment_upstreams.get(&2).unwrap(), &vec![3]); - assert!(fragment_upstreams.get(&3).unwrap().is_empty()); + assert!(fragment_upstreams.get(&2).unwrap().is_empty()); let mut expected_downstream = HashMap::new(); - expected_downstream.insert(1, vec![]); - expected_downstream.insert(2, vec![1]); - expected_downstream.insert(3, vec![1]); - expected_downstream.insert(4, vec![1]); - expected_downstream.insert(5, vec![1]); - expected_downstream.insert(6, vec![2, 3, 4, 5]); - expected_downstream.insert(7, vec![2, 3, 4, 5]); - expected_downstream.insert(8, vec![2, 3, 4, 5]); - expected_downstream.insert(9, vec![2, 3, 4, 5]); + expected_downstream.insert(0, vec![]); + expected_downstream.insert(1, vec![0]); + expected_downstream.insert(2, vec![0]); + expected_downstream.insert(3, vec![0]); + expected_downstream.insert(4, vec![0]); + expected_downstream.insert(5, vec![1, 2, 3, 4]); + expected_downstream.insert(6, vec![1, 2, 3, 4]); + expected_downstream.insert(7, vec![1, 2, 3, 4]); + expected_downstream.insert(8, vec![1, 2, 3, 4]); let mut expected_upstream = HashMap::new(); - expected_upstream.insert(1, vec![2, 3, 4, 5]); - expected_upstream.insert(2, vec![6, 7, 8, 9]); - expected_upstream.insert(3, vec![6, 7, 8, 9]); - expected_upstream.insert(4, vec![6, 7, 8, 9]); - expected_upstream.insert(5, vec![6, 7, 8, 9]); + expected_upstream.insert(0, vec![1, 2, 3, 4]); + expected_upstream.insert(1, vec![5, 6, 7, 8]); + expected_upstream.insert(2, vec![5, 6, 7, 8]); + expected_upstream.insert(3, vec![5, 6, 7, 8]); + expected_upstream.insert(4, vec![5, 6, 7, 8]); + expected_upstream.insert(5, vec![]); expected_upstream.insert(6, vec![]); expected_upstream.insert(7, vec![]); expected_upstream.insert(8, vec![]); - expected_upstream.insert(9, vec![]); for actor in actors { - println!("actor_id = {}", actor.get_actor_id()); assert_eq!( expected_downstream.get(&actor.get_actor_id()).unwrap(), actor