Skip to content

Commit

Permalink
feat(sql-backend): [PART 4] support create streaming job in sql backe…
Browse files Browse the repository at this point in the history
…nd (#14212)
  • Loading branch information
yezizp2012 authored Jan 11, 2024
1 parent 2952ffe commit 06d4728
Show file tree
Hide file tree
Showing 17 changed files with 845 additions and 80 deletions.
4 changes: 4 additions & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub type ActorId = i32;
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum JobStatus {
#[sea_orm(string_value = "INITIAL")]
Initial,
#[sea_orm(string_value = "CREATING")]
Creating,
#[sea_orm(string_value = "CREATED")]
Expand All @@ -90,6 +92,7 @@ pub enum JobStatus {
impl From<JobStatus> for PbStreamJobStatus {
fn from(job_status: JobStatus) -> Self {
match job_status {
JobStatus::Initial => Self::Unspecified,
JobStatus::Creating => Self::Creating,
JobStatus::Created => Self::Created,
}
Expand All @@ -100,6 +103,7 @@ impl From<JobStatus> for PbStreamJobStatus {
impl From<JobStatus> for PbStreamJobState {
fn from(status: JobStatus) -> Self {
match status {
JobStatus::Initial => PbStreamJobState::Initial,
JobStatus::Creating => PbStreamJobState::Creating,
JobStatus::Created => PbStreamJobState::Created,
}
Expand Down
11 changes: 9 additions & 2 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,8 +935,15 @@ impl CommandContext {
.await?;
}
}
MetadataManager::V2(_) => {
unimplemented!("support post collect in v2");
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.post_collect_table_fragments(
table_fragments.table_id().table_id as _,
table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ impl GlobalBarrierManagerContext {
state_table_ids,
source_ids,
..
} = mgr
.catalog_controller
.clean_foreground_creating_jobs()
.await?;
} = mgr.catalog_controller.clean_dirty_creating_jobs().await?;

// unregister compaction group for cleaned state tables.
self.hummock_manager
Expand Down
72 changes: 39 additions & 33 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
use risingwave_common::{bail, current_cluster_version};
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::table::TableType;
use risingwave_meta_model_v2::{
connection, database, function, index, object, object_dependency, schema, sink, source,
streaming_job, table, user_privilege, view, ColumnCatalogArray, ConnectionId, CreateType,
DatabaseId, FunctionId, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId,
SourceId, TableId, UserId,
SourceId, StreamSourceInfo, TableId, UserId,
};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{
Expand All @@ -39,7 +39,7 @@ use risingwave_pb::meta::relation::PbRelationInfo;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbRelation, PbRelationGroup, PbTableFragments, Relation, RelationGroup};
use risingwave_pb::meta::{PbRelation, PbRelationGroup};
use risingwave_pb::user::PbUserInfo;
use sea_orm::sea_query::{Expr, SimpleExpr};
use sea_orm::ActiveValue::Set;
Expand Down Expand Up @@ -145,8 +145,8 @@ impl CatalogController {
database_id: Set(database_id),
initialized_at: Default::default(),
created_at: Default::default(),
initialized_at_cluster_version: Default::default(),
created_at_cluster_version: Default::default(),
initialized_at_cluster_version: Set(Some(current_cluster_version())),
created_at_cluster_version: Set(Some(current_cluster_version())),
};
Ok(active_db.insert(txn).await?)
}
Expand Down Expand Up @@ -407,16 +407,19 @@ impl CatalogController {
Ok(count > 0)
}

pub async fn clean_foreground_creating_jobs(&self) -> MetaResult<ReleaseContext> {
/// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
pub async fn clean_dirty_creating_jobs(&self) -> MetaResult<ReleaseContext> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let creating_job_ids: Vec<ObjectId> = streaming_job::Entity::find()
.select_only()
.column(streaming_job::Column::JobId)
.filter(
streaming_job::Column::CreateType
.eq(CreateType::Foreground)
.and(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
streaming_job::Column::JobStatus
.eq(JobStatus::Creating)
.and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
),
)
.into_tuple()
.all(&txn)
Expand Down Expand Up @@ -449,24 +452,13 @@ impl CatalogController {
.all(&txn)
.await?;

// get all fragment mappings.
let mut fragment_mappings = vec![];
for job_id in &creating_job_ids {
let mappings = get_fragment_mappings(&txn, *job_id).await?;
fragment_mappings.extend(mappings);
}

let res = Object::delete_many()
.filter(object::Column::Oid.is_in(creating_job_ids.clone()))
.exec(&txn)
.await?;
assert!(res.rows_affected > 0);
txn.commit().await?;

// notify delete of fragment mappings.
self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings)
.await;

Ok(ReleaseContext {
streaming_jobs: creating_job_ids,
state_table_ids,
Expand All @@ -488,9 +480,13 @@ impl CatalogController {
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

// update `created_at` as now().
// update `created_at` as now() and `created_at_cluster_version` as current cluster version.
let res = Object::update_many()
.col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
.col_expr(
object::Column::CreatedAtClusterVersion,
current_cluster_version().into(),
)
.filter(object::Column::Oid.eq(job_id))
.exec(&txn)
.await?;
Expand Down Expand Up @@ -587,8 +583,11 @@ impl CatalogController {
_ => unreachable!("invalid job type: {:?}", job_type),
}

let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;
txn.commit().await?;

self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
.await;
let version = self
.notify_frontend(
NotificationOperation::Add,
Expand All @@ -599,15 +598,6 @@ impl CatalogController {
Ok(version)
}

pub fn create_stream_job(
&self,
_stream_job: &StreamingJob,
_table_fragments: &PbTableFragments,
_internal_tables: Vec<PbTable>,
) -> MetaResult<()> {
todo!()
}

pub async fn create_source(
&self,
mut pb_source: PbSource,
Expand Down Expand Up @@ -1320,10 +1310,10 @@ impl CatalogController {
let version = self
.notify_frontend(
Operation::Update,
Info::RelationGroup(RelationGroup {
Info::RelationGroup(PbRelationGroup {
relations: relations
.into_iter()
.map(|relation_info| Relation {
.map(|relation_info| PbRelation {
relation_info: Some(relation_info),
})
.collect_vec(),
Expand Down Expand Up @@ -1431,8 +1421,24 @@ impl CatalogController {
.filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
.map(|obj| obj.oid)
.collect_vec();

// cdc source streaming job.
if object_type == ObjectType::Source {
let source_info: Option<StreamSourceInfo> = Source::find_by_id(object_id)
.select_only()
.column(source::Column::SourceInfo)
.into_tuple()
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
if let Some(source_info) = source_info
&& source_info.into_inner().cdc_source_job
{
to_drop_streaming_jobs.push(object_id);
}
}

let mut to_drop_state_table_ids = to_drop_table_ids.clone().collect_vec();
// todo: record index dependency info in the object dependency table.
let to_drop_index_ids = to_drop_objects
.iter()
.filter(|obj| obj.obj_type == ObjectType::Index)
Expand Down
11 changes: 7 additions & 4 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob};
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, sink, streaming_job, ActorId, ConnectorSplits, ExprContext,
FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId, StreamNode,
TableId, VnodeBitmap, WorkerId,
actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits,
ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId,
StreamNode, TableId, VnodeBitmap, WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::ddl_service::PbTableJobType;
Expand Down Expand Up @@ -60,13 +60,16 @@ use crate::stream::SplitAssignment;
use crate::MetaResult;

impl CatalogControllerInner {
/// List all fragment vnode mapping info
/// List all fragment vnode mapping info for all CREATED streaming jobs.
pub async fn all_running_fragment_mappings(
&self,
) -> MetaResult<impl Iterator<Item = FragmentParallelUnitMapping> + '_> {
let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find()
.join(JoinType::InnerJoin, fragment::Relation::Object.def())
.join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
.select_only()
.columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping])
.filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
.into_tuple()
.all(&self.db)
.await?;
Expand Down
103 changes: 103 additions & 0 deletions src/meta/src/controller/id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use risingwave_meta_model_v2::prelude::{Actor, Fragment};
use risingwave_meta_model_v2::{actor, fragment};
use sea_orm::sea_query::{Expr, Func};
use sea_orm::{DatabaseConnection, EntityTrait, QuerySelect};

use crate::manager::{IdCategory, IdCategoryType};
use crate::MetaResult;

pub struct IdGenerator<const TYPE: IdCategoryType>(AtomicU64);

impl<const TYPE: IdCategoryType> IdGenerator<TYPE> {
pub async fn new(conn: &DatabaseConnection) -> MetaResult<Self> {
let id: i32 = match TYPE {
IdCategory::Table => {
// Since we are using object pk to generate id for tables, here we just implement a dummy
// id generator and refill it later when inserting the table.
0
}
IdCategory::Fragment => Fragment::find()
.select_only()
.expr(Func::if_null(
Expr::col(fragment::Column::FragmentId).max().add(1),
0,
))
.into_tuple()
.one(conn)
.await?
.unwrap_or_default(),
IdCategory::Actor => Actor::find()
.select_only()
.expr(Func::if_null(
Expr::col(actor::Column::ActorId).max().add(1),
0,
))
.into_tuple()
.one(conn)
.await?
.unwrap_or_default(),
_ => unreachable!("IdGeneratorV2 only supports Table, Fragment, and Actor"),
};

Ok(Self(AtomicU64::new(id as u64)))
}

pub fn generate_interval(&self, interval: u64) -> u64 {
self.0.fetch_add(interval, Ordering::Relaxed)
}
}

pub type IdGeneratorManagerRef = Arc<IdGeneratorManager>;

/// `IdGeneratorManager` is a manager for three id generators: `tables`, `fragments`, and `actors`. Note that this is just a
/// workaround for the current implementation of `IdGenerator`. We should refactor it later.
pub struct IdGeneratorManager {
pub tables: Arc<IdGenerator<{ IdCategory::Table }>>,
pub fragments: Arc<IdGenerator<{ IdCategory::Fragment }>>,
pub actors: Arc<IdGenerator<{ IdCategory::Actor }>>,
}

impl IdGeneratorManager {
pub async fn new(conn: &DatabaseConnection) -> MetaResult<Self> {
Ok(Self {
tables: Arc::new(IdGenerator::new(conn).await?),
fragments: Arc::new(IdGenerator::new(conn).await?),
actors: Arc::new(IdGenerator::new(conn).await?),
})
}

pub fn generate<const C: IdCategoryType>(&self) -> u64 {
match C {
IdCategory::Table => self.tables.generate_interval(1),
IdCategory::Fragment => self.fragments.generate_interval(1),
IdCategory::Actor => self.actors.generate_interval(1),
_ => unreachable!("IdGeneratorV2 only supports Table, Fragment, and Actor"),
}
}

pub fn generate_interval<const C: IdCategoryType>(&self, interval: u64) -> u64 {
match C {
IdCategory::Table => self.tables.generate_interval(interval),
IdCategory::Fragment => self.fragments.generate_interval(interval),
IdCategory::Actor => self.actors.generate_interval(interval),
_ => unreachable!("IdGeneratorV2 only supports Table, Fragment, and Actor"),
}
}
}
2 changes: 2 additions & 0 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use crate::MetaError;
pub mod catalog;
pub mod cluster;
pub mod fragment;
pub mod id;
pub mod rename;
pub mod streaming_job;
pub mod system_param;
pub mod user;
pub mod utils;
Expand Down
Loading

0 comments on commit 06d4728

Please sign in to comment.