Skip to content

Commit

Permalink
collect object dependencies for MV
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 15, 2024
1 parent c29f09e commit 695953b
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 42 deletions.
3 changes: 3 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ message CreateMaterializedViewRequest {
SERVERLESS = 2;
}
BackfillType backfill = 3;

// The list of object IDs that this materialized view depends on.
repeated uint32 dependencies = 4;
}

message CreateMaterializedViewResponse {
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ pub trait SysCatalogReader: Sync + Send + 'static {

pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;

pub type ObjectId = u32;

#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
#[display("{database_id}")]
pub struct DatabaseId {
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use anyhow::anyhow;
use parking_lot::lock_api::ArcRwLockReadGuard;
use parking_lot::{RawRwLock, RwLock};
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId};
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, ObjectId};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::catalog::{
Expand Down Expand Up @@ -78,6 +79,7 @@ pub trait CatalogWriter: Send + Sync {
&self,
table: PbTable,
graph: StreamFragmentGraph,
dependencies: HashSet<ObjectId>,
) -> Result<()>;

async fn create_table(
Expand Down Expand Up @@ -246,11 +248,12 @@ impl CatalogWriter for CatalogWriterImpl {
&self,
table: PbTable,
graph: StreamFragmentGraph,
dependencies: HashSet<ObjectId>,
) -> Result<()> {
let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
let version = self
.meta_client
.create_materialized_view(table, graph)
.create_materialized_view(table, graph, dependencies)
.await?;
if matches!(create_type, PbCreateType::Foreground) {
self.wait_version(version).await?
Expand Down
59 changes: 26 additions & 33 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use either::Either;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::acl::AclMode;
use risingwave_common::catalog::TableId;
use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
use risingwave_pb::catalog::PbTable;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};

Expand Down Expand Up @@ -93,23 +93,14 @@ pub fn gen_create_mv_plan(
) -> Result<(PlanRef, PbTable)> {
let mut binder = Binder::new_for_stream(session);
let bound = binder.bind_query(query)?;
gen_create_mv_plan_bound(
session,
context,
bound,
binder.included_relations().clone(),
name,
columns,
emit_mode,
)
gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
}

/// Generate create MV plan from a bound query
pub fn gen_create_mv_plan_bound(
session: &SessionImpl,
context: OptimizerContextRef,
query: BoundQuery,
dependent_relations: HashSet<TableId>,
name: ObjectName,
columns: Vec<Ident>,
emit_mode: Option<EmitMode>,
Expand Down Expand Up @@ -147,17 +138,9 @@ pub fn gen_create_mv_plan_bound(
let mut table = materialize.table().to_prost(schema_id, database_id);

let plan: PlanRef = materialize.into();
let dependent_relations =
RelationCollectorVisitor::collect_with(dependent_relations, plan.clone());

table.owner = session.user_id();

// record dependent relations.
table.dependent_relations = dependent_relations
.into_iter()
.map(|t| t.table_id)
.collect_vec();

let ctx = plan.ctx();
let explain_trace = ctx.is_explain_trace();
if explain_trace {
Expand All @@ -176,17 +159,22 @@ pub async fn handle_create_mv(
columns: Vec<Ident>,
emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
let (dependent_relations, bound) = {
let (dependent_relations, dependent_udfs, bound) = {
let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
let bound = binder.bind_query(query)?;
(binder.included_relations().clone(), bound)
(
binder.included_relations().clone(),
binder.included_udfs().clone(),
bound,
)
};
handle_create_mv_bound(
handler_args,
if_not_exists,
name,
bound,
dependent_relations,
dependent_udfs,
columns,
emit_mode,
)
Expand All @@ -199,6 +187,7 @@ pub async fn handle_create_mv_bound(
name: ObjectName,
query: BoundQuery,
dependent_relations: HashSet<TableId>,
dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
columns: Vec<Ident>,
emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
Expand All @@ -215,7 +204,7 @@ pub async fn handle_create_mv_bound(
return Ok(resp);
}

let (table, graph) = {
let (table, graph, dependencies) = {
let context = OptimizerContext::from_handler_args(handler_args);
if !context.with_options().is_empty() {
// get other useful fields by `remove`, the logic here is to reject unknown options.
Expand All @@ -232,19 +221,23 @@ It only indicates the physical clustering of the data, which may improve the per
"#.to_string());
}

let (plan, table) = gen_create_mv_plan_bound(
&session,
context.into(),
query,
dependent_relations,
name,
columns,
emit_mode,
)?;
let (plan, table) =
gen_create_mv_plan_bound(&session, context.into(), query, name, columns, emit_mode)?;

let dependencies =
RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
.into_iter()
.map(|id| id.table_id() as ObjectId)
.chain(
dependent_udfs
.into_iter()
.map(|id| id.function_id() as ObjectId),
)
.collect();

let graph = build_graph(plan)?;

(table, graph)
(table, graph, dependencies)
};

// Ensure writes to `StreamJobTracker` are atomic.
Expand All @@ -262,7 +255,7 @@ It only indicates the physical clustering of the data, which may improve the per
let session = session.clone();
let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_materialized_view(table, graph)
.create_materialized_view(table, graph, dependencies)
.await?;

Ok(PgResponse::empty_result(
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/extended_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ pub fn handle_bind(
bound,
param_types,
dependent_relations,
dependent_udfs,
..
} = bound_result;

Expand All @@ -176,6 +177,7 @@ pub fn handle_bind(
param_types,
parsed_params: Some(parsed_params),
dependent_relations,
dependent_udfs,
bound: new_bound,
};
Ok(Portal::Portal(PortalResult {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/fetch_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ pub async fn handle_parse(
param_types: binder.export_param_types()?,
parsed_params: None,
dependent_relations: binder.included_relations().clone(),
dependent_udfs: binder.included_udfs().clone(),
};
let result = PreparedResult {
statement,
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::Schema;
use risingwave_common::catalog::{FunctionId, Schema};
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, Datum};
use risingwave_sqlparser::ast::{SetExpr, Statement};
Expand Down Expand Up @@ -109,6 +109,7 @@ pub async fn handle_execute(
let BoundResult {
bound,
dependent_relations,
dependent_udfs,
..
} = bound_result;
let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
Expand Down Expand Up @@ -144,6 +145,7 @@ pub async fn handle_execute(
name,
*query,
dependent_relations,
dependent_udfs,
columns,
emit_mode,
)
Expand All @@ -170,6 +172,8 @@ pub struct BoundResult {
pub(crate) param_types: Vec<DataType>,
pub(crate) parsed_params: Option<Vec<Datum>>,
pub(crate) dependent_relations: HashSet<TableId>,
/// TODO(rc): merge with `dependent_relations`
pub(crate) dependent_udfs: HashSet<FunctionId>,
}

fn gen_bound(
Expand All @@ -194,6 +198,7 @@ fn gen_bound(
param_types: binder.export_param_types()?,
parsed_params: None,
dependent_relations: binder.included_relations().clone(),
dependent_udfs: binder.included_udfs().clone(),
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::optimizer::plan_node::{BatchSource, LogicalScan, StreamSource, Stream
use crate::optimizer::plan_visitor::PlanVisitor;
use crate::PlanRef;

/// TODO(rc): maybe we should rename this to `DependencyCollectorVisitor`.
#[derive(Debug, Clone, Default)]
pub struct RelationCollectorVisitor {
relations: HashSet<TableId>,
Expand Down
11 changes: 7 additions & 4 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicU32, Ordering};
Expand All @@ -25,8 +25,9 @@ use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
FunctionId, IndexId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME,
FunctionId, IndexId, ObjectId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME,
DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME,
RW_CATALOG_SCHEMA_NAME,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::session_config::SessionConfig;
Expand Down Expand Up @@ -279,6 +280,7 @@ impl CatalogWriter for MockCatalogWriter {
&self,
mut table: PbTable,
_graph: StreamFragmentGraph,
_dependencies: HashSet<ObjectId>,
) -> Result<()> {
table.id = self.gen_id();
table.stream_job_status = PbStreamJobStatus::Created as _;
Expand Down Expand Up @@ -309,7 +311,8 @@ impl CatalogWriter for MockCatalogWriter {
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
}
self.create_materialized_view(table, graph).await?;
self.create_materialized_view(table, graph, HashSet::new())
.await?;
Ok(())
}

Expand Down
6 changes: 4 additions & 2 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
Expand All @@ -26,7 +26,7 @@ use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use lru::LruCache;
use risingwave_common::catalog::{FunctionId, IndexId, SecretId, TableId};
use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId};
use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::monitor::EndpointExt;
Expand Down Expand Up @@ -391,11 +391,13 @@ impl MetaClient {
&self,
table: PbTable,
graph: StreamFragmentGraph,
dependencies: HashSet<ObjectId>,
) -> Result<WaitVersion> {
let request = CreateMaterializedViewRequest {
materialized_view: Some(table),
fragment_graph: Some(graph),
backfill: PbBackfillType::Regular as _,
dependencies: dependencies.into_iter().collect(),
};
let resp = self.inner.create_materialized_view(request).await?;
// TODO: handle error in `resp.status` here
Expand Down

0 comments on commit 695953b

Please sign in to comment.