Skip to content

Commit

Permalink
feat(dashboard): enable compression for API & minor refactors (#15558)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Mar 11, 2024
1 parent fe21213 commit 0b23c9d
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 36 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions dashboard/lib/api/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*
*/
import { Metrics, MetricsSample } from "../../components/metrics"
import { GetBackPressureResponse } from "../../proto/gen/monitor_service"
import api from "./api"

export interface BackPressuresMetrics {
Expand Down Expand Up @@ -151,10 +152,11 @@ export const BackPressureInfo = {

// Get back pressure from meta node -> compute node
export async function fetchEmbeddedBackPressure() {
const response = await api.get("/metrics/fragment/embedded_back_pressures")
let backPressureInfos: BackPressureInfo[] = response.backPressureInfos.map(
BackPressureInfo.fromJSON
const response: GetBackPressureResponse = await api.get(
"/metrics/fragment/embedded_back_pressures"
)
let backPressureInfos: BackPressureInfo[] =
response.backPressureInfos?.map(BackPressureInfo.fromJSON) ?? []
backPressureInfos = backPressureInfos.sort((a, b) => a.actorId - b.actorId)
return backPressureInfos
}
Expand Down
6 changes: 1 addition & 5 deletions dashboard/lib/api/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@
import _ from "lodash"
import sortBy from "lodash/sortBy"
import { Sink, Source, Table, View } from "../../proto/gen/catalog"
import { ActorLocation, TableFragments } from "../../proto/gen/meta"
import { TableFragments } from "../../proto/gen/meta"
import { ColumnCatalog, Field } from "../../proto/gen/plan_common"
import api from "./api"

export async function getActors(): Promise<ActorLocation[]> {
return (await api.get("/actors")).map(ActorLocation.fromJSON)
}

export async function getFragments(): Promise<TableFragments[]> {
let fragmentList: TableFragments[] = (await api.get("/fragments2")).map(
TableFragments.fromJSON
Expand Down
7 changes: 6 additions & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ uuid = { version = "1", features = ["v4"] }

[target.'cfg(not(madsim))'.dependencies]
axum = "0.6"
tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] }
tower-http = { version = "0.4", features = [
"add-extension",
"cors",
"fs",
"compression-gzip",
] }
workspace-hack = { path = "../workspace-hack" }

[dev-dependencies]
Expand Down
31 changes: 4 additions & 27 deletions src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use risingwave_rpc_client::ComputeClientPool;
use thiserror_ext::AsReport;
use tower::{ServiceBuilder, ServiceExt};
use tower_http::add_extension::AddExtensionLayer;
use tower_http::compression::CompressionLayer;
use tower_http::cors::{self, CorsLayer};
use tower_http::services::ServeDir;

Expand Down Expand Up @@ -63,7 +64,7 @@ pub(super) mod handlers {
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{Sink, Source, Table, View};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::{ActorLocation, PbTableFragments};
use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::monitor_service::{
GetBackPressureResponse, HeapProfilingResponse, ListHeapProfilingResponse,
StackTraceResponse,
Expand Down Expand Up @@ -175,30 +176,6 @@ pub(super) mod handlers {
Ok(Json(views))
}

pub async fn list_actors(
Extension(srv): Extension<Service>,
) -> Result<Json<Vec<ActorLocation>>> {
let mut node_actors = srv
.metadata_manager
.all_node_actors(true)
.await
.map_err(err)?;
let nodes = srv
.metadata_manager
.list_active_streaming_compute_nodes()
.await
.map_err(err)?;
let actors = nodes
.into_iter()
.map(|node| ActorLocation {
node: Some(node.clone()),
actors: node_actors.remove(&node.id).unwrap_or_default(),
})
.collect::<Vec<_>>();

Ok(Json(actors))
}

pub async fn list_fragments(
Extension(srv): Extension<Service>,
) -> Result<Json<Vec<PbTableFragments>>> {
Expand Down Expand Up @@ -405,7 +382,6 @@ impl DashboardService {

let api_router = Router::new()
.route("/clusters/:ty", get(list_clusters))
.route("/actors", get(list_actors))
.route("/fragments2", get(list_fragments))
.route("/views", get(list_views))
.route("/materialized_views", get(list_materialized_views))
Expand Down Expand Up @@ -465,7 +441,8 @@ impl DashboardService {
let app = Router::new()
.fallback_service(dashboard_router)
.nest("/api", api_router)
.nest("/trace", trace_ui_router);
.nest("/trace", trace_ui_router)
.layer(CompressionLayer::new());

axum::Server::bind(&srv.dashboard_addr)
.serve(app.into_make_service())
Expand Down

0 comments on commit 0b23c9d

Please sign in to comment.