feat: refine sink into table functionalities (#15160)
Co-authored-by: August <[email protected]>
shanicky and yezizp2012 authored Feb 23, 2024
1 parent b95d9a9 commit bd29148
Showing 4 changed files with 423 additions and 23 deletions.
276 changes: 255 additions & 21 deletions src/meta/src/controller/catalog.rs
@@ -19,16 +19,18 @@ use std::sync::Arc;
use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::{bail, current_cluster_version};
use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::table::TableType;
use risingwave_meta_model_v2::{
connection, database, function, index, object, object_dependency, schema, sink, source,
streaming_job, table, user_privilege, view, ActorId, ColumnCatalogArray, ConnectionId,
CreateType, DatabaseId, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
PrivateLinkService, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism, TableId,
UserId,
actor, connection, database, fragment, function, index, object, object_dependency, schema,
sink, source, streaming_job, table, user_privilege, view, ActorId, ActorUpstreamActors,
ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array,
IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SourceId, StreamSourceInfo,
StreamingParallelism, TableId, UserId,
};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{
@@ -41,6 +43,8 @@ use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbRelation, PbRelationGroup};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::FragmentTypeFlag;
use risingwave_pb::user::PbUserInfo;
use sea_orm::sea_query::{Expr, SimpleExpr};
use sea_orm::ActiveValue::Set;
@@ -423,6 +427,7 @@ impl CatalogController {
pub async fn clean_dirty_creating_jobs(&self) -> MetaResult<ReleaseContext> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

let creating_job_ids: Vec<ObjectId> = streaming_job::Entity::find()
.select_only()
.column(streaming_job::Column::JobId)
@@ -436,7 +441,14 @@
.into_tuple()
.all(&txn)
.await?;

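// Repair downstream tables/fragments/actors that still reference sinks from
// dirty `sink into table` jobs before deciding whether anything must be released.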
let changed = Self::clean_dirty_sink_downstreams(&txn).await?;

if creating_job_ids.is_empty() {
if changed {
txn.commit().await?;
}

return Ok(ReleaseContext::default());
}

@@ -476,6 +488,7 @@
.exec(&txn)
.await?;
assert!(res.rows_affected > 0);

txn.commit().await?;

Ok(ReleaseContext {
@@ -485,6 +498,175 @@
})
}

async fn clean_dirty_sink_downstreams(txn: &DatabaseTransaction) -> MetaResult<bool> {
// Clean dirty `sink into table` leftovers:
// - stale incoming sink ids on `table`
// - stale upstream fragment ids on `fragment`
// - stale `Merge` inputs in `fragment`'s stream node
// - stale upstream actor ids on `actor`
let all_fragment_ids: Vec<FragmentId> = Fragment::find()
.select_only()
.columns(vec![fragment::Column::FragmentId])
.into_tuple()
.all(txn)
.await?;

let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();

let table_sink_ids: Vec<ObjectId> = Sink::find()
.select_only()
.column(sink::Column::SinkId)
.filter(sink::Column::TargetTable.is_not_null())
.into_tuple()
.all(txn)
.await?;
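// `table_sink_ids` are the surviving `sink into table` sinks; any id still
// recorded in a table's `incoming_sinks` but missing here is dirty.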

let all_table_with_incoming_sinks: Vec<(ObjectId, I32Array)> = Table::find()
.select_only()
.columns(vec![table::Column::TableId, table::Column::IncomingSinks])
.into_tuple()
.all(txn)
.await?;

let table_incoming_sinks_to_update = all_table_with_incoming_sinks
.into_iter()
.filter(|(_, incoming_sinks)| {
let inner_ref = incoming_sinks.inner_ref();
!inner_ref.is_empty()
&& inner_ref
.iter()
.any(|sink_id| !table_sink_ids.contains(sink_id))
})
.collect_vec();

let new_table_incoming_sinks = table_incoming_sinks_to_update
.into_iter()
.map(|(table_id, incoming_sinks)| {
let new_incoming_sinks = incoming_sinks
.into_inner()
.extract_if(|id| table_sink_ids.contains(id))
.collect_vec();
(table_id, I32Array::from(new_incoming_sinks))
})
.collect_vec();

// Nothing to update; return early.
if new_table_incoming_sinks.is_empty() {
return Ok(false);
}

for (table_id, new_incoming_sinks) in new_table_incoming_sinks {
tracing::info!("cleaning dirty table sink downstream table {}", table_id);
Table::update_many()
.col_expr(table::Column::IncomingSinks, new_incoming_sinks.into())
.filter(table::Column::TableId.eq(table_id))
.exec(txn)
.await?;

let fragments: Vec<(FragmentId, I32Array, StreamNode, i32)> = Fragment::find()
.select_only()
.columns(vec![
fragment::Column::FragmentId,
fragment::Column::UpstreamFragmentId,
fragment::Column::StreamNode,
fragment::Column::FragmentTypeMask,
])
.filter(fragment::Column::JobId.eq(table_id))
.into_tuple()
.all(txn)
.await?;

for (fragment_id, upstream_fragment_ids, stream_node, fragment_mask) in fragments {
let mut upstream_fragment_ids = upstream_fragment_ids.into_inner();

let dirty_upstream_fragment_ids = upstream_fragment_ids
.extract_if(|id| !all_fragment_ids.contains(id))
.collect_vec();
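// `extract_if` drains exactly the ids matching the predicate (upstream
// fragments that no longer exist), leaving the survivors in
// `upstream_fragment_ids`.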

if !dirty_upstream_fragment_ids.is_empty() {
// The dirty downstream must be the table's materialize fragment.
assert!(fragment_mask & FragmentTypeFlag::Mview as i32 > 0);

tracing::info!(
"cleaning dirty table sink fragment {:?} from downstream fragment {}",
dirty_upstream_fragment_ids,
fragment_id
);

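// Under the `Union` node, retain only `Merge` inputs whose upstream fragment
// still exists; inputs fed by dropped sink fragments are removed.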
let mut pb_stream_node = stream_node.to_protobuf();

visit_stream_node_cont(&mut pb_stream_node, |node| {
if let Some(NodeBody::Union(_)) = node.node_body {
node.input.retain_mut(|input| {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body
&& all_fragment_ids
.contains(&(merge_node.upstream_fragment_id as i32))
{
true
} else {
false
}
});
}
true
});

Fragment::update_many()
.col_expr(
fragment::Column::UpstreamFragmentId,
I32Array::from(upstream_fragment_ids).into(),
)
.col_expr(
fragment::Column::StreamNode,
StreamNode::from_protobuf(&pb_stream_node).into(),
)
.filter(fragment::Column::FragmentId.eq(fragment_id))
.exec(txn)
.await?;

let actors: Vec<(ActorId, ActorUpstreamActors)> = Actor::find()
.select_only()
.columns(vec![
actor::Column::ActorId,
actor::Column::UpstreamActorIds,
])
.filter(actor::Column::FragmentId.eq(fragment_id))
.into_tuple()
.all(txn)
.await?;

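// Mirror the fragment-level cleanup at actor granularity: drop upstream
// actor entries keyed by fragments that no longer exist.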
for (actor_id, upstream_actor_ids) in actors {
let mut upstream_actor_ids = upstream_actor_ids.into_inner();

let dirty_actor_upstreams = upstream_actor_ids
.extract_if(|id, _| !all_fragment_ids.contains(id))
.map(|(id, _)| id)
.collect_vec();

if !dirty_actor_upstreams.is_empty() {
tracing::debug!(
"cleaning dirty table sink fragment {:?} from downstream fragment {} actor {}",
dirty_actor_upstreams,
fragment_id,
actor_id,
);

Actor::update_many()
.col_expr(
actor::Column::UpstreamActorIds,
ActorUpstreamActors::from(upstream_actor_ids).into(),
)
.filter(actor::Column::ActorId.eq(actor_id))
.exec(txn)
.await?;
}
}
}
}
}

Ok(true)
}

/// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
pub async fn finish_streaming_job(&self, job_id: ObjectId) -> MetaResult<NotificationVersion> {
let inner = self.inner.write().await;
@@ -1487,6 +1669,52 @@ impl CatalogController {
);
to_drop_objects.push(obj);

// Special handling for 'sink into table'.
if object_type != ObjectType::Sink {
// When dropping a table that has incoming sinks, all of those sinks must be dropped as well.
if object_type == ObjectType::Table {
let table = Table::find_by_id(object_id)
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;

let incoming_sinks = table.incoming_sinks.into_inner();

if !incoming_sinks.is_empty() {
let objs: Vec<PartialObject> = Object::find()
.filter(object::Column::Oid.is_in(incoming_sinks))
.into_partial_model()
.all(&txn)
.await?;

to_drop_objects.extend(objs);
}
}

let to_drop_object_ids: HashSet<_> =
to_drop_objects.iter().map(|obj| obj.oid).collect();

// If a table sink (`sink into table`) appears in the dependency chain of a drop cascade, we currently return an error and require the sink to be dropped manually.
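// Illustrative example (hypothetical names): given
//   CREATE SINK s INTO t FROM src;
// `DROP TABLE src CASCADE` reaches sink `s` through the dependency chain while
// its target table `t` is not part of the drop set, so we bail out and ask the
// user to drop `s` first.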
for obj in &to_drop_objects {
if obj.obj_type == ObjectType::Sink {
let sink = Sink::find_by_id(obj.oid)
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;

// Dropping a `sink into table` sink requires the frontend to regenerate the table's plan, so it is not yet compatible with cascade dropping.
if let Some(target_table) = sink.target_table
&& !to_drop_object_ids.contains(&target_table)
{
bail!(
"Found sink into table with sink id {} in dependency, please drop them manually",
obj.oid,
);
}
}
}
}

let to_drop_table_ids = to_drop_objects
.iter()
.filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
@@ -1856,22 +2084,28 @@
});
}};
}
let objs = get_referring_objects(object_id, &txn).await?;
// TODO: For sink into table. when sink into table is ready.
// if object_type == ObjectType::Table {
// let incoming_sinks: Vec<_> = Table::find_by_id(object_id)
// .select_only()
// .column(table::Column::IncomingSinks)
// .into_tuple()
// .one(&txn)
// .await?
// .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
// objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
// oid: id as _,
// obj_type: ObjectType::Sink,
// ..Default::default()
// }));
// }
let mut objs = get_referring_objects(object_id, &txn).await?;
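// Incoming sinks are recorded on the table's `incoming_sinks` column and are
// not returned by `get_referring_objects`, so add them to the referring set
// explicitly.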
if object_type == ObjectType::Table {
let incoming_sinks: I32Array = Table::find_by_id(object_id)
.select_only()
.column(table::Column::IncomingSinks)
.into_tuple()
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;

objs.extend(
incoming_sinks
.into_inner()
.into_iter()
.map(|id| PartialObject {
oid: id,
obj_type: ObjectType::Sink,
schema_id: None,
database_id: None,
}),
);
}

for obj in objs {
match obj.obj_type {
19 changes: 19 additions & 0 deletions src/meta/src/controller/rename.rs
@@ -79,6 +79,7 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
stmt:
CreateSinkStatement {
sink_from: CreateSink::AsQuery(query),
into_table_name: None,
..
},
} => {
@@ -89,9 +90,27 @@
stmt:
CreateSinkStatement {
sink_from: CreateSink::From(table_name),
into_table_name: None,
..
},
} => replace_table_name(table_name, to),
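// `CREATE SINK ... INTO <table>`: if the rename target is the downstream
// table itself, rewrite `into_table_name`; otherwise the renamed relation can
// only appear on the source side of the sink.
// Illustrative rewrite (hypothetical names): renaming `t` to `t2` turns
//   CREATE SINK s INTO t FROM src
// into
//   CREATE SINK s INTO t2 FROM src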
Statement::CreateSink {
stmt: CreateSinkStatement {
sink_from,
into_table_name: Some(table_name),
..
}
} => {
let idx = table_name.0.len() - 1;
if table_name.0[idx].real_value() == from {
table_name.0[idx] = Ident::new_unchecked(to);
} else {
match sink_from {
CreateSink::From(table_name) => replace_table_name(table_name, to),
CreateSink::AsQuery(query) => QueryRewriter::rewrite_query(query, from, to),
}
}
}
_ => unreachable!(),
};
stmt.to_string()