Skip to content

Commit

Permalink
feat(dashboard): add suscription in catalog (#16747)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored May 14, 2024
1 parent 0971e55 commit a0b2350
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 9 deletions.
1 change: 1 addition & 0 deletions dashboard/components/Layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ function Layout({ children }: { children: React.ReactNode }) {
<NavButton href="/internal_tables/">Internal Tables</NavButton>
<NavButton href="/sinks/">Sinks</NavButton>
<NavButton href="/views/">Views</NavButton>
<NavButton href="/subscriptions/">Subscriptions</NavButton>
</Section>
<Section>
<NavTitle>Streaming</NavTitle>
Expand Down
16 changes: 10 additions & 6 deletions dashboard/components/Relations.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,16 @@ export function Relations<R extends Relation>(
{extraColumns.map((c) => (
<Td key={c.name}>{c.content(r)}</Td>
))}
<Td overflowWrap="normal">
{r.columns
.filter((col) => ("isHidden" in col ? !col.isHidden : true))
.map((col) => extractColumnInfo(col))
.join(", ")}
</Td>
{r.columns && r.columns.length > 0 && (
<Td overflowWrap="normal">
{r.columns
.filter((col) =>
"isHidden" in col ? !col.isHidden : true
)
.map((col) => extractColumnInfo(col))
.join(", ")}
</Td>
)}
</Tr>
))}
</Tbody>
Expand Down
16 changes: 14 additions & 2 deletions dashboard/lib/api/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
Schema,
Sink,
Source,
Subscription,
Table,
View,
} from "../../proto/gen/catalog"
Expand All @@ -47,9 +48,9 @@ export interface Relation {
owner: number
schemaId: number
databaseId: number
columns: (ColumnCatalog | Field)[]

// For display
columns?: (ColumnCatalog | Field)[]
ownerName?: string
schemaName?: string
databaseName?: string
Expand All @@ -66,6 +67,8 @@ export function relationType(x: Relation) {
return "SINK"
} else if ((x as Source).info !== undefined) {
return "SOURCE"
} else if ((x as Subscription).dependentTableId !== undefined) {
return "SUBSCRIPTION"
} else {
return "UNKNOWN"
}
Expand Down Expand Up @@ -98,7 +101,8 @@ export async function getRelations() {
await getTables(),
await getIndexes(),
await getSinks(),
await getSources()
await getSources(),
await getSubscriptions()
)
relations = sortBy(relations, (x) => x.id)
return relations
Expand Down Expand Up @@ -150,6 +154,14 @@ export async function getViews() {
return views
}

export async function getSubscriptions() {
let subscriptions: Subscription[] = (await api.get("/subscriptions")).map(
Subscription.fromJSON
)
subscriptions = sortBy(subscriptions, (x) => x.id)
return subscriptions
}

export async function getUsers() {
let users: UserInfo[] = (await api.get("/users")).map(UserInfo.fromJSON)
users = sortBy(users, (x) => x.id)
Expand Down
4 changes: 4 additions & 0 deletions dashboard/mock-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ app.get("/indexes", (req, res, next) => {
res.json(require("./mock/indexes.json"))
})

app.get("/indexes", (req, res, next) => {
res.json(require("./mock/indexes.json"))
})

app.get("/internal_tables", (req, res, next) => {
res.json(require("./mock/internal_tables.json"))
})
Expand Down
27 changes: 27 additions & 0 deletions dashboard/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dashboard/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"clean": "rm -rf .next/ && rm -rf out/"
},
"dependencies": {
"16": "^0.0.2",
"@chakra-ui/react": "^2.3.1",
"@emotion/react": "^11.10.4",
"@emotion/styled": "^11.10.4",
Expand Down
38 changes: 38 additions & 0 deletions dashboard/pages/subscriptions.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2024 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { Column, Relations } from "../components/Relations"
import { getSubscriptions } from "../lib/api/streaming"
import { Subscription as RwSubscription } from "../proto/gen/catalog"

export default function Subscriptions() {
const subscriptionRetentionSeconds: Column<RwSubscription> = {
name: "Retention Seconds",
width: 3,
content: (r) => r.retentionSeconds ?? "unknown",
}

const subscriptionDependentTableId: Column<RwSubscription> = {
name: "Dependent Table Id",
width: 3,
content: (r) => r.dependentTableId ?? "unknown",
}
return Relations("Subscriptions", getSubscriptions, [
subscriptionRetentionSeconds,
subscriptionDependentTableId,
])
}
23 changes: 23 additions & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,29 @@ impl CatalogController {
}
}));

let subscription_dependencies: Vec<(SubscriptionId, TableId)> = Subscription::find()
.select_only()
.columns([
subscription::Column::SubscriptionId,
subscription::Column::DependentTableId,
])
.join(JoinType::InnerJoin, subscription::Relation::Object.def())
.filter(
subscription::Column::SubscriptionState
.eq(Into::<i32>::into(SubscriptionState::Created))
.and(subscription::Column::DependentTableId.is_not_null()),
)
.into_tuple()
.all(&inner.db)
.await?;

obj_dependencies.extend(subscription_dependencies.into_iter().map(
|(subscription_id, table_id)| PbObjectDependencies {
object_id: subscription_id as _,
referenced_object_id: table_id as _,
},
));

Ok(obj_dependencies)
}

Expand Down
18 changes: 17 additions & 1 deletion src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub(super) mod handlers {
use itertools::Itertools;
use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Table, View};
use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Subscription, Table, View};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::PbTableFragments;
Expand Down Expand Up @@ -141,6 +141,21 @@ pub(super) mod handlers {
list_table_catalogs_inner(&srv.metadata_manager, TableType::Index).await
}

pub async fn list_subscription(
Extension(srv): Extension<Service>,
) -> Result<Json<Vec<Subscription>>> {
let subscriptions = match &srv.metadata_manager {
MetadataManager::V1(mgr) => mgr.catalog_manager.list_subscriptions().await,
MetadataManager::V2(mgr) => mgr
.catalog_controller
.list_subscriptions()
.await
.map_err(err)?,
};

Ok(Json(subscriptions))
}

pub async fn list_internal_tables(
Extension(srv): Extension<Service>,
) -> Result<Json<Vec<Table>>> {
Expand Down Expand Up @@ -417,6 +432,7 @@ impl DashboardService {
.route("/materialized_views", get(list_materialized_views))
.route("/tables", get(list_tables))
.route("/indexes", get(list_indexes))
.route("/subscriptions", get(list_subscription))
.route("/internal_tables", get(list_internal_tables))
.route("/sources", get(list_sources))
.route("/sinks", get(list_sinks))
Expand Down
7 changes: 7 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3702,6 +3702,13 @@ impl CatalogManager {
}
}

for subscription in core.subscriptions.values() {
dependencies.push(PbObjectDependencies {
object_id: subscription.id,
referenced_object_id: subscription.dependent_table_id,
});
}

dependencies
}

Expand Down

0 comments on commit a0b2350

Please sign in to comment.