From c82698a4452f6a7cb472f3acfaba35d612adeec0 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Sun, 18 Feb 2024 16:45:50 +0800 Subject: [PATCH] feat(catalog): implement `pg_settings` (#15108) Signed-off-by: Runji Wang --- e2e_test/batch/catalog/issue_8791.slt.part | 8 ++-- e2e_test/batch/catalog/pg_settings.slt.part | 45 ++++++++++++++++++- .../src/catalog/system_catalog/mod.rs | 7 +++ .../system_catalog/pg_catalog/pg_settings.rs | 19 +++++++- src/frontend/src/scheduler/local.rs | 8 +--- src/frontend/src/scheduler/mod.rs | 2 +- src/frontend/src/scheduler/task_context.rs | 28 ++++++------ 7 files changed, 90 insertions(+), 27 deletions(-) diff --git a/e2e_test/batch/catalog/issue_8791.slt.part b/e2e_test/batch/catalog/issue_8791.slt.part index b8339c44a23f4..482d0965ba543 100644 --- a/e2e_test/batch/catalog/issue_8791.slt.part +++ b/e2e_test/batch/catalog/issue_8791.slt.part @@ -1,16 +1,16 @@ # UNION and other complex queries should also be in local mode query I -SELECT name FROM pg_catalog.pg_settings union select 'a'; +SELECT amname FROM pg_catalog.pg_am union select 'a'; ---- a query T -SELECT name FROM (SELECT pg_catalog.lower(name) AS name FROM pg_catalog.pg_settings UNION ALL SELECT 'session authorization' UNION ALL SELECT 'all') ss WHERE substring(name,1,0)='' -LIMIT 1000 +SELECT amname FROM (SELECT pg_catalog.lower(amname) AS amname FROM pg_catalog.pg_am UNION ALL SELECT 'session authorization' UNION ALL SELECT 'all') ss WHERE substring(amname,1,0)='' +LIMIT 1000; ---- session authorization all query I -with q as ( select name FROM pg_catalog.pg_settings ) select * from q; +with q as ( select amname FROM pg_catalog.pg_am ) select * from q; ---- diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 0481ab1a1dd5b..09e2546a856d8 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -1,6 +1,49 @@ query TT -SELECT * FROM pg_catalog.pg_settings; +SELECT name FROM pg_catalog.pg_settings order by name; ---- +application_name +background_ddl +batch_enable_distributed_dml +batch_parallelism +bytea_output +client_encoding +client_min_messages +create_compaction_group_for_mv +datestyle +extra_float_digits +idle_in_transaction_session_timeout +intervalstyle +lock_timeout +max_split_range_gap +query_epoch +query_mode +row_security +rw_batch_enable_lookup_join +rw_batch_enable_sort_agg +rw_enable_join_ordering +rw_enable_share_plan +rw_enable_two_phase_agg +rw_force_split_distinct_agg +rw_force_two_phase_agg +rw_implicit_flush +rw_streaming_allow_jsonb_in_stream_key +rw_streaming_enable_bushy_join +rw_streaming_enable_delta_join +rw_streaming_over_window_cache_policy +search_path +server_encoding +server_version +server_version_num +sink_decouple +standard_conforming_strings +statement_timeout +streaming_enable_arrangement_backfill +streaming_parallelism +streaming_rate_limit +synchronize_seqscans +timezone +transaction_isolation +visibility_mode query TT SELECT * FROM pg_catalog.pg_settings where name='dummy'; diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index a0be5af42fd36..61ec69b77ae5a 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -22,6 +22,7 @@ use std::sync::{Arc, LazyLock}; use async_trait::async_trait; use futures::future::BoxFuture; use itertools::Itertools; +use parking_lot::RwLock; use risingwave_common::acl::AclMode; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{ @@ -29,6 +30,7 @@ use risingwave_common::catalog::{ NON_RESERVED_SYS_CATALOG_ID, }; use risingwave_common::error::BoxedError; +use risingwave_common::session_config::ConfigMap; use risingwave_common::types::DataType; use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism}; @@ -104,7 +106,10 @@ pub struct SysCatalogReaderImpl { worker_node_manager: WorkerNodeManagerRef, // Read from meta. meta_client: Arc, + // Read auth context. auth_context: Arc, + // Read config. + config: Arc>, } impl SysCatalogReaderImpl { @@ -114,6 +119,7 @@ impl SysCatalogReaderImpl { worker_node_manager: WorkerNodeManagerRef, meta_client: Arc, auth_context: Arc, + config: Arc>, ) -> Self { Self { catalog_reader, @@ -121,6 +127,7 @@ impl SysCatalogReaderImpl { worker_node_manager, meta_client, auth_context, + config, } } } diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_settings.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_settings.rs index 5acd1dab05a7d..4fc0fb057108f 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_settings.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_settings.rs @@ -15,11 +15,28 @@ use risingwave_common::types::Fields; use risingwave_frontend_macro::system_catalog; +use crate::catalog::system_catalog::SysCatalogReaderImpl; + /// The catalog `pg_settings` stores settings. /// Ref: [`https://www.postgresql.org/docs/current/view-pg-settings.html`] -#[system_catalog(view, "pg_catalog.pg_settings")] #[derive(Fields)] struct PgSetting { name: String, setting: String, + short_desc: String, +} + +#[system_catalog(table, "pg_catalog.pg_settings")] +fn read_pg_settings(reader: &SysCatalogReaderImpl) -> Vec { + let config_reader = reader.config.read(); + let all_variables = config_reader.show_all(); + + all_variables + .iter() + .map(|info| PgSetting { + name: info.name.clone(), + setting: info.setting.clone(), + short_desc: info.description.clone(), + }) + .collect() } diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index c155cfe9aa234..5e1838e765a49 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -52,7 +52,7 @@ use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId}; use crate::scheduler::task_context::FrontendBatchTaskContext; use crate::scheduler::worker_node_manager::WorkerNodeSelector; use crate::scheduler::{ReadSnapshot, SchedulerError, SchedulerResult}; -use crate::session::{AuthContext, FrontendEnv, SessionImpl}; +use crate::session::{FrontendEnv, SessionImpl}; pub type LocalQueryStream = ReceiverStream>; @@ -94,10 +94,6 @@ impl LocalQueryExecution { } } - fn auth_context(&self) -> Arc { - self.session.auth_context() - } - fn shutdown_rx(&self) -> ShutdownToken { self.session.reset_cancel_query_flag() } @@ -106,7 +102,7 @@ impl LocalQueryExecution { pub async fn run_inner(self) { debug!(%self.query.query_id, self.sql, "Starting to run query"); - let context = FrontendBatchTaskContext::new(self.front_env.clone(), self.auth_context()); + let context = FrontendBatchTaskContext::new(self.session.clone()); let task_id = TaskId { query_id: self.query.query_id.id.clone(), diff --git a/src/frontend/src/scheduler/mod.rs b/src/frontend/src/scheduler/mod.rs index bb27231c21fdc..225f15d5b66ca 100644 --- a/src/frontend/src/scheduler/mod.rs +++ b/src/frontend/src/scheduler/mod.rs @@ -66,6 +66,6 @@ impl ExecutionContext { } pub fn to_batch_task_context(&self) -> FrontendBatchTaskContext { - FrontendBatchTaskContext::new(self.session.env().clone(), self.session.auth_context()) + FrontendBatchTaskContext::new(self.session.clone()) } } diff --git a/src/frontend/src/scheduler/task_context.rs b/src/frontend/src/scheduler/task_context.rs index ed1c4d78dc45c..dfb2496dad556 100644 --- a/src/frontend/src/scheduler/task_context.rs +++ b/src/frontend/src/scheduler/task_context.rs @@ -25,18 +25,17 @@ use risingwave_connector::source::monitor::SourceMetrics; use risingwave_rpc_client::ComputeClientPoolRef; use crate::catalog::system_catalog::SysCatalogReaderImpl; -use crate::session::{AuthContext, FrontendEnv}; +use crate::session::SessionImpl; /// Batch task execution context in frontend. #[derive(Clone)] pub struct FrontendBatchTaskContext { - env: FrontendEnv, - auth_context: Arc, + session: Arc, } impl FrontendBatchTaskContext { - pub fn new(env: FrontendEnv, auth_context: Arc) -> Self { - Self { env, auth_context } + pub fn new(session: Arc) -> Self { + Self { session } } } @@ -47,16 +46,17 @@ impl BatchTaskContext for FrontendBatchTaskContext { fn catalog_reader(&self) -> SysCatalogReaderRef { Arc::new(SysCatalogReaderImpl::new( - self.env.catalog_reader().clone(), - self.env.user_info_reader().clone(), - self.env.worker_node_manager_ref(), - self.env.meta_client_ref(), - self.auth_context.clone(), + self.session.env().catalog_reader().clone(), + self.session.env().user_info_reader().clone(), + self.session.env().worker_node_manager_ref(), + self.session.env().meta_client_ref(), + self.session.auth_context(), + self.session.shared_config(), )) } fn is_local_addr(&self, peer_addr: &HostAddr) -> bool { - is_local_address(self.env.server_address(), peer_addr) + is_local_address(self.session.env().server_address(), peer_addr) } fn state_store(&self) -> risingwave_storage::store_impl::StateStoreImpl { @@ -68,11 +68,11 @@ impl BatchTaskContext for FrontendBatchTaskContext { } fn client_pool(&self) -> ComputeClientPoolRef { - self.env.client_pool() + self.session.env().client_pool() } fn get_config(&self) -> &BatchConfig { - self.env.batch_config() + self.session.env().batch_config() } fn dml_manager(&self) -> risingwave_dml::dml_manager::DmlManagerRef { @@ -80,7 +80,7 @@ impl BatchTaskContext for FrontendBatchTaskContext { } fn source_metrics(&self) -> Arc { - self.env.source_metrics() + self.session.env().source_metrics() } fn store_mem_usage(&self, _val: usize) {