Skip to content

Commit

Permalink
feat: support rw_depend in sql backend and add sink into dependency i…
Browse files Browse the repository at this point in the history
…nfo (#15524)
  • Loading branch information
yezizp2012 authored Mar 11, 2024
1 parent d34d7af commit 5ea5a95
Show file tree
Hide file tree
Showing 16 changed files with 198 additions and 70 deletions.
32 changes: 16 additions & 16 deletions e2e_test/batch/catalog/pg_class.slt.part
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
query ITIT
SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class ORDER BY oid limit 15;
----
1 columns 1 v
2 tables 1 v
3 views 1 v
4 pg_am 1 v
5 pg_attrdef 1 v
6 pg_attribute 1 v
7 pg_auth_members 1 v
8 pg_cast 1 r
9 pg_class 1 v
10 pg_collation 1 v
11 pg_constraint 1 r
12 pg_conversion 1 v
13 pg_database 1 v
14 pg_depend 1 v
15 pg_description 1 v
2147478647 columns 1 v
2147478648 tables 1 v
2147478649 views 1 v
2147478650 pg_am 1 v
2147478651 pg_attrdef 1 v
2147478652 pg_attribute 1 v
2147478653 pg_auth_members 1 v
2147478654 pg_cast 1 r
2147478655 pg_class 1 v
2147478656 pg_collation 1 v
2147478657 pg_constraint 1 r
2147478658 pg_conversion 1 v
2147478659 pg_database 1 v
2147478660 pg_depend 1 v
2147478661 pg_description 1 v

query ITIT
SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class WHERE oid = 'pg_namespace'::regclass;
----
25 pg_namespace 1 v
2147478671 pg_namespace 1 v
17 changes: 17 additions & 0 deletions e2e_test/batch/catalog/rw_depend.slt.part
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
statement ok
create table t1 (a int);

statement ok
create index idx1 on t1(a);

statement ok
create table t2(a int primary key);

statement ok
create source s1 (a int) with (connector='datagen');

Expand All @@ -13,6 +19,8 @@ create materialized view mv2 as select * from mv1;
statement ok
create sink sink1 from mv2 with (connector='blackhole');

statement ok
create sink sink2 into t2 as select a from t1;

# equivalent to:
# select objid::regclass, refobjid::regclass from rw_depend;
Expand All @@ -22,14 +30,23 @@ from rw_depend d
join rw_relations r1 on d.objid = r1.id
join rw_relations r2 on d.refobjid = r2.id;
----
idx1 t1
mv1 s1
mv1 t1
mv2 mv1
sink1 mv2
sink2 t1
t2 sink2

statement ok
drop sink sink1;

statement ok
drop sink sink2;

statement ok
drop table t2;

statement ok
drop materialized view mv2;

Expand Down
11 changes: 11 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ message ListActorStatesResponse {
repeated ActorState states = 1;
}

message ListObjectDependenciesRequest {}

message ListObjectDependenciesResponse {
message ObjectDependencies {
uint32 object_id = 1;
uint32 referenced_object_id = 2;
}
repeated ObjectDependencies dependencies = 1;
}

enum ThrottleTarget {
THROTTLE_TARGET_UNSPECIFIED = 0;
SOURCE = 1;
Expand All @@ -265,6 +275,7 @@ service StreamManagerService {
rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse);
rpc ListFragmentDistribution(ListFragmentDistributionRequest) returns (ListFragmentDistributionResponse);
rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse);
rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse);
rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse);
}

Expand Down
4 changes: 3 additions & 1 deletion src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;

pub const NON_RESERVED_USER_ID: i32 = 11;
pub const NON_RESERVED_SYS_CATALOG_ID: i32 = 1001;

pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;

pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;

Expand Down
17 changes: 9 additions & 8 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_common::acl::AclMode;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, Field, SysCatalogReader, TableDesc, TableId, DEFAULT_SUPER_USER_ID,
NON_RESERVED_SYS_CATALOG_ID,
MAX_SYS_CATALOG_NUM, SYS_CATALOG_START_ID,
};
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::ConfigMap;
Expand Down Expand Up @@ -292,7 +292,7 @@ fn get_acl_items(
}

pub struct SystemCatalog {
// table id = index + 1
// table id = index + SYS_CATALOG_START_ID
catalogs: Vec<BuiltinCatalog>,
}

Expand All @@ -303,7 +303,8 @@ pub fn get_sys_tables_in_schema(schema_name: &str) -> Vec<Arc<SystemTableCatalog
.enumerate()
.filter_map(|(idx, c)| match c {
BuiltinCatalog::Table(t) if t.schema == schema_name => Some(Arc::new(
SystemTableCatalog::from(t).with_id((idx as u32 + 1).into()),
SystemTableCatalog::from(t)
.with_id((idx as u32 + SYS_CATALOG_START_ID as u32).into()),
)),
_ => None,
})
Expand All @@ -316,9 +317,9 @@ pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<Arc<ViewCatalog>> {
.iter()
.enumerate()
.filter_map(|(idx, c)| match c {
BuiltinCatalog::View(v) if v.schema == schema_name => {
Some(Arc::new(ViewCatalog::from(v).with_id(idx as u32 + 1)))
}
BuiltinCatalog::View(v) if v.schema == schema_name => Some(Arc::new(
ViewCatalog::from(v).with_id(idx as u32 + SYS_CATALOG_START_ID as u32),
)),
_ => None,
})
.collect()
Expand All @@ -327,7 +328,7 @@ pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<Arc<ViewCatalog>> {
/// The global registry of all builtin catalogs.
pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
assert!(SYS_CATALOGS_SLICE.len() + 1 < NON_RESERVED_SYS_CATALOG_ID as usize);
assert!(SYS_CATALOGS_SLICE.len() <= MAX_SYS_CATALOG_NUM as usize);
let catalogs = SYS_CATALOGS_SLICE
.iter()
.map(|f| f())
Expand All @@ -344,7 +345,7 @@ impl SysCatalogReader for SysCatalogReaderImpl {
async fn read_table(&self, table_id: &TableId) -> Result<DataChunk, BoxedError> {
let table_name = SYS_CATALOGS
.catalogs
.get(table_id.table_id as usize - 1)
.get((table_id.table_id - SYS_CATALOG_START_ID as u32) as usize)
.unwrap();
match table_name {
BuiltinCatalog::Table(t) => (t.function)(self).await,
Expand Down
34 changes: 9 additions & 25 deletions src/frontend/src/catalog/system_catalog/rw_catalog/rw_depend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,13 @@ struct RwDepend {
}

#[system_catalog(table, "rw_catalog.rw_depend")]
fn read_rw_depend(reader: &SysCatalogReaderImpl) -> Result<Vec<RwDepend>> {
let catalog_reader = reader.catalog_reader.read_guard();

let mut depends = vec![];
for schema in catalog_reader.iter_schemas(&reader.auth_context.database)? {
for table in schema.iter_table().chain(schema.iter_mv()) {
for referenced in &table.dependent_relations {
let depend = RwDepend {
objid: table.id.table_id as i32,
refobjid: referenced.table_id as i32,
};
depends.push(depend);
}
}
for sink in schema.iter_sink() {
for referenced in &sink.dependent_relations {
let depend = RwDepend {
objid: sink.id.sink_id as i32,
refobjid: referenced.table_id as i32,
};
depends.push(depend);
}
}
}
Ok(depends)
async fn read_rw_depend(reader: &SysCatalogReaderImpl) -> Result<Vec<RwDepend>> {
let dependencies = reader.meta_client.list_object_dependencies().await?;
Ok(dependencies
.into_iter()
.map(|depend| RwDepend {
objid: depend.object_id as i32,
refobjid: depend.referenced_object_id as i32,
})
.collect())
}
7 changes: 7 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_pb::hummock::{
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::EventLog;
Expand Down Expand Up @@ -62,6 +63,8 @@ pub trait FrontendMetaClient: Send + Sync {

async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;

async fn unpin_snapshot(&self) -> Result<()>;

async fn unpin_snapshot_before(&self, epoch: u64) -> Result<()>;
Expand Down Expand Up @@ -151,6 +154,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.list_actor_states().await
}

async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
self.0.list_object_dependencies().await
}

async fn unpin_snapshot(&self) -> Result<()> {
self.0.unpin_snapshot().await
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ impl WorkerNodeSelector {
if !matches!(e, SchedulerError::ServingVnodeMappingNotFound(_)) {
return Err(e);
}
let max_parallelism = 100;
let max_parallelism = 1;
tracing::warn!(
fragment_id,
max_parallelism,
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use risingwave_pb::hummock::{
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{EventLog, PbTableParallelism, SystemParams};
Expand Down Expand Up @@ -890,6 +891,10 @@ impl FrontendMetaClient for MockFrontendMetaClient {
Ok(vec![])
}

async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
Ok(vec![])
}

async fn unpin_snapshot(&self) -> RpcResult<()> {
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::time::Duration;

use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{TableId, NON_RESERVED_SYS_CATALOG_ID};
use risingwave_common::catalog::{TableId, SYS_CATALOG_START_ID};
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
Expand Down Expand Up @@ -244,7 +244,7 @@ impl HummockManagerService for HummockServiceImpl {
}

// get internal_table_id by metadata_manger
if request.table_id >= NON_RESERVED_SYS_CATALOG_ID as u32 {
if request.table_id < SYS_CATALOG_START_ID as u32 {
// We need to make sure to use the correct table_id to filter sst
let table_id = TableId::new(request.table_id);
if let Ok(table_fragment) = self
Expand All @@ -259,7 +259,7 @@ impl HummockManagerService for HummockServiceImpl {
assert!(option
.internal_table_id
.iter()
.all(|table_id| *table_id >= (NON_RESERVED_SYS_CATALOG_ID as u32)),);
.all(|table_id| *table_id < SYS_CATALOG_START_ID as u32),);

tracing::info!(
"Try trigger_manual_compaction compaction_group_id {} option {:?}",
Expand Down
15 changes: 15 additions & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,4 +395,19 @@ impl StreamManagerService for StreamServiceImpl {

Ok(Response::new(ListActorStatesResponse { states }))
}

#[cfg_attr(coverage, coverage(off))]
async fn list_object_dependencies(
&self,
_request: Request<ListObjectDependenciesRequest>,
) -> Result<Response<ListObjectDependenciesResponse>, Status> {
let dependencies = match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr.catalog_manager.list_object_dependencies().await,
MetadataManager::V2(mgr) => mgr.catalog_controller.list_object_dependencies().await?,
};

Ok(Response::new(ListObjectDependenciesResponse {
dependencies,
}))
}
}
52 changes: 52 additions & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use risingwave_pb::catalog::{
PbSubscription, PbTable, PbView,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::relation::PbRelationInfo;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
Expand Down Expand Up @@ -417,6 +418,57 @@ impl CatalogController {
Ok(tables)
}

pub async fn list_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
let inner = self.inner.read().await;

let dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find()
.select_only()
.columns([
object_dependency::Column::Oid,
object_dependency::Column::UsedBy,
])
.join(
JoinType::InnerJoin,
object_dependency::Relation::Object1.def(),
)
.join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
.filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
.into_tuple()
.all(&inner.db)
.await?;

let mut obj_dependencies = dependencies
.into_iter()
.map(|(oid, used_by)| PbObjectDependencies {
object_id: used_by as _,
referenced_object_id: oid as _,
})
.collect_vec();

let sink_dependencies: Vec<(SinkId, TableId)> = Sink::find()
.select_only()
.columns([sink::Column::SinkId, sink::Column::TargetTable])
.join(JoinType::InnerJoin, sink::Relation::Object.def())
.join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
.filter(
streaming_job::Column::JobStatus
.eq(JobStatus::Created)
.and(sink::Column::TargetTable.is_not_null()),
)
.into_tuple()
.all(&inner.db)
.await?;

obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
PbObjectDependencies {
object_id: table_id as _,
referenced_object_id: sink_id as _,
}
}));

Ok(obj_dependencies)
}

pub async fn has_any_streaming_jobs(&self) -> MetaResult<bool> {
let inner = self.inner.read().await;
let count = streaming_job::Entity::find().count(&inner.db).await?;
Expand Down
Loading

0 comments on commit 5ea5a95

Please sign in to comment.