record dependencies upon UDFs for sinks
Signed-off-by: Richard Chien <[email protected]>
stdrc committed Nov 15, 2024
1 parent e1e71d8 commit 8323c2d
Showing 7 changed files with 51 additions and 14 deletions.
proto/ddl_service.proto (2 changes: 2 additions & 0 deletions)
@@ -86,6 +86,8 @@ message CreateSinkRequest {
stream_plan.StreamFragmentGraph fragment_graph = 2;
// It is used to provide a replace plan for the downstream table in `create sink into table` requests.
optional ReplaceTablePlan affected_table_change = 3;
// The list of object IDs that this sink depends on.
repeated uint32 dependencies = 4;
}

message CreateSinkResponse {
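For orientation, the new `repeated uint32 dependencies = 4;` field surfaces on the Rust side as a plain `Vec<u32>` once the proto is regenerated. The snippet below is a simplified stand-in for the generated type, not the actual prost output; the other request fields are elided.

```rust
// Simplified stand-in for the prost-generated `CreateSinkRequest`; the real
// message also carries the sink catalog, the fragment graph, and the optional
// replace-table plan.
#[derive(Debug)]
pub struct CreateSinkRequest {
    /// `repeated uint32 dependencies = 4;` maps to a plain vector of IDs.
    pub dependencies: Vec<u32>,
}

fn main() {
    // The frontend fills this with every catalog object the sink references:
    // tables, sources, and, with this commit, UDFs.
    let req = CreateSinkRequest {
        dependencies: vec![1001, 1002, 2005],
    };
    println!("sink depends on objects {:?}", req.dependencies);
}
```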
src/connector/src/sink/catalog/desc.rs (3 changes: 1 addition & 2 deletions)
@@ -91,7 +91,6 @@ impl SinkDesc {
database_id: DatabaseId,
owner: UserId,
connection_id: Option<ConnectionId>,
dependent_relations: Vec<TableId>,
) -> SinkCatalog {
SinkCatalog {
id: self.id,
@@ -104,7 +103,7 @@
downstream_pk: self.downstream_pk,
distribution_key: self.distribution_key,
owner,
dependent_relations,
dependent_relations: vec![], // TODO(rc): to be deprecated
properties: self.properties,
secret_refs: self.secret_refs,
sink_type: self.sink_type,
src/frontend/src/catalog/catalog_service.rs (4 changes: 3 additions & 1 deletion)
@@ -117,6 +117,7 @@ pub trait CatalogWriter: Send + Sync {
sink: PbSink,
graph: StreamFragmentGraph,
affected_table_change: Option<PbReplaceTablePlan>,
dependencies: HashSet<ObjectId>,
) -> Result<()>;

async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
@@ -319,10 +320,11 @@ impl CatalogWriter for CatalogWriterImpl {
sink: PbSink,
graph: StreamFragmentGraph,
affected_table_change: Option<ReplaceTablePlan>,
dependencies: HashSet<ObjectId>,
) -> Result<()> {
let version = self
.meta_client
.create_sink(sink, graph, affected_table_change)
.create_sink(sink, graph, affected_table_change, dependencies)
.await?;
self.wait_version(version).await
}
src/frontend/src/handler/create_sink.rs (39 changes: 30 additions & 9 deletions)
@@ -22,7 +22,9 @@ use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::{ColumnCatalog, DatabaseId, Schema, SchemaId, TableId, UserId};
use risingwave_common::catalog::{
ColumnCatalog, DatabaseId, ObjectId, Schema, SchemaId, TableId, UserId,
};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_common::{bail, catalog};
@@ -75,6 +77,7 @@ pub struct SinkPlanContext {
pub sink_plan: PlanRef,
pub sink_catalog: SinkCatalog,
pub target_table_catalog: Option<Arc<TableCatalog>>,
pub dependencies: HashSet<ObjectId>,
}

pub async fn gen_sink_plan(
@@ -125,10 +128,14 @@
let (sink_database_id, sink_schema_id) =
session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

let (dependent_relations, bound) = {
let (dependent_relations, dependent_udfs, bound) = {
let mut binder = Binder::new_for_stream(session);
let bound = binder.bind_query(*query.clone())?;
(binder.included_relations().clone(), bound)
(
binder.included_relations().clone(),
binder.included_udfs().clone(),
bound,
)
};

let check_items = resolve_query_privileges(&bound);
@@ -253,15 +260,22 @@
ctx.trace(sink_plan.explain_to_string());
}

let dependent_relations =
RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone());
let dependencies =
RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
.into_iter()
.map(|id| id.table_id() as ObjectId)
.chain(
dependent_udfs
.into_iter()
.map(|id| id.function_id() as ObjectId),
)
.collect();

let sink_catalog = sink_desc.into_catalog(
SchemaId::new(sink_schema_id),
DatabaseId::new(sink_database_id),
UserId::new(session.user_id()),
None, // deprecated: private link connection id
dependent_relations.into_iter().collect_vec(),
);

if let Some(table_catalog) = &target_table_catalog {
@@ -314,6 +328,7 @@ pub async fn gen_sink_plan(
sink_plan,
sink_catalog,
target_table_catalog,
dependencies,
})
}

@@ -428,12 +443,13 @@ pub async fn handle_create_sink(
return Ok(resp);
}

let (mut sink, graph, target_table_catalog) = {
let (mut sink, graph, target_table_catalog, dependencies) = {
let SinkPlanContext {
query,
sink_plan: plan,
sink_catalog: sink,
target_table_catalog,
dependencies,
} = gen_sink_plan(handle_args, stmt, None).await?;

let has_order_by = !query.order_by.is_empty();
@@ -446,7 +462,7 @@

let graph = build_graph(plan)?;

(sink, graph, target_table_catalog)
(sink, graph, target_table_catalog, dependencies)
};

let mut target_table_replace_plan = None;
@@ -506,7 +522,12 @@

let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_sink(sink.to_proto(), graph, target_table_replace_plan)
.create_sink(
sink.to_proto(),
graph,
target_table_replace_plan,
dependencies,
)
.await?;

Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
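The heart of the change sits in the hunks above: the UDFs recorded by the binder via `included_udfs()` are now merged with the relations gathered by `RelationCollectorVisitor` into one flat set of catalog object IDs. Below is a minimal, self-contained sketch of that pattern; `TableId`, `FunctionId`, and `ObjectId` are stand-ins rather than the real RisingWave catalog types, and the integer widths are assumptions.

```rust
use std::collections::HashSet;

// Stand-in for the catalog object ID type used on the frontend side.
type ObjectId = u32;

#[derive(Clone, Copy)]
struct TableId(u32);

#[derive(Clone, Copy)]
struct FunctionId(u32);

impl TableId {
    fn table_id(&self) -> u32 {
        self.0
    }
}

impl FunctionId {
    fn function_id(&self) -> u32 {
        self.0
    }
}

/// Merge the relations referenced by the sink plan with the UDFs referenced
/// by the bound query into one flat set of catalog object IDs.
fn collect_dependencies(
    dependent_relations: impl IntoIterator<Item = TableId>,
    dependent_udfs: impl IntoIterator<Item = FunctionId>,
) -> HashSet<ObjectId> {
    dependent_relations
        .into_iter()
        .map(|id| id.table_id() as ObjectId)
        .chain(
            dependent_udfs
                .into_iter()
                .map(|id| id.function_id() as ObjectId),
        )
        .collect()
}

fn main() {
    // Two upstream relations plus one UDF yield three distinct dependencies.
    let deps = collect_dependencies([TableId(1), TableId(2)], [FunctionId(10)]);
    assert_eq!(deps.len(), 3);
}
```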
src/frontend/src/test_utils.rs (1 change: 1 addition & 0 deletions)
@@ -343,6 +343,7 @@ impl CatalogWriter for MockCatalogWriter {
sink: PbSink,
graph: StreamFragmentGraph,
_affected_table_change: Option<ReplaceTablePlan>,
_dependencies: HashSet<ObjectId>,
) -> Result<()> {
self.create_sink_inner(sink, graph)
}
src/meta/service/src/ddl_service.rs (14 changes: 12 additions & 2 deletions)
@@ -24,6 +24,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{Comment, CreateType, Secret, Table};
use risingwave_pb::common::worker_node::State;
use risingwave_pb::common::WorkerType;
@@ -281,6 +282,11 @@
let sink = req.get_sink()?.clone();
let fragment_graph = req.get_fragment_graph()?.clone();
let affected_table_change = req.get_affected_table_change().cloned().ok();
let dependencies = req
.get_dependencies()
.iter()
.map(|id| *id as ObjectId)
.collect();

let stream_job = match &affected_table_change {
None => StreamingJob::Sink(sink, None),
@@ -296,7 +302,7 @@
fragment_graph,
CreateType::Foreground,
affected_table_change.map(Self::extract_replace_table_info),
HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbSink`
dependencies,
);

let version = self.ddl_controller.run_command(command).await?;
@@ -382,7 +388,11 @@
let mview = req.get_materialized_view()?.clone();
let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground);
let fragment_graph = req.get_fragment_graph()?.clone();
let dependencies = req.get_dependencies().iter().map(|id| *id as i32).collect();
let dependencies = req
.get_dependencies()
.iter()
.map(|id| *id as ObjectId)
.collect();

let stream_job = StreamingJob::MaterializedView(mview);
let version = self
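On the meta side, the proto's unsigned IDs are mapped into the meta model's `ObjectId`, and the materialized-view path now uses the same alias instead of a bare `as i32` cast. A small sketch of that conversion, under the assumption (suggested by the replaced cast) that `ObjectId` is an `i32` alias:

```rust
use std::collections::HashSet;

// Assumption for this sketch: the meta model's `ObjectId` is an i32 alias,
// matching the `as i32` cast the materialized-view path used previously.
type ObjectId = i32;

/// Convert the proto-level `repeated uint32` dependency list into the meta
/// model's ID type, deduplicating in the process.
fn dependencies_from_request(raw: &[u32]) -> HashSet<ObjectId> {
    raw.iter().map(|id| *id as ObjectId).collect()
}

fn main() {
    // Duplicate IDs on the wire collapse into a single dependency entry.
    assert_eq!(dependencies_from_request(&[7, 7, 8]).len(), 2);
}
```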
src/rpc_client/src/meta_client.rs (2 changes: 2 additions & 0 deletions)
@@ -443,11 +443,13 @@ impl MetaClient {
sink: PbSink,
graph: StreamFragmentGraph,
affected_table_change: Option<ReplaceTablePlan>,
dependencies: HashSet<ObjectId>,
) -> Result<WaitVersion> {
let request = CreateSinkRequest {
sink: Some(sink),
fragment_graph: Some(graph),
affected_table_change,
dependencies: dependencies.into_iter().collect(),
};

let resp = self.inner.create_sink(request).await?;
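At the client boundary, `create_sink` takes a `HashSet<ObjectId>` so callers cannot hand over duplicate IDs, and the set is flattened into the proto's repeated field when the request is built. A tiny sketch of that hand-off, with arbitrary example IDs:

```rust
use std::collections::HashSet;

fn main() {
    // Dependencies are collected into a set, so repeated references to the
    // same object collapse into one entry...
    let dependencies: HashSet<u32> = [10, 11, 11].into_iter().collect();

    // ...and are flattened into the proto's `repeated uint32` field (a Vec)
    // when the request is built; ordering on the wire is not significant.
    let wire_field: Vec<u32> = dependencies.into_iter().collect();
    assert_eq!(wire_field.len(), 2);
}
```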
