diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs index 238f1bc4e6d00..90b9813fb9d26 100644 --- a/src/meta/src/manager/diagnose.rs +++ b/src/meta/src/manager/diagnose.rs @@ -23,12 +23,13 @@ use risingwave_common::types::Timestamptz; use risingwave_common::util::StackTraceResponseExt; use risingwave_hummock_sdk::level::Level; use risingwave_meta_model_v2::table::TableType; -use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::event_log::Event; use risingwave_pb::meta::EventLog; use risingwave_pb::monitor_service::StackTraceResponse; use risingwave_rpc_client::ComputeClientPool; +use risingwave_sqlparser::ast::{CompatibleSourceSchema, Statement, Value}; +use risingwave_sqlparser::parser::Parser; use serde_json::json; use thiserror_ext::AsReport; @@ -688,65 +689,35 @@ impl DiagnoseCommand { .list_sources() .await? .into_iter() - .map(|s| { - // The usage of secrets suggests that it's safe to display the definition. - let redact = if !s.get_secret_refs().is_empty() { - false - } else { - !s.get_info() - .map(|i| !i.get_format_encode_secret_refs().is_empty()) - .unwrap_or(false) - }; - (s.id, (s.name, s.schema_id, s.definition, redact)) - }) + .map(|s| (s.id, (s.name, s.schema_id, s.definition))) .collect::>(); let tables = mgr .catalog_controller .list_tables_by_type(TableType::Table) .await? .into_iter() - .map(|t| { - let redact = - if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) = - t.optional_associated_source_id - { - sources.get(&source_id).map(|s| s.3).unwrap_or(true) - } else { - false - }; - (t.id, (t.name, t.schema_id, t.definition, redact)) - }) + .map(|t| (t.id, (t.name, t.schema_id, t.definition))) .collect::>(); let mvs = mgr .catalog_controller .list_tables_by_type(TableType::MaterializedView) .await? .into_iter() - .map(|t| (t.id, (t.name, t.schema_id, t.definition, false))) + .map(|t| (t.id, (t.name, t.schema_id, t.definition))) .collect::>(); let indexes = mgr .catalog_controller .list_tables_by_type(TableType::Index) .await? .into_iter() - .map(|t| (t.id, (t.name, t.schema_id, t.definition, false))) + .map(|t| (t.id, (t.name, t.schema_id, t.definition))) .collect::>(); let sinks = mgr .catalog_controller .list_sinks() .await? .into_iter() - .map(|s| { - // The usage of secrets suggests that it's safe to display the definition. - let redact = if !s.get_secret_refs().is_empty() { - false - } else { - !s.format_desc - .map(|i| !i.get_secret_refs().is_empty()) - .unwrap_or(false) - }; - (s.id, (s.name, s.schema_id, s.definition, redact)) - }) + .map(|s| (s.id, (s.name, s.schema_id, s.definition))) .collect::>(); let catalogs = [ ("SOURCE", sources), @@ -766,13 +737,10 @@ impl DiagnoseCommand { row.add_cell("definition".into()); row }); - for (id, (name, schema_id, definition, redact)) in items { + for (id, (name, schema_id, definition)) in items { let mut row = Row::new(); - let may_redact = if redact { - "[REDACTED]".into() - } else { - definition - }; + let may_redact = + redact_all_sql_options(&definition).unwrap_or_else(|| "[REDACTED]".into()); row.add_cell(id.into()); row.add_cell(name.into()); row.add_cell(schema_id.into()); @@ -803,3 +771,52 @@ fn try_add_cell>(row: &mut comfy_table::Row, t: Optio fn merge_prometheus_selector<'a>(selectors: impl IntoIterator) -> String { selectors.into_iter().filter(|s| !s.is_empty()).join(",") } + +fn redact_all_sql_options(sql: &str) -> Option { + let Ok(mut statements) = Parser::parse_sql(sql) else { + return None; + }; + let mut redacted = String::new(); + for statement in &mut statements { + let options = match statement { + Statement::CreateTable { + with_options, + source_schema, + .. + } => { + let connector_schema = match source_schema { + Some(CompatibleSourceSchema::V2(cs)) => Some(&mut cs.row_options), + _ => None, + }; + (Some(with_options), connector_schema) + } + Statement::CreateSource { stmt } => { + let connector_schema = match &mut stmt.source_schema { + CompatibleSourceSchema::V2(cs) => Some(&mut cs.row_options), + _ => None, + }; + (Some(&mut stmt.with_properties.0), connector_schema) + } + Statement::CreateSink { stmt } => { + let connector_schema = match &mut stmt.sink_schema { + Some(cs) => Some(&mut cs.row_options), + _ => None, + }; + (Some(&mut stmt.with_properties.0), connector_schema) + } + _ => (None, None), + }; + if let Some(options) = options.0 { + for option in options { + option.value = Value::SingleQuotedString("[REDACTED]".into()); + } + } + if let Some(options) = options.1 { + for option in options { + option.value = Value::SingleQuotedString("[REDACTED]".into()); + } + } + writeln!(&mut redacted, "{statement}").unwrap(); + } + Some(redacted) +}