From 240416f43bae35c11d1d8a6d9e081fdd42fcc6bb Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 15 Jan 2024 12:00:17 +0800 Subject: [PATCH 01/43] feat: enable or disable tracing with system params (#14528) Signed-off-by: Bugen Zhao Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- proto/meta.proto | 1 + src/common/src/config.rs | 7 ++- src/common/src/system_param/common.rs | 48 +++++++++++++++++++ src/common/src/system_param/local_manager.rs | 19 +++++++- src/common/src/system_param/mod.rs | 3 ++ src/common/src/system_param/reader.rs | 12 ++++- src/common/src/util/tracing.rs | 2 + src/common/src/util/tracing/layer.rs | 32 +++++++++++++ src/config/example.toml | 1 + src/meta/src/controller/system_param.rs | 11 ++++- src/meta/src/manager/system_param/mod.rs | 11 ++++- src/utils/runtime/src/logger.rs | 50 ++++++++++++++++---- 12 files changed, 180 insertions(+), 17 deletions(-) create mode 100644 src/common/src/system_param/common.rs create mode 100644 src/common/src/util/tracing/layer.rs diff --git a/proto/meta.proto b/proto/meta.proto index 0ce5fc887fe9d..3e41032b33bfa 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -552,6 +552,7 @@ message SystemParams { optional uint32 max_concurrent_creating_streaming_jobs = 12; optional bool pause_on_next_bootstrap = 13; optional string wasm_storage_url = 14; + optional bool enable_tracing = 15; } message GetSystemParamsRequest {} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 8655c5fcbdf6d..78cb7146370de 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -902,6 +902,10 @@ pub struct SystemConfig { #[serde(default = "default::system::wasm_storage_url")] pub wasm_storage_url: Option, + + /// Whether to enable distributed tracing. + #[serde(default = "default::system::enable_tracing")] + pub enable_tracing: Option, } /// The subsections `[storage.object_store]`. @@ -955,8 +959,9 @@ impl SystemConfig { backup_storage_directory: self.backup_storage_directory, max_concurrent_creating_streaming_jobs: self.max_concurrent_creating_streaming_jobs, pause_on_next_bootstrap: self.pause_on_next_bootstrap, - telemetry_enabled: None, // deprecated wasm_storage_url: self.wasm_storage_url, + enable_tracing: self.enable_tracing, + telemetry_enabled: None, // deprecated } } } diff --git a/src/common/src/system_param/common.rs b/src/common/src/system_param/common.rs new file mode 100644 index 0000000000000..b8dcf825e2dda --- /dev/null +++ b/src/common/src/system_param/common.rs @@ -0,0 +1,48 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Mutex; + +use super::reader::SystemParamsReader; +use crate::util::tracing::layer::toggle_otel_layer; + +/// Node-independent handler for system parameter changes. +/// +/// Currently, it is only used to enable or disable the distributed tracing layer. +pub struct CommonHandler { + last_params: Mutex>, +} + +impl CommonHandler { + /// Create a new handler with the initial parameters. + pub fn new(initial: SystemParamsReader) -> Self { + let this = Self { + last_params: None.into(), + }; + this.handle_change(initial); + this + } + + /// Handle the change of system parameters. + // TODO: directly call this method with the difference of old and new params. + pub fn handle_change(&self, new_params: SystemParamsReader) { + let mut last_params = self.last_params.lock().unwrap(); + + if last_params.as_ref().map(|p| p.enable_tracing()) != Some(new_params.enable_tracing()) { + toggle_otel_layer(new_params.enable_tracing()); + } + + last_params.replace(new_params); + } +} diff --git a/src/common/src/system_param/local_manager.rs b/src/common/src/system_param/local_manager.rs index 7103ed6737104..312c5577a0f81 100644 --- a/src/common/src/system_param/local_manager.rs +++ b/src/common/src/system_param/local_manager.rs @@ -19,6 +19,7 @@ use arc_swap::ArcSwap; use risingwave_pb::meta::SystemParams; use tokio::sync::watch::{channel, Receiver, Sender}; +use super::common::CommonHandler; use super::reader::SystemParamsReader; use super::system_params_for_test; @@ -40,9 +41,23 @@ pub struct LocalSystemParamsManager { } impl LocalSystemParamsManager { - pub fn new(params: SystemParamsReader) -> Self { - let params = Arc::new(ArcSwap::from_pointee(params)); + pub fn new(initial_params: SystemParamsReader) -> Self { + let params = Arc::new(ArcSwap::from_pointee(initial_params.clone())); let (tx, _) = channel(params.clone()); + + // Spawn a task to run the common handler. + tokio::spawn({ + let mut rx = tx.subscribe(); + async move { + let handler = CommonHandler::new(initial_params); + + while rx.changed().await.is_ok() { + let new_params = (**rx.borrow_and_update().load()).clone(); + handler.handle_change(new_params); + } + } + }); + Self { params, tx } } diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 366cc61d2dd53..cffa7a4564a5f 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -20,6 +20,7 @@ //! - Add a new entry to `for_all_undeprecated_params` in this file. //! - Add a new method to [`reader::SystemParamsReader`]. +pub mod common; pub mod local_manager; pub mod reader; @@ -56,6 +57,7 @@ macro_rules! for_all_params { { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true }, { pause_on_next_bootstrap, bool, Some(false), true }, { wasm_storage_url, String, Some("fs://.risingwave/data".to_string()), false }, + { enable_tracing, bool, Some(false), true }, } }; } @@ -359,6 +361,7 @@ mod tests { (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"), (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"), (WASM_STORAGE_URL_KEY, "a"), + (ENABLE_TRACING_KEY, "true"), ("a_deprecated_param", "foo"), ]; diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 0059974203c6d..24ecd83f5f061 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -14,7 +14,7 @@ use risingwave_pb::meta::PbSystemParams; -use super::system_params_to_kv; +use super::{default, system_params_to_kv}; /// A wrapper for [`risingwave_pb::meta::SystemParams`] for 2 purposes: /// - Avoid misuse of deprecated fields by hiding their getters. @@ -77,7 +77,15 @@ impl SystemParamsReader { } pub fn pause_on_next_bootstrap(&self) -> bool { - self.prost.pause_on_next_bootstrap.unwrap_or(false) + self.prost + .pause_on_next_bootstrap + .unwrap_or_else(|| default::pause_on_next_bootstrap().unwrap()) + } + + pub fn enable_tracing(&self) -> bool { + self.prost + .enable_tracing + .unwrap_or_else(|| default::enable_tracing().unwrap()) } pub fn wasm_storage_url(&self) -> &str { diff --git a/src/common/src/util/tracing.rs b/src/common/src/util/tracing.rs index f87a5efd4baef..e7da6e8e7d580 100644 --- a/src/common/src/util/tracing.rs +++ b/src/common/src/util/tracing.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod layer; + use std::collections::HashMap; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/src/common/src/util/tracing/layer.rs b/src/common/src/util/tracing/layer.rs new file mode 100644 index 0000000000000..a5268a55dc90e --- /dev/null +++ b/src/common/src/util/tracing/layer.rs @@ -0,0 +1,32 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::OnceLock; + +static TOGGLE_OTEL_LAYER: OnceLock> = OnceLock::new(); + +/// Set the function to toggle the opentelemetry tracing layer. Panics if called twice. +pub fn set_toggle_otel_layer_fn(f: impl Fn(bool) + Sync + Send + 'static) { + TOGGLE_OTEL_LAYER + .set(Box::new(f)) + .ok() + .expect("toggle otel layer fn set twice"); +} + +/// Toggle the opentelemetry tracing layer. +pub fn toggle_otel_layer(enabled: bool) { + if let Some(f) = TOGGLE_OTEL_LAYER.get() { + f(enabled); + } +} diff --git a/src/config/example.toml b/src/config/example.toml index 15738219ae08c..b2eef323c2d00 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -189,3 +189,4 @@ backup_storage_directory = "backup" max_concurrent_creating_streaming_jobs = 1 pause_on_next_bootstrap = false wasm_storage_url = "fs://.risingwave/data" +enable_tracing = false diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index c37ce5a626f13..fbcce97a97d9b 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; +use risingwave_common::system_param::common::CommonHandler; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::system_param::{ check_missing_params, derive_missing_fields, set_system_param, @@ -44,6 +45,8 @@ pub struct SystemParamsController { notification_manager: NotificationManagerRef, // Cached parameters. params: RwLock, + /// Common handler for system params. + common_handler: CommonHandler, } /// Derive system params from db models. @@ -146,7 +149,8 @@ impl SystemParamsController { let ctl = Self { db, notification_manager, - params: RwLock::new(params), + params: RwLock::new(params.clone()), + common_handler: CommonHandler::new(params.into()), }; // flush to db. ctl.flush_params().await?; @@ -196,6 +200,11 @@ impl SystemParamsController { param.update(&self.db).await?; *params_guard = params.clone(); + // TODO: check if the parameter is actually changed. + + // Run common handler. + self.common_handler.handle_change(params.clone().into()); + // Sync params to other managers on the meta node only once, since it's infallible. self.notification_manager .notify_local_subscribers(LocalNotification::SystemParamsChange(params.clone().into())) diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index 9d5e574efa8b6..14d0e311a2d89 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; +use risingwave_common::system_param::common::CommonHandler; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::system_param::{check_missing_params, set_system_param}; use risingwave_common::{for_all_params, key_of}; @@ -43,6 +44,8 @@ pub struct SystemParamsManager { notification_manager: NotificationManagerRef, // Cached parameters. params: RwLock, + /// Common handler for system params. + common_handler: CommonHandler, } impl SystemParamsManager { @@ -69,7 +72,8 @@ impl SystemParamsManager { Ok(Self { meta_store, notification_manager, - params: RwLock::new(params), + params: RwLock::new(params.clone()), + common_handler: CommonHandler::new(params.into()), }) } @@ -94,6 +98,11 @@ impl SystemParamsManager { mem_txn.commit(); + // TODO: check if the parameter is actually changed. + + // Run common handler. + self.common_handler.handle_change(params.clone().into()); + // Sync params to other managers on the meta node only once, since it's infallible. self.notification_manager .notify_local_subscribers(super::LocalNotification::SystemParamsChange( diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index e636c3a72de51..cb27840d7530f 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -18,8 +18,8 @@ use std::path::PathBuf; use either::Either; use risingwave_common::metrics::MetricsLayer; use risingwave_common::util::deployment::Deployment; -use risingwave_common::util::env_var::env_var_is_true; use risingwave_common::util::query_log::*; +use risingwave_common::util::tracing::layer::set_toggle_otel_layer_fn; use thiserror_ext::AsReport; use tracing::level_filters::LevelFilter as Level; use tracing_subscriber::filter::{FilterFn, Targets}; @@ -28,7 +28,7 @@ use tracing_subscriber::fmt::time::OffsetTime; use tracing_subscriber::fmt::FormatFields; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::prelude::*; -use tracing_subscriber::{filter, EnvFilter}; +use tracing_subscriber::{filter, reload, EnvFilter}; pub struct LoggerSettings { /// The name of the service. Used to identify the service in distributed tracing. @@ -60,15 +60,12 @@ impl LoggerSettings { /// /// If env var `RW_TRACING_ENDPOINT` is not set, the meta address will be used /// as the default tracing endpoint, which means that the embedded tracing - /// collector will be used. This can be disabled by setting env var - /// `RW_DISABLE_EMBEDDED_TRACING` to `true`. + /// collector will be used. pub fn from_opts(opts: &O) -> Self { let mut settings = Self::new(O::name()); if settings.tracing_endpoint.is_none() // no explicit endpoint - && !env_var_is_true("RW_DISABLE_EMBEDDED_TRACING") // not disabled by env var - && let Some(addr) = opts.meta_addr().exactly_one() // meta address is valid - && !Deployment::current().is_ci() - // not in CI + && let Some(addr) = opts.meta_addr().exactly_one() + // meta address is valid { // Use embedded collector in the meta service. // TODO: when there's multiple meta nodes for high availability, we may send @@ -133,6 +130,11 @@ impl LoggerSettings { } } +/// Create a filter that disables all events or spans. +fn disabled_filter() -> filter::Targets { + filter::Targets::new() +} + /// Init logger for RisingWave binaries. /// /// ## Environment variables to configure logger dynamically @@ -388,7 +390,7 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { // Tracing layer #[cfg(not(madsim))] if let Some(endpoint) = settings.tracing_endpoint { - println!("tracing enabled, exported to `{endpoint}`"); + println!("opentelemetry tracing will be exported to `{endpoint}` if enabled"); use opentelemetry::{sdk, KeyValue}; use opentelemetry_otlp::WithExportConfig; @@ -437,9 +439,37 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { .unwrap() }; + // Disable by filtering out all events or spans by default. + // + // It'll be enabled with `toggle_otel_layer` based on the system parameter `enable_tracing` later. + let (reload_filter, reload_handle) = reload::Layer::new(disabled_filter()); + + set_toggle_otel_layer_fn(move |enabled: bool| { + let result = reload_handle.modify(|f| { + *f = if enabled { + default_filter.clone() + } else { + disabled_filter() + } + }); + + match result { + Ok(_) => tracing::info!( + "opentelemetry tracing {}", + if enabled { "enabled" } else { "disabled" }, + ), + + Err(error) => tracing::error!( + error = %error.as_report(), + "failed to {} opentelemetry tracing", + if enabled { "enable" } else { "disable" }, + ), + } + }); + let layer = tracing_opentelemetry::layer() .with_tracer(otel_tracer) - .with_filter(default_filter); + .with_filter(reload_filter); layers.push(layer.boxed()); } From d65c151c50e88220b6bf2d9ed01411d8da6175ec Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 15 Jan 2024 12:11:34 +0800 Subject: [PATCH 02/43] feat(dashboard): improvements on the relation dep graph (#14505) Signed-off-by: Bugen Zhao --- dashboard/components/CatalogModal.tsx | 87 +++++++++++ .../components/FragmentDependencyGraph.tsx | 84 +++++----- dashboard/components/FragmentGraph.tsx | 13 +- .../components/RelationDependencyGraph.tsx | 128 +++++++++++----- dashboard/components/Relations.tsx | 49 +----- dashboard/lib/layout.ts | 145 ++++++++---------- dashboard/pages/api/streaming.ts | 20 ++- dashboard/pages/dependency_graph.tsx | 32 ++-- dashboard/pages/fragment_graph.tsx | 4 +- 9 files changed, 341 insertions(+), 221 deletions(-) create mode 100644 dashboard/components/CatalogModal.tsx diff --git a/dashboard/components/CatalogModal.tsx b/dashboard/components/CatalogModal.tsx new file mode 100644 index 0000000000000..cf6a2f8cc9e0d --- /dev/null +++ b/dashboard/components/CatalogModal.tsx @@ -0,0 +1,87 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + Button, + Modal, + ModalBody, + ModalCloseButton, + ModalContent, + ModalFooter, + ModalHeader, + ModalOverlay, +} from "@chakra-ui/react" + +import Link from "next/link" +import { parseAsInteger, useQueryState } from "nuqs" +import { + Relation, + relationIsStreamingJob, + relationTypeTitleCase, +} from "../pages/api/streaming" +import { ReactJson } from "./Relations" + +export function useCatalogModal(relationList: Relation[] | undefined) { + const [modalId, setModalId] = useQueryState("modalId", parseAsInteger) + const modalData = relationList?.find((r) => r.id === modalId) + + return [modalData, setModalId] as const +} + +export function CatalogModal({ + modalData, + onClose, +}: { + modalData: Relation | undefined + onClose: () => void +}) { + return ( + + + + + Catalog of {modalData && relationTypeTitleCase(modalData)}{" "} + {modalData?.id} - {modalData?.name} + + + + {modalData && ( + + )} + + + + {modalData && relationIsStreamingJob(modalData) && ( + + )} + + + + + ) +} diff --git a/dashboard/components/FragmentDependencyGraph.tsx b/dashboard/components/FragmentDependencyGraph.tsx index 553c40ec53f92..7b3417507efcf 100644 --- a/dashboard/components/FragmentDependencyGraph.tsx +++ b/dashboard/components/FragmentDependencyGraph.tsx @@ -3,18 +3,18 @@ import * as d3 from "d3" import { Dag, DagLink, DagNode, zherebko } from "d3-dag" import { cloneDeep } from "lodash" import { useCallback, useEffect, useRef, useState } from "react" -import { Position } from "../lib/layout" +import { Enter, FragmentBox, Position } from "../lib/layout" const nodeRadius = 5 const edgeRadius = 12 export default function FragmentDependencyGraph({ - mvDependency, + fragmentDependency, svgWidth, selectedId, onSelectedIdChange, }: { - mvDependency: Dag + fragmentDependency: Dag svgWidth: number selectedId: string | undefined onSelectedIdChange: (id: string) => void | undefined @@ -24,21 +24,21 @@ export default function FragmentDependencyGraph({ const MARGIN_X = 10 const MARGIN_Y = 2 - const mvDependencyDagCallback = useCallback(() => { + const fragmentDependencyDagCallback = useCallback(() => { const layout = zherebko().nodeSize([ nodeRadius * 2, (nodeRadius + edgeRadius) * 2, nodeRadius, ]) - const dag = cloneDeep(mvDependency) + const dag = cloneDeep(fragmentDependency) const { width, height } = layout(dag) return { width, height, dag } - }, [mvDependency]) + }, [fragmentDependency]) - const mvDependencyDag = mvDependencyDagCallback() + const fragmentDependencyDag = fragmentDependencyDagCallback() useEffect(() => { - const { width, height, dag } = mvDependencyDag + const { width, height, dag } = fragmentDependencyDag // This code only handles rendering @@ -53,25 +53,27 @@ export default function FragmentDependencyGraph({ .x(({ x }) => x + MARGIN_X) .y(({ y }) => y) - const isSelected = (d: any) => d.data.id === selectedId + const isSelected = (d: DagNode) => d.data.id === selectedId const edgeSelection = svgSelection .select(".edges") - .selectAll(".edge") + .selectAll(".edge") .data(dag.links()) - const applyEdge = (sel: any) => + type EdgeSelection = typeof edgeSelection + + const applyEdge = (sel: EdgeSelection) => sel .attr("d", ({ points }: DagLink) => line(points)) .attr("fill", "none") - .attr("stroke-width", (d: any) => + .attr("stroke-width", (d) => isSelected(d.source) || isSelected(d.target) ? 2 : 1 ) - .attr("stroke", (d: any) => + .attr("stroke", (d) => isSelected(d.source) || isSelected(d.target) ? theme.colors.blue["500"] : theme.colors.gray["300"] ) - const createEdge = (sel: any) => + const createEdge = (sel: Enter) => sel.append("path").attr("class", "edge").call(applyEdge) edgeSelection.exit().remove() edgeSelection.enter().call(createEdge) @@ -80,19 +82,18 @@ export default function FragmentDependencyGraph({ // Select nodes const nodeSelection = svgSelection .select(".nodes") - .selectAll(".node") + .selectAll(".node") .data(dag.descendants()) - const applyNode = (sel: any) => + type NodeSelection = typeof nodeSelection + + const applyNode = (sel: NodeSelection) => sel - .attr( - "transform", - ({ x, y }: Position) => `translate(${x + MARGIN_X}, ${y})` - ) - .attr("fill", (d: any) => + .attr("transform", (d) => `translate(${d.x! + MARGIN_X}, ${d.y})`) + .attr("fill", (d) => isSelected(d) ? theme.colors.blue["500"] : theme.colors.gray["500"] ) - const createNode = (sel: any) => + const createNode = (sel: Enter) => sel .append("circle") .attr("class", "node") @@ -105,22 +106,23 @@ export default function FragmentDependencyGraph({ // Add text to nodes const labelSelection = svgSelection .select(".labels") - .selectAll(".label") + .selectAll(".label") .data(dag.descendants()) + type LabelSelection = typeof labelSelection - const applyLabel = (sel: any) => + const applyLabel = (sel: LabelSelection) => sel - .text((d: any) => d.data.name) + .text((d) => d.data.name) .attr("x", svgWidth - MARGIN_X) .attr("font-family", "inherit") .attr("text-anchor", "end") .attr("alignment-baseline", "middle") - .attr("y", (d: any) => d.y) - .attr("fill", (d: any) => + .attr("y", (d) => d.y!) + .attr("fill", (d) => isSelected(d) ? theme.colors.black["500"] : theme.colors.gray["500"] ) .attr("font-weight", "600") - const createLabel = (sel: any) => + const createLabel = (sel: Enter) => sel.append("text").attr("class", "label").call(applyLabel) labelSelection.exit().remove() labelSelection.enter().call(createLabel) @@ -129,11 +131,12 @@ export default function FragmentDependencyGraph({ // Add overlays const overlaySelection = svgSelection .select(".overlays") - .selectAll(".overlay") + .selectAll(".overlay") .data(dag.descendants()) + type OverlaySelection = typeof overlaySelection const STROKE_WIDTH = 3 - const applyOverlay = (sel: any) => + const applyOverlay = (sel: OverlaySelection) => sel .attr("x", STROKE_WIDTH) .attr( @@ -143,20 +146,13 @@ export default function FragmentDependencyGraph({ .attr("width", svgWidth - STROKE_WIDTH * 2) .attr( "y", - (d: any) => d.y - nodeRadius - edgeRadius + MARGIN_Y + STROKE_WIDTH + (d) => d.y! - nodeRadius - edgeRadius + MARGIN_Y + STROKE_WIDTH ) .attr("rx", 5) .attr("fill", theme.colors.gray["500"]) .attr("opacity", 0) .style("cursor", "pointer") - const createOverlay = ( - sel: d3.Selection< - d3.EnterElement, - DagNode, - d3.BaseType, - unknown - > - ) => + const createOverlay = (sel: Enter) => sel .append("rect") .attr("class", "overlay") @@ -187,7 +183,7 @@ export default function FragmentDependencyGraph({ }) .on("click", function (d, i) { if (onSelectedIdChange) { - onSelectedIdChange((i.data as any).id) + onSelectedIdChange(i.data.id) } }) @@ -196,7 +192,13 @@ export default function FragmentDependencyGraph({ overlaySelection.call(applyOverlay) setSvgHeight(`${height}px`) - }, [mvDependency, selectedId, svgWidth, onSelectedIdChange, mvDependencyDag]) + }, [ + fragmentDependency, + selectedId, + svgWidth, + onSelectedIdChange, + fragmentDependencyDag, + ]) return ( diff --git a/dashboard/components/FragmentGraph.tsx b/dashboard/components/FragmentGraph.tsx index 875d92baa2b6b..72184d1b2a8bc 100644 --- a/dashboard/components/FragmentGraph.tsx +++ b/dashboard/components/FragmentGraph.tsx @@ -17,11 +17,12 @@ import { cloneDeep } from "lodash" import { Fragment, useCallback, useEffect, useRef, useState } from "react" import { Edge, + Enter, FragmentBox, FragmentBoxPosition, Position, - generateBoxEdges, - layout, + generateFragmentEdges, + layoutItem, } from "../lib/layout" import { PlanNodeDatum } from "../pages/fragment_graph" import { StreamNode } from "../proto/gen/stream_plan" @@ -36,10 +37,6 @@ type FragmentLayout = { actorIds: string[] } & Position -type Enter = Type extends d3.Selection - ? d3.Selection - : never - function treeLayoutFlip( root: d3.HierarchyNode, { dx, dy }: { dx: number; dy: number } @@ -145,7 +142,7 @@ export default function FragmentGraph({ includedFragmentIds.add(fragmentId) } - const fragmentLayout = layout( + const fragmentLayout = layoutItem( fragmentDependencyDag.map(({ width: _1, height: _2, id, ...data }) => { const { width, height } = layoutFragmentResult.get(id)! return { width, height, id, ...data } @@ -170,7 +167,7 @@ export default function FragmentGraph({ svgHeight = Math.max(svgHeight, y + height + 50) svgWidth = Math.max(svgWidth, x + width) }) - const edges = generateBoxEdges(fragmentLayout) + const edges = generateFragmentEdges(fragmentLayout) return { layoutResult, diff --git a/dashboard/components/RelationDependencyGraph.tsx b/dashboard/components/RelationDependencyGraph.tsx index 99d40ca2615fd..0f677101cce17 100644 --- a/dashboard/components/RelationDependencyGraph.tsx +++ b/dashboard/components/RelationDependencyGraph.tsx @@ -19,15 +19,23 @@ import { theme } from "@chakra-ui/react" import * as d3 from "d3" import { useCallback, useEffect, useRef } from "react" import { - FragmentPoint, - FragmentPointPosition, + Enter, Position, - flipLayoutPoint, - generatePointEdges, + RelationPoint, + RelationPointPosition, + flipLayoutRelation, + generateRelationEdges, } from "../lib/layout" +import { + Relation, + relationIsStreamingJob, + relationType, + relationTypeTitleCase, +} from "../pages/api/streaming" +import { CatalogModal, useCatalogModal } from "./CatalogModal" function boundBox( - fragmentPosition: FragmentPointPosition[], + relationPosition: RelationPointPosition[], nodeRadius: number ): { width: number @@ -35,7 +43,7 @@ function boundBox( } { let width = 0 let height = 0 - for (const { x, y } of fragmentPosition) { + for (const { x, y } of relationPosition) { width = Math.max(width, x + nodeRadius) height = Math.max(height, y + nodeRadius) } @@ -43,21 +51,25 @@ function boundBox( } const layerMargin = 50 -const rowMargin = 200 -const nodeRadius = 10 -const layoutMargin = 100 +const rowMargin = 50 +export const nodeRadius = 12 +const layoutMargin = 50 export default function RelationDependencyGraph({ nodes, selectedId, + setSelectedId, }: { - nodes: FragmentPoint[] - selectedId?: string + nodes: RelationPoint[] + selectedId: string | undefined + setSelectedId: (id: string) => void }) { - const svgRef = useRef() + const [modalData, setModalId] = useCatalogModal(nodes.map((n) => n.relation)) + + const svgRef = useRef(null) const layoutMapCallback = useCallback(() => { - const layoutMap = flipLayoutPoint( + const layoutMap = flipLayoutRelation( nodes, layerMargin, rowMargin, @@ -68,9 +80,9 @@ export default function RelationDependencyGraph({ x: x + layoutMargin, y: y + layoutMargin, ...data, - } as FragmentPointPosition) + } as RelationPointPosition) ) - const links = generatePointEdges(layoutMap) + const links = generateRelationEdges(layoutMap) const { width, height } = boundBox(layoutMap, nodeRadius) return { layoutMap, @@ -96,29 +108,30 @@ export default function RelationDependencyGraph({ const edgeSelection = svgSelection .select(".edges") - .selectAll(".edge") + .selectAll(".edge") .data(links) + type EdgeSelection = typeof edgeSelection const isSelected = (id: string) => id === selectedId - const applyEdge = (sel: any) => + const applyEdge = (sel: EdgeSelection) => sel - .attr("d", ({ points }: any) => line(points)) + .attr("d", ({ points }) => line(points)) .attr("fill", "none") .attr("stroke-width", 1) - .attr("stroke-width", (d: any) => - isSelected(d.source) || isSelected(d.target) ? 2 : 1 + .attr("stroke-width", (d) => + isSelected(d.source) || isSelected(d.target) ? 4 : 2 ) - .attr("opacity", (d: any) => + .attr("opacity", (d) => isSelected(d.source) || isSelected(d.target) ? 1 : 0.5 ) - .attr("stroke", (d: any) => + .attr("stroke", (d) => isSelected(d.source) || isSelected(d.target) ? theme.colors.blue["500"] : theme.colors.gray["300"] ) - const createEdge = (sel: any) => + const createEdge = (sel: Enter) => sel.append("path").attr("class", "edge").call(applyEdge) edgeSelection.exit().remove() edgeSelection.enter().call(createEdge) @@ -127,21 +140,23 @@ export default function RelationDependencyGraph({ const applyNode = (g: NodeSelection) => { g.attr("transform", ({ x, y }) => `translate(${x},${y})`) + // Circle let circle = g.select("circle") if (circle.empty()) { circle = g.append("circle") } - circle - .attr("r", nodeRadius) - .style("cursor", "pointer") - .attr("fill", ({ id }) => - isSelected(id) ? theme.colors.blue["500"] : theme.colors.gray["500"] - ) + circle.attr("r", nodeRadius).attr("fill", ({ id, relation }) => { + const weight = relationIsStreamingJob(relation) ? "500" : "400" + return isSelected(id) + ? theme.colors.blue[weight] + : theme.colors.gray[weight] + }) - let text = g.select("text") + // Relation name + let text = g.select(".text") if (text.empty()) { - text = g.append("text") + text = g.append("text").attr("class", "text") } text @@ -150,24 +165,66 @@ export default function RelationDependencyGraph({ .attr("font-family", "inherit") .attr("text-anchor", "middle") .attr("dy", nodeRadius * 2) - .attr("fill", "black") .attr("font-size", 12) .attr("transform", "rotate(-8)") + // Relation type + let typeText = g.select(".type") + if (typeText.empty()) { + typeText = g.append("text").attr("class", "type") + } + + const relationTypeAbbr = (relation: Relation) => { + const type = relationType(relation) + if (type === "SINK") { + return "K" + } else { + return type.charAt(0) + } + } + + typeText + .attr("fill", "white") + .text(({ relation }) => `${relationTypeAbbr(relation)}`) + .attr("font-family", "inherit") + .attr("text-anchor", "middle") + .attr("dy", nodeRadius * 0.5) + .attr("font-size", 16) + .attr("font-weight", "bold") + + // Relation type tooltip + let typeTooltip = g.select("title") + if (typeTooltip.empty()) { + typeTooltip = g.append("title") + } + + typeTooltip.text( + ({ relation }) => + `${relation.name} (${relationTypeTitleCase(relation)})` + ) + + // Relation modal + g.style("cursor", "pointer").on("click", (_, { relation, id }) => { + setSelectedId(id) + setModalId(relation.id) + }) + return g } - const createNode = (sel: any) => + const createNode = (sel: Enter) => sel.append("g").attr("class", "node").call(applyNode) const g = svgSelection.select(".boxes") - const nodeSelection = g.selectAll(".node").data(layoutMap) + const nodeSelection = g + .selectAll(".node") + .data(layoutMap) type NodeSelection = typeof nodeSelection nodeSelection.enter().call(createNode) nodeSelection.call(applyNode) nodeSelection.exit().remove() - }, [layoutMap, links, selectedId]) + }, [layoutMap, links, selectedId, setModalId, setSelectedId]) return ( <> @@ -175,6 +232,7 @@ export default function RelationDependencyGraph({ + setModalId(null)} /> ) } diff --git a/dashboard/components/Relations.tsx b/dashboard/components/Relations.tsx index c16a70e8c6fa2..0422eaa2531fa 100644 --- a/dashboard/components/Relations.tsx +++ b/dashboard/components/Relations.tsx @@ -18,13 +18,6 @@ import { Box, Button, - Modal, - ModalBody, - ModalCloseButton, - ModalContent, - ModalFooter, - ModalHeader, - ModalOverlay, Table, TableContainer, Tbody, @@ -37,7 +30,6 @@ import loadable from "@loadable/component" import Head from "next/head" import Link from "next/link" -import { parseAsInteger, useQueryState } from "nuqs" import { Fragment } from "react" import Title from "../components/Title" import extractColumnInfo from "../lib/extractInfo" @@ -48,8 +40,9 @@ import { Source as RwSource, Table as RwTable, } from "../proto/gen/catalog" +import { CatalogModal, useCatalogModal } from "./CatalogModal" -const ReactJson = loadable(() => import("react-json-view")) +export const ReactJson = loadable(() => import("react-json-view")) export type Column = { name: string @@ -122,40 +115,10 @@ export function Relations( extraColumns: Column[] ) { const { response: relationList } = useFetch(getRelations) + const [modalData, setModalId] = useCatalogModal(relationList) - const [modalId, setModalId] = useQueryState("id", parseAsInteger) - const modalData = relationList?.find((r) => r.id === modalId) - - const catalogModal = ( - setModalId(null)} - size="3xl" - > - - - - Catalog of {modalData?.id} - {modalData?.name} - - - - {modalData && ( - - )} - - - - - - - + const modal = ( + setModalId(null)} /> ) const table = ( @@ -214,7 +177,7 @@ export function Relations( {title} - {catalogModal} + {modal} {table} ) diff --git a/dashboard/lib/layout.ts b/dashboard/lib/layout.ts index 1182976dfe8cb..924374341daa8 100644 --- a/dashboard/lib/layout.ts +++ b/dashboard/lib/layout.ts @@ -15,10 +15,20 @@ * */ -import { cloneDeep, max } from "lodash" +import { max } from "lodash" +import { Relation } from "../pages/api/streaming" import { TableFragments_Fragment } from "../proto/gen/meta" import { GraphNode } from "./algo" +export type Enter = Type extends d3.Selection< + any, + infer B, + infer C, + infer D +> + ? d3.Selection + : never + interface DagNode { node: GraphNode temp: boolean @@ -210,16 +220,16 @@ function dagLayout(nodes: GraphNode[]) { } /** - * @param fragments - * @returns Layer and row of the fragment + * @param items + * @returns Layer and row of the item */ -function gridLayout( - fragments: Array -): Map { - // turn FragmentBox to GraphNode - let idToBox = new Map() - for (let fragment of fragments) { - idToBox.set(fragment.id, fragment) +function gridLayout( + items: Array +): Map { + // turn item to GraphNode + let idToItem = new Map() + for (let item of items) { + idToItem.set(item.id, item) } let nodeToId = new Map() @@ -232,23 +242,23 @@ function gridLayout( let newNode = { nextNodes: new Array(), } - let ab = idToBox.get(id) - if (ab === undefined) { + let item = idToItem.get(id) + if (item === undefined) { throw Error(`no such id ${id}`) } - for (let id of ab.parentIds) { + for (let id of item.parentIds) { getNode(id).nextNodes.push(newNode) } idToNode.set(id, newNode) nodeToId.set(newNode, id) return newNode } - for (let fragment of fragments) { - getNode(fragment.id) + for (let item of items) { + getNode(item.id) } // run daglayout on GraphNode - let rtn = new Map() + let rtn = new Map() let allNodes = new Array() for (let _n of nodeToId.keys()) { allNodes.push(_n) @@ -257,33 +267,34 @@ function gridLayout( for (let item of resultMap) { let id = nodeToId.get(item[0]) if (!id) { - throw Error(`no corresponding fragment id of node ${item[0]}`) + throw Error(`no corresponding item of node ${item[0]}`) } - let fb = idToBox.get(id) + let fb = idToItem.get(id) if (!fb) { - throw Error(`fragment id ${id} is not present in idToBox`) + throw Error(`item id ${id} is not present in idToBox`) } rtn.set(fb, item[1]) } return rtn } -export interface FragmentBox { +export interface LayoutItemBase { id: string - name: string - order: number // preference order, fragment box with larger order will be placed at right + order: number // preference order, item with larger order will be placed at right or down width: number height: number parentIds: string[] +} + +export type FragmentBox = LayoutItemBase & { + name: string externalParentIds: string[] fragment?: TableFragments_Fragment } -export interface FragmentPoint { - id: string +export type RelationPoint = LayoutItemBase & { name: string - order: number // preference order, fragment box with larger order will be placed at right - parentIds: string[] + relation: Relation } export interface Position { @@ -292,7 +303,7 @@ export interface Position { } export type FragmentBoxPosition = FragmentBox & Position -export type FragmentPointPosition = FragmentPoint & Position +export type RelationPointPosition = RelationPoint & Position export interface Edge { points: Array @@ -301,15 +312,15 @@ export interface Edge { } /** - * @param fragments + * @param items * @returns the coordination of the top-left corner of the fragment box */ -export function layout( - fragments: Array, +export function layoutItem( + items: Array, layerMargin: number, rowMargin: number -): FragmentBoxPosition[] { - let layoutMap = gridLayout(fragments) +): (I & Position)[] { + let layoutMap = gridLayout(items) let layerRequiredWidth = new Map() let rowRequiredHeight = new Map() let maxLayer = 0, @@ -373,7 +384,7 @@ export function layout( getCumulativeMargin(i, rowMargin, rowCumulativeHeight, rowRequiredHeight) } - let rtn: Array = [] + let rtn: Array = [] for (let [data, [layer, row]] of layoutMap) { let x = layerCumulativeWidth.get(layer) @@ -391,39 +402,13 @@ export function layout( return rtn } -export function flipLayout( - fragments: Array, - layerMargin: number, - rowMargin: number -): FragmentBoxPosition[] { - const fragments_ = cloneDeep(fragments) - for (let fragment of fragments_) { - ;[fragment.width, fragment.height] = [fragment.height, fragment.width] - } - const fragmentPosition = layout(fragments_, rowMargin, layerMargin) - return fragmentPosition.map(({ x, y, ...data }) => ({ - x: y, - y: x, - ...data, - })) -} - -export function layoutPoint( - fragments: Array, +function layoutRelation( + relations: Array, layerMargin: number, rowMargin: number, nodeRadius: number -): FragmentPointPosition[] { - const fragmentBoxes: Array = [] - for (let { ...others } of fragments) { - fragmentBoxes.push({ - width: nodeRadius * 2, - height: nodeRadius * 2, - externalParentIds: [], // we don't care about external parent for point layout - ...others, - }) - } - const result = layout(fragmentBoxes, layerMargin, rowMargin) +): RelationPointPosition[] { + const result = layoutItem(relations, layerMargin, rowMargin) return result.map(({ x, y, ...data }) => ({ x: x + nodeRadius, y: y + nodeRadius, @@ -431,14 +416,14 @@ export function layoutPoint( })) } -export function flipLayoutPoint( - fragments: Array, +export function flipLayoutRelation( + relations: Array, layerMargin: number, rowMargin: number, nodeRadius: number -): FragmentPointPosition[] { - const fragmentPosition = layoutPoint( - fragments, +): RelationPointPosition[] { + const fragmentPosition = layoutRelation( + relations, rowMargin, layerMargin, nodeRadius @@ -450,21 +435,23 @@ export function flipLayoutPoint( })) } -export function generatePointEdges(layoutMap: FragmentPointPosition[]): Edge[] { +export function generateRelationEdges( + layoutMap: RelationPointPosition[] +): Edge[] { const links = [] - const fragmentMap = new Map() + const relationMap = new Map() for (const x of layoutMap) { - fragmentMap.set(x.id, x) + relationMap.set(x.id, x) } - for (const fragment of layoutMap) { - for (const parentId of fragment.parentIds) { - const parentFragment = fragmentMap.get(parentId)! + for (const relation of layoutMap) { + for (const parentId of relation.parentIds) { + const parentRelation = relationMap.get(parentId)! links.push({ points: [ - { x: fragment.x, y: fragment.y }, - { x: parentFragment.x, y: parentFragment.y }, + { x: relation.x, y: relation.y }, + { x: parentRelation.x, y: parentRelation.y }, ], - source: fragment.id, + source: relation.id, target: parentId, }) } @@ -472,7 +459,9 @@ export function generatePointEdges(layoutMap: FragmentPointPosition[]): Edge[] { return links } -export function generateBoxEdges(layoutMap: FragmentBoxPosition[]): Edge[] { +export function generateFragmentEdges( + layoutMap: FragmentBoxPosition[] +): Edge[] { const links = [] const fragmentMap = new Map() for (const x of layoutMap) { diff --git a/dashboard/pages/api/streaming.ts b/dashboard/pages/api/streaming.ts index a77a165357b9f..13fa8716f821a 100644 --- a/dashboard/pages/api/streaming.ts +++ b/dashboard/pages/api/streaming.ts @@ -45,8 +45,26 @@ export interface StreamingJob extends Relation { dependentRelations: number[] } +export function relationType(x: Relation) { + if ((x as Table).tableType !== undefined) { + return (x as Table).tableType + } else if ((x as Sink).sinkFromName !== undefined) { + return "SINK" + } else if ((x as Source).info !== undefined) { + return "SOURCE" + } else { + return "UNKNOWN" + } +} +export type RelationType = ReturnType + +export function relationTypeTitleCase(x: Relation) { + return _.startCase(_.toLower(relationType(x))) +} + export function relationIsStreamingJob(x: Relation): x is StreamingJob { - return (x as StreamingJob).dependentRelations !== undefined + const type = relationType(x) + return type !== "UNKNOWN" && type !== "SOURCE" && type !== "INTERNAL" } export async function getStreamingJobs() { diff --git a/dashboard/pages/dependency_graph.tsx b/dashboard/pages/dependency_graph.tsx index fb29f57b11bb5..a4c13a94df169 100644 --- a/dashboard/pages/dependency_graph.tsx +++ b/dashboard/pages/dependency_graph.tsx @@ -20,15 +20,17 @@ import { reverse, sortBy } from "lodash" import Head from "next/head" import { parseAsInteger, useQueryState } from "nuqs" import { Fragment, useCallback } from "react" -import RelationDependencyGraph from "../components/RelationDependencyGraph" +import RelationDependencyGraph, { + nodeRadius, +} from "../components/RelationDependencyGraph" import Title from "../components/Title" -import { FragmentPoint } from "../lib/layout" +import { RelationPoint } from "../lib/layout" import useFetch from "./api/fetch" import { Relation, getRelations, relationIsStreamingJob } from "./api/streaming" const SIDEBAR_WIDTH = "200px" -function buildDependencyAsEdges(list: Relation[]): FragmentPoint[] { +function buildDependencyAsEdges(list: Relation[]): RelationPoint[] { const edges = [] const relationSet = new Set(list.map((r) => r.id)) for (const r of reverse(sortBy(list, "id"))) { @@ -41,24 +43,27 @@ function buildDependencyAsEdges(list: Relation[]): FragmentPoint[] { .map((r) => r.toString()) : [], order: r.id, + width: nodeRadius * 2, + height: nodeRadius * 2, + relation: r, }) } return edges } export default function StreamingGraph() { - const { response: streamingJobList } = useFetch(getRelations) + const { response: relationList } = useFetch(getRelations) const [selectedId, setSelectedId] = useQueryState("id", parseAsInteger) - const mvDependencyCallback = useCallback(() => { - if (streamingJobList) { - return buildDependencyAsEdges(streamingJobList) + const relationDependencyCallback = useCallback(() => { + if (relationList) { + return buildDependencyAsEdges(relationList) } else { return undefined } - }, [streamingJobList]) + }, [relationList]) - const mvDependency = mvDependencyCallback() + const relationDependency = relationDependencyCallback() const retVal = ( @@ -77,7 +82,7 @@ export default function StreamingGraph() { - {streamingJobList?.map((r) => { + {relationList?.map((r) => { const match = selectedId === r.id return (