diff --git a/dashboard/lib/api/streaming.ts b/dashboard/lib/api/streaming.ts index 5859c2df7e519..4295e639fb218 100644 --- a/dashboard/lib/api/streaming.ts +++ b/dashboard/lib/api/streaming.ts @@ -18,7 +18,10 @@ import _ from "lodash" import sortBy from "lodash/sortBy" import { Sink, Source, Table, View } from "../../proto/gen/catalog" -import { TableFragments } from "../../proto/gen/meta" +import { + ListObjectDependenciesResponse_ObjectDependencies as ObjectDependencies, + TableFragments, +} from "../../proto/gen/meta" import { ColumnCatalog, Field } from "../../proto/gen/plan_common" import api from "./api" @@ -86,6 +89,10 @@ export async function getRelations() { return relations } +export async function getRelationDependencies() { + return await getObjectDependencies() +} + async function getTableCatalogsInner( path: "tables" | "materialized_views" | "indexes" | "internal_tables" ) { @@ -127,3 +134,18 @@ export async function getViews() { views = sortBy(views, (x) => x.id) return views } + +export async function getObjectDependencies() { + let objDependencies: ObjectDependencies[] = ( + await api.get("/object_dependencies") + ).map(ObjectDependencies.fromJSON) + const objDependencyGroup = new Map() + objDependencies.forEach((x) => { + if (!objDependencyGroup.has(x.objectId)) { + objDependencyGroup.set(x.objectId, new Array()) + } + objDependencyGroup.get(x.objectId)?.push(x.referencedObjectId) + }) + + return objDependencyGroup +} diff --git a/dashboard/pages/dependency_graph.tsx b/dashboard/pages/dependency_graph.tsx index 96aeed23f8696..11d349725cfc1 100644 --- a/dashboard/pages/dependency_graph.tsx +++ b/dashboard/pages/dependency_graph.tsx @@ -27,6 +27,7 @@ import Title from "../components/Title" import useFetch from "../lib/api/fetch" import { Relation, + getRelationDependencies, getRelations, relationIsStreamingJob, } from "../lib/api/streaming" @@ -34,7 +35,10 @@ import { RelationPoint } from "../lib/layout" const SIDEBAR_WIDTH = "200px" -function buildDependencyAsEdges(list: Relation[]): RelationPoint[] { +function buildDependencyAsEdges( + list: Relation[], + relation_deps: Map +): RelationPoint[] { const edges = [] const relationSet = new Set(list.map((r) => r.id)) for (const r of reverse(sortBy(list, "id"))) { @@ -42,9 +46,12 @@ function buildDependencyAsEdges(list: Relation[]): RelationPoint[] { id: r.id.toString(), name: r.name, parentIds: relationIsStreamingJob(r) - ? r.dependentRelations - .filter((r) => relationSet.has(r)) - .map((r) => r.toString()) + ? relation_deps.has(r.id) + ? relation_deps + .get(r.id) + ?.filter((r) => relationSet.has(r)) + .map((r) => r.toString()) + : [] : [], order: r.id, width: nodeRadius * 2, @@ -52,20 +59,22 @@ function buildDependencyAsEdges(list: Relation[]): RelationPoint[] { relation: r, }) } - return edges + return edges as RelationPoint[] } export default function StreamingGraph() { const { response: relationList } = useFetch(getRelations) + // Since dependentRelations will be deprecated, we need to use getRelationDependencies here to separately obtain the dependency relationship. + const { response: relationDeps } = useFetch(getRelationDependencies) const [selectedId, setSelectedId] = useQueryState("id", parseAsInteger) const relationDependencyCallback = useCallback(() => { - if (relationList) { - return buildDependencyAsEdges(relationList) + if (relationList && relationDeps) { + return buildDependencyAsEdges(relationList, relationDeps) } else { return undefined } - }, [relationList]) + }, [relationList, relationDeps]) const relationDependency = relationDependencyCallback() diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index c7766e3fd2dae..6d05666819437 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -56,6 +56,7 @@ pub(super) mod handlers { use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::{Sink, Source, Table, View}; use risingwave_pb::common::{WorkerNode, WorkerType}; + use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; use risingwave_pb::meta::PbTableFragments; use risingwave_pb::monitor_service::{ GetBackPressureResponse, HeapProfilingResponse, ListHeapProfilingResponse, @@ -193,6 +194,21 @@ pub(super) mod handlers { Ok(Json(table_fragments)) } + pub async fn list_object_dependencies( + Extension(srv): Extension, + ) -> Result>> { + let object_dependencies = match &srv.metadata_manager { + MetadataManager::V1(mgr) => mgr.catalog_manager.list_object_dependencies().await, + MetadataManager::V2(mgr) => mgr + .catalog_controller + .list_object_dependencies() + .await + .map_err(err)?, + }; + + Ok(Json(object_dependencies)) + } + async fn dump_await_tree_inner( worker_nodes: impl IntoIterator, compute_clients: &ComputeClientPool, @@ -378,6 +394,7 @@ impl DashboardService { .route("/internal_tables", get(list_internal_tables)) .route("/sources", get(list_sources)) .route("/sinks", get(list_sinks)) + .route("/object_dependencies", get(list_object_dependencies)) .route("/metrics/cluster", get(prometheus::list_prometheus_cluster)) .route( "/metrics/fragment/prometheus_back_pressures",