Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(dashboard): only fetch ids and specified fragments for a relation #18272

Merged
merged 10 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions dashboard/lib/api/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,30 @@ import {
} from "../../proto/gen/catalog"
import {
ListObjectDependenciesResponse_ObjectDependencies as ObjectDependencies,
RelationIdInfos,
TableFragments,
} from "../../proto/gen/meta"
import { ColumnCatalog, Field } from "../../proto/gen/plan_common"
import { UserInfo } from "../../proto/gen/user"
import api from "./api"

// NOTE(kwannoel): This can be optimized further, instead of fetching the entire TableFragments struct,
// We can fetch the fields we need from TableFragments, in a truncated struct.
export async function getFragmentsByJobId(
jobId: number
): Promise<TableFragments> {
let route = "/fragments/job_id/" + jobId.toString()
let tableFragments: TableFragments = TableFragments.fromJSON(
await api.get(route)
)
return tableFragments
}

export async function getRelationIdInfos(): Promise<RelationIdInfos> {
let fragmentIds: RelationIdInfos = await api.get("/relation_id_infos")
return fragmentIds
}

export async function getFragments(): Promise<TableFragments[]> {
let fragmentList: TableFragments[] = (await api.get("/fragments2")).map(
TableFragments.fromJSON
Expand Down
71 changes: 40 additions & 31 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ import {
fetchEmbeddedBackPressure,
fetchPrometheusBackPressure,
} from "../lib/api/metric"
import { getFragments, getStreamingJobs } from "../lib/api/streaming"
import {
getFragmentsByJobId,
getRelationIdInfos,
getStreamingJobs,
} from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta"
import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan"
Expand Down Expand Up @@ -194,28 +198,33 @@ interface EmbeddedBackPressureInfo {

export default function Streaming() {
const { response: relationList } = useFetch(getStreamingJobs)
const { response: fragmentList } = useFetch(getFragments)
const { response: relationIdInfos } = useFetch(getRelationIdInfos)

const [relationId, setRelationId] = useQueryState("id", parseAsInteger)
const [selectedFragmentId, setSelectedFragmentId] = useState<number>()
const [tableFragments, setTableFragments] = useState<TableFragments>()

const toast = useErrorToast()

useEffect(() => {
if (relationId) {
setTableFragments(undefined)
getFragmentsByJobId(relationId).then((tf) => {
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
setTableFragments(tf)
})
}
}, [relationId])

const fragmentDependencyCallback = useCallback(() => {
if (fragmentList) {
if (relationId) {
const fragments = fragmentList.find((x) => x.tableId === relationId)
if (fragments) {
const fragmentDep = buildFragmentDependencyAsEdges(fragments)
return {
fragments,
fragmentDep,
fragmentDepDag: dagStratify()(fragmentDep),
}
}
if (tableFragments) {
const fragmentDep = buildFragmentDependencyAsEdges(tableFragments)
return {
fragments: tableFragments,
fragmentDep,
fragmentDepDag: dagStratify()(fragmentDep),
}
}
}, [fragmentList, relationId])
}, [tableFragments])

useEffect(() => {
if (relationList) {
Expand Down Expand Up @@ -255,38 +264,38 @@ export default function Streaming() {

const handleSearchFragment = () => {
const searchFragIdInt = parseInt(searchFragId)
if (fragmentList) {
for (const tf of fragmentList) {
for (const fragmentId in tf.fragments) {
if (tf.fragments[fragmentId].fragmentId == searchFragIdInt) {
setRelationId(tf.tableId)
if (relationIdInfos) {
let map = relationIdInfos.map
for (const relationId in map) {
const fragmentIdToRelationId = map[relationId].map
for (const fragmentId in fragmentIdToRelationId) {
if (parseInt(fragmentId) == searchFragIdInt) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: searchFragId can be used in the comparison so that we save parseInt call.

setRelationId(parseInt(relationId))
setSelectedFragmentId(searchFragIdInt)
return
}
}
}
}

toast(new Error(`Fragment ${searchFragIdInt} not found`))
}

const handleSearchActor = () => {
const searchActorIdInt = parseInt(searchActorId)
if (fragmentList) {
for (const tf of fragmentList) {
for (const fragmentId in tf.fragments) {
const fragment = tf.fragments[fragmentId]
for (const actor of fragment.actors) {
if (actor.actorId == searchActorIdInt) {
setRelationId(tf.tableId)
setSelectedFragmentId(fragment.fragmentId)
return
}
if (relationIdInfos) {
let map = relationIdInfos.map
for (const relationId in map) {
const fragmentIdToRelationId = map[relationId].map
for (const fragmentId in fragmentIdToRelationId) {
let actorIds = fragmentIdToRelationId[fragmentId].ids
if (actorIds.includes(searchActorIdInt)) {
setRelationId(parseInt(relationId))
setSelectedFragmentId(parseInt(fragmentId))
return
}
}
}
}

toast(new Error(`Actor ${searchActorIdInt} not found`))
}

Expand Down
17 changes: 17 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -771,3 +771,20 @@ service EventLogService {
rpc ListEventLog(ListEventLogRequest) returns (ListEventLogResponse);
rpc AddEventLog(AddEventLogRequest) returns (AddEventLogResponse);
}

message ActorIds {
repeated uint32 ids = 1;
}

message FragmentIdToActorIdMap {
map<uint32, ActorIds> map = 1;
}
kwannoel marked this conversation as resolved.
Show resolved Hide resolved

/// Provides all the ids: relation_id, fragment_id, actor_id
/// in an hierarchical format.
/// relation_id -> [fragment_id]
/// fragment_id -> [actor_id]
message RelationIdInfos {
// relation_id -> FragmentIdToActorIdMap
map<uint32, FragmentIdToActorIdMap> map = 1;
}
96 changes: 95 additions & 1 deletion src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,22 @@ pub struct DashboardService {
pub type Service = Arc<DashboardService>;

pub(super) mod handlers {
use std::collections::HashMap;

use anyhow::Context;
use axum::Json;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Subscription, Table, View};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::meta::{
ActorIds, FragmentIdToActorIdMap, PbTableFragments, RelationIdInfos,
};
use risingwave_pb::monitor_service::{
GetBackPressureResponse, HeapProfilingResponse, ListHeapProfilingResponse,
StackTraceResponse,
Expand Down Expand Up @@ -211,6 +217,92 @@ pub(super) mod handlers {
Ok(Json(table_fragments))
}

pub async fn get_relation_id_infos(
Extension(srv): Extension<Service>,
) -> Result<Json<RelationIdInfos>> {
let map = match &srv.metadata_manager {
MetadataManager::V1(mgr) => {
let core = mgr.fragment_manager.get_fragment_read_guard().await;
let table_fragments = core.table_fragments();
let mut map = HashMap::new();
for (id, tf) in table_fragments {
let mut fragment_id_to_actor_ids = HashMap::new();
for (fragment_id, fragment) in &tf.fragments {
let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
}
map.insert(
id.table_id,
FragmentIdToActorIdMap {
map: fragment_id_to_actor_ids,
},
);
}
map
}
MetadataManager::V2(mgr) => {
let table_fragments = mgr
.catalog_controller
.table_fragments()
.await
.map_err(err)?;
let mut map = HashMap::new();
for (id, tf) in table_fragments {
let mut fragment_id_to_actor_ids = HashMap::new();
for (fragment_id, fragment) in &tf.fragments {
let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
}
map.insert(
id as u32,
FragmentIdToActorIdMap {
map: fragment_id_to_actor_ids,
},
);
}
map
}
};
let relation_id_infos = RelationIdInfos { map };

Ok(Json(relation_id_infos))
}

pub async fn list_fragments_by_job_id(
Extension(srv): Extension<Service>,
Path(job_id): Path<u32>,
) -> Result<Json<PbTableFragments>> {
let table_fragments = match &srv.metadata_manager {
MetadataManager::V1(mgr) => {
if let Some(tf) = mgr
.fragment_manager
.get_fragment_read_guard()
.await
.table_fragments()
.get(&TableId::new(job_id))
{
tf.to_protobuf()
} else {
bail!("job_id {} not found", job_id)
}
}
MetadataManager::V2(mgr) => {
let mut table_fragments = mgr
.catalog_controller
.table_fragments()
.await
.map_err(err)?;
if let Some(tf) = table_fragments.remove(&(job_id as i32)) {
tf
} else {
bail!("job_id {} not found", job_id)
}
}
};

Ok(Json(table_fragments))
}

pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
let users = match &srv.metadata_manager {
MetadataManager::V1(mgr) => mgr.catalog_manager.list_users().await,
Expand Down Expand Up @@ -428,6 +520,8 @@ impl DashboardService {
let api_router = Router::new()
.route("/clusters/:ty", get(list_clusters))
.route("/fragments2", get(list_fragments))
.route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
.route("/relation_id_infos", get(get_relation_id_infos))
.route("/views", get(list_views))
.route("/materialized_views", get(list_materialized_views))
.route("/tables", get(list_tables))
Expand Down
Loading