Skip to content

Commit

Permalink
feat(dashboard): include table definition in diagnose report
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Jul 29, 2024
1 parent 3facd96 commit 06b2765
Showing 1 changed file with 122 additions and 3 deletions.
125 changes: 122 additions & 3 deletions src/meta/src/manager/diagnose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use std::collections::{BTreeMap, BinaryHeap};
use std::fmt::Write;
use std::sync::Arc;

Expand All @@ -22,6 +22,8 @@ use prometheus_http_query::response::Data::Vector;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_hummock_sdk::level::Level;
use risingwave_meta_model_v2::table::TableType;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::meta::EventLog;
Expand All @@ -32,7 +34,8 @@ use thiserror_ext::AsReport;

use crate::hummock::HummockManagerRef;
use crate::manager::event_log::EventLogMangerRef;
use crate::manager::MetadataManager;
use crate::manager::{MetadataManager, MetadataManagerV2};
use crate::MetaResult;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

Expand Down Expand Up @@ -88,7 +91,15 @@ impl DiagnoseCommand {
async fn write_catalog(&self, s: &mut String) {
match &self.metadata_manager {
MetadataManager::V1(_) => self.write_catalog_v1(s).await,
MetadataManager::V2(_) => self.write_catalog_v2(s).await,
MetadataManager::V2(mgr) => {
self.write_catalog_v2(s).await;
let _ = self.write_table_definition(mgr, s).await.inspect_err(|e| {
tracing::warn!(
error = e.to_report_string(),
"failed to display table definition"
)
});
}
}
}

Expand Down Expand Up @@ -678,6 +689,114 @@ impl DiagnoseCommand {

write!(s, "{}", all.output()).unwrap();
}

async fn write_table_definition(
&self,
mgr: &MetadataManagerV2,
s: &mut String,
) -> MetaResult<()> {
let sources = mgr
.catalog_controller
.list_sources()
.await?
.into_iter()
.map(|s| {
// The usage of secrets suggests that it's safe to display the definition.
let redact = if !s.get_secret_refs().is_empty() {
false
} else {
!s.get_info()
.map(|i| !i.get_format_encode_secret_refs().is_empty())
.unwrap_or(false)
};
(s.id, (s.name, s.schema_id, s.definition, redact))
})
.collect::<BTreeMap<_, _>>();
let tables = mgr
.catalog_controller
.list_tables_by_type(TableType::Table)
.await?
.into_iter()
.map(|t| {
let redact =
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) =
t.optional_associated_source_id
{
sources.get(&source_id).map(|s| s.3).unwrap_or(true)
} else {
false
};
(t.id, (t.name, t.schema_id, t.definition, redact))
})
.collect::<BTreeMap<_, _>>();
let mvs = mgr
.catalog_controller
.list_tables_by_type(TableType::MaterializedView)
.await?
.into_iter()
.map(|t| (t.id, (t.name, t.schema_id, t.definition, false)))
.collect::<BTreeMap<_, _>>();
let indexes = mgr
.catalog_controller
.list_tables_by_type(TableType::Index)
.await?
.into_iter()
.map(|t| (t.id, (t.name, t.schema_id, t.definition, false)))
.collect::<BTreeMap<_, _>>();
let sinks = mgr
.catalog_controller
.list_sinks()
.await?
.into_iter()
.map(|s| {
// The usage of secrets suggests that it's safe to display the definition.
let redact = if !s.get_secret_refs().is_empty() {
false
} else {
!s.format_desc
.map(|i| !i.get_secret_refs().is_empty())
.unwrap_or(false)
};
(s.id, (s.name, s.schema_id, s.definition, redact))
})
.collect::<BTreeMap<_, _>>();
let catalogs = [
("SOURCE", sources),
("TABLE", tables),
("MATERIALIZED VIEW", mvs),
("INDEX", indexes),
("SINK", sinks),
];
for (title, items) in catalogs {
use comfy_table::{Row, Table};
let mut table = Table::new();
table.set_header({
let mut row = Row::new();
row.add_cell("id".into());
row.add_cell("name".into());
row.add_cell("schema_id".into());
row.add_cell("definition".into());
row
});
for (id, (name, schema_id, definition, redact)) in items {
let mut row = Row::new();
let may_redact = if redact {
"[REDACTED]".into()
} else {
definition
};
row.add_cell(id.into());
row.add_cell(name.into());
row.add_cell(schema_id.into());
row.add_cell(may_redact.into());
table.add_row(row);
}
let _ = writeln!(s);
let _ = writeln!(s, "{title}");
let _ = writeln!(s, "{table}");
}
Ok(())
}
}

#[cfg_attr(coverage, coverage(off))]
Expand Down

0 comments on commit 06b2765

Please sign in to comment.