From 612955d9518300ec7ed25f4d57806dde3302314b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 28 Aug 2024 09:28:47 +0000 Subject: [PATCH] perf(dashboard): only fetch ids and specified fragments for a relation (#18272) (#18298) Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com> --- dashboard/lib/api/streaming.ts | 18 ++++++ dashboard/pages/fragment_graph.tsx | 71 ++++++++++++---------- proto/meta.proto | 17 ++++++ src/meta/src/dashboard/mod.rs | 96 +++++++++++++++++++++++++++++- 4 files changed, 170 insertions(+), 32 deletions(-) diff --git a/dashboard/lib/api/streaming.ts b/dashboard/lib/api/streaming.ts index 948cd567d3f2b..95ad89fdf5c58 100644 --- a/dashboard/lib/api/streaming.ts +++ b/dashboard/lib/api/streaming.ts @@ -28,12 +28,30 @@ import { } from "../../proto/gen/catalog" import { ListObjectDependenciesResponse_ObjectDependencies as ObjectDependencies, + RelationIdInfos, TableFragments, } from "../../proto/gen/meta" import { ColumnCatalog, Field } from "../../proto/gen/plan_common" import { UserInfo } from "../../proto/gen/user" import api from "./api" +// NOTE(kwannoel): This can be optimized further, instead of fetching the entire TableFragments struct, +// We can fetch the fields we need from TableFragments, in a truncated struct. +export async function getFragmentsByJobId( + jobId: number +): Promise { + let route = "/fragments/job_id/" + jobId.toString() + let tableFragments: TableFragments = TableFragments.fromJSON( + await api.get(route) + ) + return tableFragments +} + +export async function getRelationIdInfos(): Promise { + let fragmentIds: RelationIdInfos = await api.get("/relation_id_infos") + return fragmentIds +} + export async function getFragments(): Promise { let fragmentList: TableFragments[] = (await api.get("/fragments2")).map( TableFragments.fromJSON diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 1c5f6d7746535..0cf4ccec94f1b 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -45,7 +45,11 @@ import { fetchEmbeddedBackPressure, fetchPrometheusBackPressure, } from "../lib/api/metric" -import { getFragments, getStreamingJobs } from "../lib/api/streaming" +import { + getFragmentsByJobId, + getRelationIdInfos, + getStreamingJobs, +} from "../lib/api/streaming" import { FragmentBox } from "../lib/layout" import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta" import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan" @@ -194,28 +198,33 @@ interface EmbeddedBackPressureInfo { export default function Streaming() { const { response: relationList } = useFetch(getStreamingJobs) - const { response: fragmentList } = useFetch(getFragments) + const { response: relationIdInfos } = useFetch(getRelationIdInfos) const [relationId, setRelationId] = useQueryState("id", parseAsInteger) const [selectedFragmentId, setSelectedFragmentId] = useState() + const [tableFragments, setTableFragments] = useState() const toast = useErrorToast() + useEffect(() => { + if (relationId) { + setTableFragments(undefined) + getFragmentsByJobId(relationId).then((tf) => { + setTableFragments(tf) + }) + } + }, [relationId]) + const fragmentDependencyCallback = useCallback(() => { - if (fragmentList) { - if (relationId) { - const fragments = fragmentList.find((x) => x.tableId === relationId) - if (fragments) { - const fragmentDep = buildFragmentDependencyAsEdges(fragments) - return { - fragments, - fragmentDep, - fragmentDepDag: dagStratify()(fragmentDep), - } - } + if (tableFragments) { + const fragmentDep = buildFragmentDependencyAsEdges(tableFragments) + return { + fragments: tableFragments, + fragmentDep, + fragmentDepDag: dagStratify()(fragmentDep), } } - }, [fragmentList, relationId]) + }, [tableFragments]) useEffect(() => { if (relationList) { @@ -255,38 +264,38 @@ export default function Streaming() { const handleSearchFragment = () => { const searchFragIdInt = parseInt(searchFragId) - if (fragmentList) { - for (const tf of fragmentList) { - for (const fragmentId in tf.fragments) { - if (tf.fragments[fragmentId].fragmentId == searchFragIdInt) { - setRelationId(tf.tableId) + if (relationIdInfos) { + let map = relationIdInfos.map + for (const relationId in map) { + const fragmentIdToRelationId = map[relationId].map + for (const fragmentId in fragmentIdToRelationId) { + if (parseInt(fragmentId) == searchFragIdInt) { + setRelationId(parseInt(relationId)) setSelectedFragmentId(searchFragIdInt) return } } } } - toast(new Error(`Fragment ${searchFragIdInt} not found`)) } const handleSearchActor = () => { const searchActorIdInt = parseInt(searchActorId) - if (fragmentList) { - for (const tf of fragmentList) { - for (const fragmentId in tf.fragments) { - const fragment = tf.fragments[fragmentId] - for (const actor of fragment.actors) { - if (actor.actorId == searchActorIdInt) { - setRelationId(tf.tableId) - setSelectedFragmentId(fragment.fragmentId) - return - } + if (relationIdInfos) { + let map = relationIdInfos.map + for (const relationId in map) { + const fragmentIdToRelationId = map[relationId].map + for (const fragmentId in fragmentIdToRelationId) { + let actorIds = fragmentIdToRelationId[fragmentId].ids + if (actorIds.includes(searchActorIdInt)) { + setRelationId(parseInt(relationId)) + setSelectedFragmentId(parseInt(fragmentId)) + return } } } } - toast(new Error(`Actor ${searchActorIdInt} not found`)) } diff --git a/proto/meta.proto b/proto/meta.proto index 5559440de4498..2c22e884b8007 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -775,3 +775,20 @@ service EventLogService { rpc ListEventLog(ListEventLogRequest) returns (ListEventLogResponse); rpc AddEventLog(AddEventLogRequest) returns (AddEventLogResponse); } + +message ActorIds { + repeated uint32 ids = 1; +} + +message FragmentIdToActorIdMap { + map map = 1; +} + +/// Provides all the ids: relation_id, fragment_id, actor_id +/// in an hierarchical format. +/// relation_id -> [fragment_id] +/// fragment_id -> [actor_id] +message RelationIdInfos { + // relation_id -> FragmentIdToActorIdMap + map map = 1; +} diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index 1229554032614..cfdf2bea0f322 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -49,16 +49,22 @@ pub struct DashboardService { pub type Service = Arc; pub(super) mod handlers { + use std::collections::HashMap; + use anyhow::Context; use axum::Json; use futures::future::join_all; use itertools::Itertools; + use risingwave_common::bail; + use risingwave_common::catalog::TableId; use risingwave_common_heap_profiling::COLLAPSED_SUFFIX; use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Subscription, Table, View}; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; - use risingwave_pb::meta::PbTableFragments; + use risingwave_pb::meta::{ + ActorIds, FragmentIdToActorIdMap, PbTableFragments, RelationIdInfos, + }; use risingwave_pb::monitor_service::{ GetBackPressureResponse, HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse, @@ -211,6 +217,92 @@ pub(super) mod handlers { Ok(Json(table_fragments)) } + pub async fn get_relation_id_infos( + Extension(srv): Extension, + ) -> Result> { + let map = match &srv.metadata_manager { + MetadataManager::V1(mgr) => { + let core = mgr.fragment_manager.get_fragment_read_guard().await; + let table_fragments = core.table_fragments(); + let mut map = HashMap::new(); + for (id, tf) in table_fragments { + let mut fragment_id_to_actor_ids = HashMap::new(); + for (fragment_id, fragment) in &tf.fragments { + let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec(); + fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids }); + } + map.insert( + id.table_id, + FragmentIdToActorIdMap { + map: fragment_id_to_actor_ids, + }, + ); + } + map + } + MetadataManager::V2(mgr) => { + let table_fragments = mgr + .catalog_controller + .table_fragments() + .await + .map_err(err)?; + let mut map = HashMap::new(); + for (id, tf) in table_fragments { + let mut fragment_id_to_actor_ids = HashMap::new(); + for (fragment_id, fragment) in &tf.fragments { + let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec(); + fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids }); + } + map.insert( + id as u32, + FragmentIdToActorIdMap { + map: fragment_id_to_actor_ids, + }, + ); + } + map + } + }; + let relation_id_infos = RelationIdInfos { map }; + + Ok(Json(relation_id_infos)) + } + + pub async fn list_fragments_by_job_id( + Extension(srv): Extension, + Path(job_id): Path, + ) -> Result> { + let table_fragments = match &srv.metadata_manager { + MetadataManager::V1(mgr) => { + if let Some(tf) = mgr + .fragment_manager + .get_fragment_read_guard() + .await + .table_fragments() + .get(&TableId::new(job_id)) + { + tf.to_protobuf() + } else { + bail!("job_id {} not found", job_id) + } + } + MetadataManager::V2(mgr) => { + let mut table_fragments = mgr + .catalog_controller + .table_fragments() + .await + .map_err(err)?; + if let Some(tf) = table_fragments.remove(&(job_id as i32)) { + tf + } else { + bail!("job_id {} not found", job_id) + } + } + }; + + Ok(Json(table_fragments)) + } + pub async fn list_users(Extension(srv): Extension) -> Result>> { let users = match &srv.metadata_manager { MetadataManager::V1(mgr) => mgr.catalog_manager.list_users().await, @@ -428,6 +520,8 @@ impl DashboardService { let api_router = Router::new() .route("/clusters/:ty", get(list_clusters)) .route("/fragments2", get(list_fragments)) + .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id)) + .route("/relation_id_infos", get(get_relation_id_infos)) .route("/views", get(list_views)) .route("/materialized_views", get(list_materialized_views)) .route("/tables", get(list_tables))