Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(dashboard): only fetch ids and specified fragments for a relation #18272

Merged
merged 10 commits into from
Aug 28, 2024
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion dashboard/lib/api/streaming.ts
Original file line number Diff line number Diff line change
@@ -27,13 +27,27 @@ import {
View,
} from "../../proto/gen/catalog"
import {
ListObjectDependenciesResponse_ObjectDependencies as ObjectDependencies,
ListObjectDependenciesResponse_ObjectDependencies as ObjectDependencies, RelationIdInfos,
TableFragments,
} from "../../proto/gen/meta"
import { ColumnCatalog, Field } from "../../proto/gen/plan_common"
import { UserInfo } from "../../proto/gen/user"
import api from "./api"

// NOTE(kwannoel): This can be optimized further, instead of fetching the entire TableFragments struct,
// We can fetch the fields we need from TableFragments, in a truncated struct.
export async function getFragmentsByJobId(jobId: number): Promise<TableFragments> {
let route = "/fragments/job_id/" + jobId.toString()
console.log("route: ", route)
let tableFragments: TableFragments = TableFragments.fromJSON(await api.get(route))
return tableFragments
}

export async function getRelationIdInfos(): Promise<RelationIdInfos> {
let fragmentIds: RelationIdInfos = (await api.get("/relation_id_infos"))
return fragmentIds
}

export async function getFragments(): Promise<TableFragments[]> {
let fragmentList: TableFragments[] = (await api.get("/fragments2")).map(
TableFragments.fromJSON
72 changes: 41 additions & 31 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
@@ -45,7 +45,13 @@ import {
fetchEmbeddedBackPressure,
fetchPrometheusBackPressure,
} from "../lib/api/metric"
import { getFragments, getStreamingJobs } from "../lib/api/streaming"
import {
getFragments,
getFragmentIds,
getFragmentsByJobId,
getStreamingJobs,
getRelationIdInfos
} from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta"
import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan"
@@ -194,28 +200,32 @@ interface EmbeddedBackPressureInfo {

export default function Streaming() {
const { response: relationList } = useFetch(getStreamingJobs)
const { response: fragmentList } = useFetch(getFragments)
const { response: relationIdInfos } = useFetch(getRelationIdInfos)

const [relationId, setRelationId] = useQueryState("id", parseAsInteger)
const [selectedFragmentId, setSelectedFragmentId] = useState<number>()
const [tableFragments, setTableFragments] = useState<TableFragments>()

const toast = useErrorToast()

useEffect(() => {
if (relationId) {
getFragmentsByJobId(relationId).then(tf => {
setTableFragments(tf)
})
}
}, [relationId])

const fragmentDependencyCallback = useCallback(() => {
if (fragmentList) {
if (relationId) {
const fragments = fragmentList.find((x) => x.tableId === relationId)
if (fragments) {
const fragmentDep = buildFragmentDependencyAsEdges(fragments)
return {
fragments,
fragmentDep,
fragmentDepDag: dagStratify()(fragmentDep),
}
}
if (tableFragments) {
const fragmentDep = buildFragmentDependencyAsEdges(tableFragments)
return {
fragments: tableFragments,
fragmentDep,
fragmentDepDag: dagStratify()(fragmentDep),
}
}
}, [fragmentList, relationId])
}, [tableFragments])

useEffect(() => {
if (relationList) {
@@ -255,38 +265,38 @@ export default function Streaming() {

const handleSearchFragment = () => {
const searchFragIdInt = parseInt(searchFragId)
if (fragmentList) {
for (const tf of fragmentList) {
for (const fragmentId in tf.fragments) {
if (tf.fragments[fragmentId].fragmentId == searchFragIdInt) {
setRelationId(tf.tableId)
if (relationIdInfos) {
let map = relationIdInfos.map
for (const relationId in map) {
const fragmentIdToRelationId = map[relationId].map
for (const fragmentId in fragmentIdToRelationId) {
if (parseInt(fragmentId) == searchFragIdInt) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: searchFragId can be used in the comparison so that we save parseInt call.

setRelationId(parseInt(relationId))
setSelectedFragmentId(searchFragIdInt)
return
}
}
}
}

toast(new Error(`Fragment ${searchFragIdInt} not found`))
}

const handleSearchActor = () => {
const searchActorIdInt = parseInt(searchActorId)
if (fragmentList) {
for (const tf of fragmentList) {
for (const fragmentId in tf.fragments) {
const fragment = tf.fragments[fragmentId]
for (const actor of fragment.actors) {
if (actor.actorId == searchActorIdInt) {
setRelationId(tf.tableId)
setSelectedFragmentId(fragment.fragmentId)
return
}
if (relationIdInfos) {
let map = relationIdInfos.map
for (const relationId in map) {
const fragmentIdToRelationId = map[relationId].map
for (const fragmentId in fragmentIdToRelationId) {
let actorIds = fragmentIdToRelationId[fragmentId].ids
if (actorIds.includes(searchActorIdInt)) {
setRelationId(parseInt(relationId))
setSelectedFragmentId(fragmentId)
return
}
}
}
}

toast(new Error(`Actor ${searchActorIdInt} not found`))
}

17 changes: 17 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
@@ -771,3 +771,20 @@ service EventLogService {
rpc ListEventLog(ListEventLogRequest) returns (ListEventLogResponse);
rpc AddEventLog(AddEventLogRequest) returns (AddEventLogResponse);
}

message ActorIds {
repeated uint32 ids = 1;
}

message FragmentIdToActorIdMap {
map<uint32, ActorIds> map = 1;
}
kwannoel marked this conversation as resolved.
Show resolved Hide resolved

/// Provides all the ids: relation_id, fragment_id, actor_id
/// in an hierarchical format.
/// relation_id -> [fragment_id]
/// fragment_id -> [actor_id]
message RelationIdInfos {
// relation_id -> FragmentIdToActorIdMap
map<uint32, FragmentIdToActorIdMap> map = 1;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide a new proto for it to make it easier to work with on the web frontend side.

96 changes: 95 additions & 1 deletion src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
@@ -49,16 +49,22 @@ pub struct DashboardService {
pub type Service = Arc<DashboardService>;

pub(super) mod handlers {
use std::collections::HashMap;

use anyhow::Context;
use axum::Json;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Subscription, Table, View};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::meta::{
ActorIds, FragmentIdToActorIdMap, PbTableFragments, RelationIdInfos,
};
use risingwave_pb::monitor_service::{
GetBackPressureResponse, HeapProfilingResponse, ListHeapProfilingResponse,
StackTraceResponse,
@@ -211,6 +217,92 @@ pub(super) mod handlers {
Ok(Json(table_fragments))
}

pub async fn get_relation_id_infos(
Extension(srv): Extension<Service>,
) -> Result<Json<RelationIdInfos>> {
let map = match &srv.metadata_manager {
MetadataManager::V1(mgr) => {
let core = mgr.fragment_manager.get_fragment_read_guard().await;
let table_fragments = core.table_fragments();
let mut map = HashMap::new();
for (id, tf) in table_fragments {
let mut fragment_id_to_actor_ids = HashMap::new();
for (fragment_id, fragment) in &tf.fragments {
let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
}
map.insert(
id.table_id,
FragmentIdToActorIdMap {
map: fragment_id_to_actor_ids,
},
);
}
map
}
MetadataManager::V2(mgr) => {
let table_fragments = mgr
.catalog_controller
.table_fragments()
.await
.map_err(err)?;
let mut map = HashMap::new();
for (id, tf) in table_fragments {
let mut fragment_id_to_actor_ids = HashMap::new();
for (fragment_id, fragment) in &tf.fragments {
let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
}
map.insert(
id as u32,
FragmentIdToActorIdMap {
map: fragment_id_to_actor_ids,
},
);
}
map
}
};
let relation_id_infos = RelationIdInfos { map };

Ok(Json(relation_id_infos))
}

pub async fn list_fragments_by_job_id(
Extension(srv): Extension<Service>,
Path(job_id): Path<u32>,
) -> Result<Json<PbTableFragments>> {
let table_fragments = match &srv.metadata_manager {
MetadataManager::V1(mgr) => {
if let Some(tf) = mgr
.fragment_manager
.get_fragment_read_guard()
.await
.table_fragments()
.get(&TableId::new(job_id))
{
tf.to_protobuf()
} else {
bail!("job_id {} not found", job_id)
}
}
MetadataManager::V2(mgr) => {
let mut table_fragments = mgr
.catalog_controller
.table_fragments()
.await
.map_err(err)?;
if let Some(tf) = table_fragments.remove(&(job_id as i32)) {
tf
} else {
bail!("job_id {} not found", job_id)
}
}
};

Ok(Json(table_fragments))
}

pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
let users = match &srv.metadata_manager {
MetadataManager::V1(mgr) => mgr.catalog_manager.list_users().await,
@@ -428,6 +520,8 @@ impl DashboardService {
let api_router = Router::new()
.route("/clusters/:ty", get(list_clusters))
.route("/fragments2", get(list_fragments))
.route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
.route("/relation_id_infos", get(get_relation_id_infos))
.route("/views", get(list_views))
.route("/materialized_views", get(list_materialized_views))
.route("/tables", get(list_tables))
Loading