Skip to content

Commit

Permalink
perf(dashboard): only fetch ids and specified fragments for a relation (
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored and kwannoel committed Aug 28, 2024
1 parent dae4cfe commit ed07542
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 32 deletions.
18 changes: 18 additions & 0 deletions dashboard/lib/api/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,30 @@ import {
} from "../../proto/gen/catalog"
import {
ListObjectDependenciesResponse_ObjectDependencies as ObjectDependencies,
RelationIdInfos,
TableFragments,
} from "../../proto/gen/meta"
import { ColumnCatalog, Field } from "../../proto/gen/plan_common"
import { UserInfo } from "../../proto/gen/user"
import api from "./api"

// NOTE(kwannoel): This can be optimized further, instead of fetching the entire TableFragments struct,
// We can fetch the fields we need from TableFragments, in a truncated struct.
export async function getFragmentsByJobId(
  jobId: number
): Promise<TableFragments> {
  // Fetch the fragments of a single streaming job and decode the proto JSON.
  const route = "/fragments/job_id/" + jobId.toString()
  return TableFragments.fromJSON(await api.get(route))
}

export async function getRelationIdInfos(): Promise<RelationIdInfos> {
let fragmentIds: RelationIdInfos = await api.get("/relation_id_infos")
return fragmentIds
}

export async function getFragments(): Promise<TableFragments[]> {
let fragmentList: TableFragments[] = (await api.get("/fragments2")).map(
TableFragments.fromJSON
Expand Down
71 changes: 40 additions & 31 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ import {
fetchEmbeddedBackPressure,
fetchPrometheusBackPressure,
} from "../lib/api/metric"
import { getFragments, getStreamingJobs } from "../lib/api/streaming"
import {
getFragmentsByJobId,
getRelationIdInfos,
getStreamingJobs,
} from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta"
import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan"
Expand Down Expand Up @@ -194,28 +198,33 @@ interface EmbeddedBackPressureInfo {

export default function Streaming() {
  // NOTE(review): the scraped diff left a stale `fragmentList` hook here from
  // the removed implementation; only the id-info fetch remains in the new code.
  const { response: relationList } = useFetch(getStreamingJobs)
  const { response: relationIdInfos } = useFetch(getRelationIdInfos)

  // Currently selected relation (persisted in the URL query string).
  const [relationId, setRelationId] = useQueryState("id", parseAsInteger)
  const [selectedFragmentId, setSelectedFragmentId] = useState<number>()
  // Fragments of the selected job, fetched lazily when `relationId` changes.
  const [tableFragments, setTableFragments] = useState<TableFragments>()

  const toast = useErrorToast()

useEffect(() => {
if (relationId) {
setTableFragments(undefined)
getFragmentsByJobId(relationId).then((tf) => {
setTableFragments(tf)
})
}
}, [relationId])

const fragmentDependencyCallback = useCallback(() => {
if (fragmentList) {
if (relationId) {
const fragments = fragmentList.find((x) => x.tableId === relationId)
if (fragments) {
const fragmentDep = buildFragmentDependencyAsEdges(fragments)
return {
fragments,
fragmentDep,
fragmentDepDag: dagStratify()(fragmentDep),
}
}
if (tableFragments) {
const fragmentDep = buildFragmentDependencyAsEdges(tableFragments)
return {
fragments: tableFragments,
fragmentDep,
fragmentDepDag: dagStratify()(fragmentDep),
}
}
}, [fragmentList, relationId])
}, [tableFragments])

useEffect(() => {
if (relationList) {
Expand Down Expand Up @@ -255,38 +264,38 @@ export default function Streaming() {

const handleSearchFragment = () => {
const searchFragIdInt = parseInt(searchFragId)
if (fragmentList) {
for (const tf of fragmentList) {
for (const fragmentId in tf.fragments) {
if (tf.fragments[fragmentId].fragmentId == searchFragIdInt) {
setRelationId(tf.tableId)
if (relationIdInfos) {
let map = relationIdInfos.map
for (const relationId in map) {
const fragmentIdToRelationId = map[relationId].map
for (const fragmentId in fragmentIdToRelationId) {
if (parseInt(fragmentId) == searchFragIdInt) {
setRelationId(parseInt(relationId))
setSelectedFragmentId(searchFragIdInt)
return
}
}
}
}

toast(new Error(`Fragment ${searchFragIdInt} not found`))
}

const handleSearchActor = () => {
const searchActorIdInt = parseInt(searchActorId)
if (fragmentList) {
for (const tf of fragmentList) {
for (const fragmentId in tf.fragments) {
const fragment = tf.fragments[fragmentId]
for (const actor of fragment.actors) {
if (actor.actorId == searchActorIdInt) {
setRelationId(tf.tableId)
setSelectedFragmentId(fragment.fragmentId)
return
}
if (relationIdInfos) {
let map = relationIdInfos.map
for (const relationId in map) {
const fragmentIdToRelationId = map[relationId].map
for (const fragmentId in fragmentIdToRelationId) {
let actorIds = fragmentIdToRelationId[fragmentId].ids
if (actorIds.includes(searchActorIdInt)) {
setRelationId(parseInt(relationId))
setSelectedFragmentId(parseInt(fragmentId))
return
}
}
}
}

toast(new Error(`Actor ${searchActorIdInt} not found`))
}

Expand Down
17 changes: 17 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -775,3 +775,20 @@ service EventLogService {
rpc ListEventLog(ListEventLogRequest) returns (ListEventLogResponse);
rpc AddEventLog(AddEventLogRequest) returns (AddEventLogResponse);
}

// The actor ids of a single fragment.
message ActorIds {
  repeated uint32 ids = 1;
}

// fragment_id -> the ids of the actors running that fragment.
message FragmentIdToActorIdMap {
  map<uint32, ActorIds> map = 1;
}

// Provides all the ids: relation_id, fragment_id, actor_id
// in a hierarchical format.
// relation_id -> [fragment_id]
// fragment_id -> [actor_id]
message RelationIdInfos {
  // relation_id -> FragmentIdToActorIdMap
  map<uint32, FragmentIdToActorIdMap> map = 1;
}
96 changes: 95 additions & 1 deletion src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,22 @@ pub struct DashboardService {
pub type Service = Arc<DashboardService>;

pub(super) mod handlers {
use std::collections::HashMap;

use anyhow::Context;
use axum::Json;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Subscription, Table, View};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::meta::{
ActorIds, FragmentIdToActorIdMap, PbTableFragments, RelationIdInfos,
};
use risingwave_pb::monitor_service::{
GetBackPressureResponse, HeapProfilingResponse, ListHeapProfilingResponse,
StackTraceResponse,
Expand Down Expand Up @@ -211,6 +217,92 @@ pub(super) mod handlers {
Ok(Json(table_fragments))
}

pub async fn get_relation_id_infos(
Extension(srv): Extension<Service>,
) -> Result<Json<RelationIdInfos>> {
let map = match &srv.metadata_manager {
MetadataManager::V1(mgr) => {
let core = mgr.fragment_manager.get_fragment_read_guard().await;
let table_fragments = core.table_fragments();
let mut map = HashMap::new();
for (id, tf) in table_fragments {
let mut fragment_id_to_actor_ids = HashMap::new();
for (fragment_id, fragment) in &tf.fragments {
let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
}
map.insert(
id.table_id,
FragmentIdToActorIdMap {
map: fragment_id_to_actor_ids,
},
);
}
map
}
MetadataManager::V2(mgr) => {
let table_fragments = mgr
.catalog_controller
.table_fragments()
.await
.map_err(err)?;
let mut map = HashMap::new();
for (id, tf) in table_fragments {
let mut fragment_id_to_actor_ids = HashMap::new();
for (fragment_id, fragment) in &tf.fragments {
let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
}
map.insert(
id as u32,
FragmentIdToActorIdMap {
map: fragment_id_to_actor_ids,
},
);
}
map
}
};
let relation_id_infos = RelationIdInfos { map };

Ok(Json(relation_id_infos))
}

pub async fn list_fragments_by_job_id(
Extension(srv): Extension<Service>,
Path(job_id): Path<u32>,
) -> Result<Json<PbTableFragments>> {
let table_fragments = match &srv.metadata_manager {
MetadataManager::V1(mgr) => {
if let Some(tf) = mgr
.fragment_manager
.get_fragment_read_guard()
.await
.table_fragments()
.get(&TableId::new(job_id))
{
tf.to_protobuf()
} else {
bail!("job_id {} not found", job_id)
}
}
MetadataManager::V2(mgr) => {
let mut table_fragments = mgr
.catalog_controller
.table_fragments()
.await
.map_err(err)?;
if let Some(tf) = table_fragments.remove(&(job_id as i32)) {
tf
} else {
bail!("job_id {} not found", job_id)
}
}
};

Ok(Json(table_fragments))
}

pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
let users = match &srv.metadata_manager {
MetadataManager::V1(mgr) => mgr.catalog_manager.list_users().await,
Expand Down Expand Up @@ -428,6 +520,8 @@ impl DashboardService {
let api_router = Router::new()
.route("/clusters/:ty", get(list_clusters))
.route("/fragments2", get(list_fragments))
.route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
.route("/relation_id_infos", get(get_relation_id_infos))
.route("/views", get(list_views))
.route("/materialized_views", get(list_materialized_views))
.route("/tables", get(list_tables))
Expand Down

0 comments on commit ed07542

Please sign in to comment.