Skip to content

Commit

Permalink
fix(dashboard): fix building streaming dependency graph in sql backend (
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Apr 10, 2024
1 parent fb7fcfe commit 6a401d7
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 9 deletions.
24 changes: 23 additions & 1 deletion dashboard/lib/api/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import _ from "lodash"
import sortBy from "lodash/sortBy"
import { Sink, Source, Table, View } from "../../proto/gen/catalog"
import { TableFragments } from "../../proto/gen/meta"
import {
ListObjectDependenciesResponse_ObjectDependencies as ObjectDependencies,
TableFragments,
} from "../../proto/gen/meta"
import { ColumnCatalog, Field } from "../../proto/gen/plan_common"
import api from "./api"

Expand Down Expand Up @@ -86,6 +89,10 @@ export async function getRelations() {
return relations
}

export async function getRelationDependencies() {
return await getObjectDependencies()
}

async function getTableCatalogsInner(
path: "tables" | "materialized_views" | "indexes" | "internal_tables"
) {
Expand Down Expand Up @@ -127,3 +134,18 @@ export async function getViews() {
views = sortBy(views, (x) => x.id)
return views
}

export async function getObjectDependencies() {
let objDependencies: ObjectDependencies[] = (
await api.get("/object_dependencies")
).map(ObjectDependencies.fromJSON)
const objDependencyGroup = new Map<number, number[]>()
objDependencies.forEach((x) => {
if (!objDependencyGroup.has(x.objectId)) {
objDependencyGroup.set(x.objectId, new Array<number>())
}
objDependencyGroup.get(x.objectId)?.push(x.referencedObjectId)
})

return objDependencyGroup
}
25 changes: 17 additions & 8 deletions dashboard/pages/dependency_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,45 +27,54 @@ import Title from "../components/Title"
import useFetch from "../lib/api/fetch"
import {
Relation,
getRelationDependencies,
getRelations,
relationIsStreamingJob,
} from "../lib/api/streaming"
import { RelationPoint } from "../lib/layout"

const SIDEBAR_WIDTH = "200px"

function buildDependencyAsEdges(list: Relation[]): RelationPoint[] {
function buildDependencyAsEdges(
list: Relation[],
relation_deps: Map<number, number[]>
): RelationPoint[] {
const edges = []
const relationSet = new Set(list.map((r) => r.id))
for (const r of reverse(sortBy(list, "id"))) {
edges.push({
id: r.id.toString(),
name: r.name,
parentIds: relationIsStreamingJob(r)
? r.dependentRelations
.filter((r) => relationSet.has(r))
.map((r) => r.toString())
? relation_deps.has(r.id)
? relation_deps
.get(r.id)
?.filter((r) => relationSet.has(r))
.map((r) => r.toString())
: []
: [],
order: r.id,
width: nodeRadius * 2,
height: nodeRadius * 2,
relation: r,
})
}
return edges
return edges as RelationPoint[]
}

export default function StreamingGraph() {
const { response: relationList } = useFetch(getRelations)
// Since dependentRelations will be deprecated, we need to use getRelationDependencies here to separately obtain the dependency relationship.
const { response: relationDeps } = useFetch(getRelationDependencies)
const [selectedId, setSelectedId] = useQueryState("id", parseAsInteger)

const relationDependencyCallback = useCallback(() => {
if (relationList) {
return buildDependencyAsEdges(relationList)
if (relationList && relationDeps) {
return buildDependencyAsEdges(relationList, relationDeps)
} else {
return undefined
}
}, [relationList])
}, [relationList, relationDeps])

const relationDependency = relationDependencyCallback()

Expand Down
17 changes: 17 additions & 0 deletions src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub(super) mod handlers {
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{Sink, Source, Table, View};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::monitor_service::{
GetBackPressureResponse, HeapProfilingResponse, ListHeapProfilingResponse,
Expand Down Expand Up @@ -193,6 +194,21 @@ pub(super) mod handlers {
Ok(Json(table_fragments))
}

pub async fn list_object_dependencies(
Extension(srv): Extension<Service>,
) -> Result<Json<Vec<PbObjectDependencies>>> {
let object_dependencies = match &srv.metadata_manager {
MetadataManager::V1(mgr) => mgr.catalog_manager.list_object_dependencies().await,
MetadataManager::V2(mgr) => mgr
.catalog_controller
.list_object_dependencies()
.await
.map_err(err)?,
};

Ok(Json(object_dependencies))
}

async fn dump_await_tree_inner(
worker_nodes: impl IntoIterator<Item = &WorkerNode>,
compute_clients: &ComputeClientPool,
Expand Down Expand Up @@ -378,6 +394,7 @@ impl DashboardService {
.route("/internal_tables", get(list_internal_tables))
.route("/sources", get(list_sources))
.route("/sinks", get(list_sinks))
.route("/object_dependencies", get(list_object_dependencies))
.route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
.route(
"/metrics/fragment/prometheus_back_pressures",
Expand Down

0 comments on commit 6a401d7

Please sign in to comment.