Skip to content

Commit

Permalink
feat(meta): dependency check for drop function (#19399)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
  • Loading branch information
stdrc and BugenZhao authored Nov 21, 2024
1 parent cfa521d commit 37e108c
Show file tree
Hide file tree
Showing 23 changed files with 228 additions and 103 deletions.
6 changes: 3 additions & 3 deletions e2e_test/slow_tests/udf/always_retry_python.slt
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ system ok
pkill -i python

statement ok
DROP FUNCTION sleep_always_retry;
DROP TABLE t CASCADE;

statement ok
DROP FUNCTION sleep_no_retry;
DROP FUNCTION sleep_always_retry;

statement ok
DROP TABLE t CASCADE;
DROP FUNCTION sleep_no_retry;
44 changes: 44 additions & 0 deletions e2e_test/udf/drop_function.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# https://github.com/risingwavelabs/risingwave/issues/17263

statement ok
create table t (a int, b int);

statement ok
create function add(a int, b int) returns int language python as $$
def add(a, b):
return a+b
$$;

statement ok
create materialized view mv as select add(a, b) as c from t;

statement error function used by 1 other objects
drop function add;

statement ok
drop materialized view mv;

statement ok
drop function add;


statement ok
create function add(a int, b int) returns int language python as $$
def add(a, b):
return a+b
$$;

statement ok
create sink s as select add(a, b) as c from t with (connector = 'blackhole');

statement error function used by 1 other objects
drop function add;

statement ok
drop sink s;

statement ok
drop function add;

statement ok
drop table t;
4 changes: 2 additions & 2 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ message Sink {
repeated plan_common.ColumnCatalog columns = 5;
// Primary key derived from the SQL by the frontend.
repeated common.ColumnOrder plan_pk = 6;
repeated uint32 dependent_relations = 7;
repeated uint32 dependent_relations = 7 [deprecated = true];
repeated int32 distribution_key = 8;
// User-defined primary key indices for the upsert sink.
repeated int32 downstream_pk = 9;
Expand Down Expand Up @@ -342,7 +342,7 @@ message Table {
repeated plan_common.ColumnCatalog columns = 5;
repeated common.ColumnOrder pk = 6;
// For a CDC table created from a CDC source, this records the source id.
repeated uint32 dependent_relations = 8;
repeated uint32 dependent_relations = 8; // TODO(rc): deprecate this by passing dependencies via `Request` message
oneof optional_associated_source_id {
uint32 associated_source_id = 9;
}
Expand Down
5 changes: 5 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ message CreateSinkRequest {
stream_plan.StreamFragmentGraph fragment_graph = 2;
// It is used to provide a replace plan for the downstream table in `create sink into table` requests.
optional ReplaceTablePlan affected_table_change = 3;
// The list of object IDs that this sink depends on.
repeated uint32 dependencies = 4;
}

message CreateSinkResponse {
Expand Down Expand Up @@ -136,6 +138,9 @@ message CreateMaterializedViewRequest {
SERVERLESS = 2;
}
BackfillType backfill = 3;

// The list of object IDs that this materialized view depends on.
repeated uint32 dependencies = 4;
}

message CreateMaterializedViewResponse {
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ pub trait SysCatalogReader: Sync + Send + 'static {

pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;

pub type ObjectId = u32;

#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
#[display("{database_id}")]
pub struct DatabaseId {
Expand Down
2 changes: 0 additions & 2 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ impl SinkDesc {
database_id: DatabaseId,
owner: UserId,
connection_id: Option<ConnectionId>,
dependent_relations: Vec<TableId>,
) -> SinkCatalog {
SinkCatalog {
id: self.id,
Expand All @@ -104,7 +103,6 @@ impl SinkDesc {
downstream_pk: self.downstream_pk,
distribution_key: self.distribution_key,
owner,
dependent_relations,
properties: self.properties,
secret_refs: self.secret_refs,
sink_type: self.sink_type,
Expand Down
15 changes: 2 additions & 13 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,6 @@ pub struct SinkCatalog {
/// Owner of the sink.
pub owner: UserId,

// Relations on which the sink depends.
pub dependent_relations: Vec<TableId>,

// The append-only behavior of the physical sink connector. Frontend will determine `sink_type`
// based on both its own derivation on the append-only attribute and other user-specified
// options in `properties`.
Expand Down Expand Up @@ -382,6 +379,7 @@ pub struct SinkCatalog {

impl SinkCatalog {
pub fn to_proto(&self) -> PbSink {
#[allow(deprecated)] // for `dependent_relations`
PbSink {
id: self.id.into(),
schema_id: self.schema_id.schema_id,
Expand All @@ -395,11 +393,7 @@ impl SinkCatalog {
.iter()
.map(|idx| *idx as i32)
.collect_vec(),
dependent_relations: self
.dependent_relations
.iter()
.map(|id| id.table_id)
.collect_vec(),
dependent_relations: vec![],
distribution_key: self
.distribution_key
.iter()
Expand Down Expand Up @@ -507,11 +501,6 @@ impl From<PbSink> for SinkCatalog {
.collect_vec(),
properties: pb.properties,
owner: pb.owner.into(),
dependent_relations: pb
.dependent_relations
.into_iter()
.map(TableId::from)
.collect_vec(),
sink_type: SinkType::from_proto(sink_type),
format_desc,
connection_id: pb.connection_id.map(ConnectionId),
Expand Down
13 changes: 12 additions & 1 deletion src/frontend/src/binder/expr/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;

Expand Down Expand Up @@ -154,6 +154,8 @@ impl Binder {
.flatten_ok()
.try_collect()?;

let mut referred_udfs = HashSet::new();

let wrapped_agg_type = if scalar_as_agg {
// First, try to apply the `AGGREGATE:` prefix.
// We reject functions that cannot be wrapped as an aggregate function.
Expand All @@ -167,12 +169,16 @@ impl Binder {
let scalar_func_expr = if let Ok(schema) = self.first_valid_schema()
&& let Some(func) = schema.get_function_by_name_inputs(&func_name, &mut array_args)
{
// record the dependency upon the UDF
referred_udfs.insert(func.id);

if !func.kind.is_scalar() {
return Err(ErrorCode::InvalidInputSyntax(
"expect a scalar function after `AGGREGATE:`".to_string(),
)
.into());
}

if func.language == "sql" {
self.bind_sql_udf(func.clone(), array_args)?
} else {
Expand All @@ -194,6 +200,9 @@ impl Binder {
.get_function_by_name_inputs(&func_name, &mut args)
.cloned()
{
// record the dependency upon the UDF
referred_udfs.insert(func.id);

if func.language == "sql" {
let name = format!("SQL user-defined function `{}`", func.name);
reject_syntax!(
Expand Down Expand Up @@ -228,6 +237,8 @@ impl Binder {
None
};

self.included_udfs.extend(referred_udfs);

let agg_type = if wrapped_agg_type.is_some() {
wrapped_agg_type
} else if let Some(ref udf) = udf
Expand Down
16 changes: 13 additions & 3 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::catalog::FunctionId;
use risingwave_common::session_config::{SearchPath, SessionConfig};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
Expand Down Expand Up @@ -121,6 +122,9 @@ pub struct Binder {
/// The included relations while binding a query.
included_relations: HashSet<TableId>,

/// The included user-defined functions while binding a query.
included_udfs: HashSet<FunctionId>,

param_types: ParameterTypes,

/// The sql udf context that will be used during binding phase
Expand Down Expand Up @@ -324,6 +328,7 @@ impl Binder {
bind_for,
shared_views: HashMap::new(),
included_relations: HashSet::new(),
included_udfs: HashSet::new(),
param_types: ParameterTypes::new(param_types),
udf_context: UdfContext::new(),
temporary_source_manager: session.temporary_source_manager(),
Expand Down Expand Up @@ -382,13 +387,18 @@ impl Binder {
self.param_types.export()
}

/// Returns included relations in the query after binding. This is used for resolving relation
/// Get included relations in the query after binding. This is used for resolving relation
/// dependencies. Note that it only contains referenced relations discovered during binding.
/// After the plan is built, the referenced relations may be changed. We cannot rely on the
/// collection result of plan, because we still need to record the dependencies that have been
/// optimised away.
pub fn included_relations(&self) -> HashSet<TableId> {
self.included_relations.clone()
pub fn included_relations(&self) -> &HashSet<TableId> {
&self.included_relations
}

/// Get included user-defined functions in the query after binding.
pub fn included_udfs(&self) -> &HashSet<FunctionId> {
&self.included_udfs
}

fn push_context(&mut self) {
Expand Down
11 changes: 8 additions & 3 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use anyhow::anyhow;
use parking_lot::lock_api::ArcRwLockReadGuard;
use parking_lot::{RawRwLock, RwLock};
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId};
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, ObjectId};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::catalog::{
Expand Down Expand Up @@ -78,6 +79,7 @@ pub trait CatalogWriter: Send + Sync {
&self,
table: PbTable,
graph: StreamFragmentGraph,
dependencies: HashSet<ObjectId>,
) -> Result<()>;

async fn create_table(
Expand Down Expand Up @@ -115,6 +117,7 @@ pub trait CatalogWriter: Send + Sync {
sink: PbSink,
graph: StreamFragmentGraph,
affected_table_change: Option<PbReplaceTablePlan>,
dependencies: HashSet<ObjectId>,
) -> Result<()>;

async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
Expand Down Expand Up @@ -246,11 +249,12 @@ impl CatalogWriter for CatalogWriterImpl {
&self,
table: PbTable,
graph: StreamFragmentGraph,
dependencies: HashSet<ObjectId>,
) -> Result<()> {
let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
let version = self
.meta_client
.create_materialized_view(table, graph)
.create_materialized_view(table, graph, dependencies)
.await?;
if matches!(create_type, PbCreateType::Foreground) {
self.wait_version(version).await?
Expand Down Expand Up @@ -316,10 +320,11 @@ impl CatalogWriter for CatalogWriterImpl {
sink: PbSink,
graph: StreamFragmentGraph,
affected_table_change: Option<ReplaceTablePlan>,
dependencies: HashSet<ObjectId>,
) -> Result<()> {
let version = self
.meta_client
.create_sink(sink, graph, affected_table_change)
.create_sink(sink, graph, affected_table_change, dependencies)
.await?;
self.wait_version(version).await
}
Expand Down
Loading

0 comments on commit 37e108c

Please sign in to comment.