Skip to content

Commit

Permalink
feat(catalog): implement pg_settings (#15108)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Feb 18, 2024
1 parent 0975be2 commit c82698a
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 27 deletions.
8 changes: 4 additions & 4 deletions e2e_test/batch/catalog/issue_8791.slt.part
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# UNION and other complex queries should also be in local mode
query I
SELECT name FROM pg_catalog.pg_settings union select 'a';
SELECT amname FROM pg_catalog.pg_am union select 'a';
----
a

query T
SELECT name FROM (SELECT pg_catalog.lower(name) AS name FROM pg_catalog.pg_settings UNION ALL SELECT 'session authorization' UNION ALL SELECT 'all') ss WHERE substring(name,1,0)=''
LIMIT 1000
SELECT amname FROM (SELECT pg_catalog.lower(amname) AS amname FROM pg_catalog.pg_am UNION ALL SELECT 'session authorization' UNION ALL SELECT 'all') ss WHERE substring(amname,1,0)=''
LIMIT 1000;
----
session authorization
all

query I
with q as ( select name FROM pg_catalog.pg_settings ) select * from q;
with q as ( select amname FROM pg_catalog.pg_am ) select * from q;
----
45 changes: 44 additions & 1 deletion e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
@@ -1,6 +1,49 @@
query TT
SELECT * FROM pg_catalog.pg_settings;
SELECT name FROM pg_catalog.pg_settings order by name;
----
application_name
background_ddl
batch_enable_distributed_dml
batch_parallelism
bytea_output
client_encoding
client_min_messages
create_compaction_group_for_mv
datestyle
extra_float_digits
idle_in_transaction_session_timeout
intervalstyle
lock_timeout
max_split_range_gap
query_epoch
query_mode
row_security
rw_batch_enable_lookup_join
rw_batch_enable_sort_agg
rw_enable_join_ordering
rw_enable_share_plan
rw_enable_two_phase_agg
rw_force_split_distinct_agg
rw_force_two_phase_agg
rw_implicit_flush
rw_streaming_allow_jsonb_in_stream_key
rw_streaming_enable_bushy_join
rw_streaming_enable_delta_join
rw_streaming_over_window_cache_policy
search_path
server_encoding
server_version
server_version_num
sink_decouple
standard_conforming_strings
statement_timeout
streaming_enable_arrangement_backfill
streaming_parallelism
streaming_rate_limit
synchronize_seqscans
timezone
transaction_isolation
visibility_mode

query TT
SELECT * FROM pg_catalog.pg_settings where name='dummy';
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use std::sync::{Arc, LazyLock};
use async_trait::async_trait;
use futures::future::BoxFuture;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::acl::AclMode;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, Field, SysCatalogReader, TableDesc, TableId, DEFAULT_SUPER_USER_ID,
NON_RESERVED_SYS_CATALOG_ID,
};
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::ConfigMap;
use risingwave_common::types::DataType;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
Expand Down Expand Up @@ -104,7 +106,10 @@ pub struct SysCatalogReaderImpl {
worker_node_manager: WorkerNodeManagerRef,
// Read from meta.
meta_client: Arc<dyn FrontendMetaClient>,
// Read auth context.
auth_context: Arc<AuthContext>,
// Read config.
config: Arc<RwLock<ConfigMap>>,
}

impl SysCatalogReaderImpl {
Expand All @@ -114,13 +119,15 @@ impl SysCatalogReaderImpl {
worker_node_manager: WorkerNodeManagerRef,
meta_client: Arc<dyn FrontendMetaClient>,
auth_context: Arc<AuthContext>,
config: Arc<RwLock<ConfigMap>>,
) -> Self {
Self {
catalog_reader,
user_info_reader,
worker_node_manager,
meta_client,
auth_context,
config,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,28 @@
use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

use crate::catalog::system_catalog::SysCatalogReaderImpl;

/// The catalog `pg_settings` stores settings.
/// Ref: [`https://www.postgresql.org/docs/current/view-pg-settings.html`]
#[system_catalog(view, "pg_catalog.pg_settings")]
#[derive(Fields)]
struct PgSetting {
name: String,
setting: String,
short_desc: String,
}

#[system_catalog(table, "pg_catalog.pg_settings")]
fn read_pg_settings(reader: &SysCatalogReaderImpl) -> Vec<PgSetting> {
let config_reader = reader.config.read();
let all_variables = config_reader.show_all();

all_variables
.iter()
.map(|info| PgSetting {
name: info.name.clone(),
setting: info.setting.clone(),
short_desc: info.description.clone(),
})
.collect()
}
8 changes: 2 additions & 6 deletions src/frontend/src/scheduler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId};
use crate::scheduler::task_context::FrontendBatchTaskContext;
use crate::scheduler::worker_node_manager::WorkerNodeSelector;
use crate::scheduler::{ReadSnapshot, SchedulerError, SchedulerResult};
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::session::{FrontendEnv, SessionImpl};

pub type LocalQueryStream = ReceiverStream<Result<DataChunk, BoxedError>>;

Expand Down Expand Up @@ -94,10 +94,6 @@ impl LocalQueryExecution {
}
}

fn auth_context(&self) -> Arc<AuthContext> {
self.session.auth_context()
}

fn shutdown_rx(&self) -> ShutdownToken {
self.session.reset_cancel_query_flag()
}
Expand All @@ -106,7 +102,7 @@ impl LocalQueryExecution {
pub async fn run_inner(self) {
debug!(%self.query.query_id, self.sql, "Starting to run query");

let context = FrontendBatchTaskContext::new(self.front_env.clone(), self.auth_context());
let context = FrontendBatchTaskContext::new(self.session.clone());

let task_id = TaskId {
query_id: self.query.query_id.id.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ impl ExecutionContext {
}

pub fn to_batch_task_context(&self) -> FrontendBatchTaskContext {
FrontendBatchTaskContext::new(self.session.env().clone(), self.session.auth_context())
FrontendBatchTaskContext::new(self.session.clone())
}
}
28 changes: 14 additions & 14 deletions src/frontend/src/scheduler/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,17 @@ use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_rpc_client::ComputeClientPoolRef;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::session::{AuthContext, FrontendEnv};
use crate::session::SessionImpl;

/// Batch task execution context in frontend.
#[derive(Clone)]
pub struct FrontendBatchTaskContext {
env: FrontendEnv,
auth_context: Arc<AuthContext>,
session: Arc<SessionImpl>,
}

impl FrontendBatchTaskContext {
pub fn new(env: FrontendEnv, auth_context: Arc<AuthContext>) -> Self {
Self { env, auth_context }
pub fn new(session: Arc<SessionImpl>) -> Self {
Self { session }
}
}

Expand All @@ -47,16 +46,17 @@ impl BatchTaskContext for FrontendBatchTaskContext {

fn catalog_reader(&self) -> SysCatalogReaderRef {
Arc::new(SysCatalogReaderImpl::new(
self.env.catalog_reader().clone(),
self.env.user_info_reader().clone(),
self.env.worker_node_manager_ref(),
self.env.meta_client_ref(),
self.auth_context.clone(),
self.session.env().catalog_reader().clone(),
self.session.env().user_info_reader().clone(),
self.session.env().worker_node_manager_ref(),
self.session.env().meta_client_ref(),
self.session.auth_context(),
self.session.shared_config(),
))
}

fn is_local_addr(&self, peer_addr: &HostAddr) -> bool {
is_local_address(self.env.server_address(), peer_addr)
is_local_address(self.session.env().server_address(), peer_addr)
}

fn state_store(&self) -> risingwave_storage::store_impl::StateStoreImpl {
Expand All @@ -68,19 +68,19 @@ impl BatchTaskContext for FrontendBatchTaskContext {
}

fn client_pool(&self) -> ComputeClientPoolRef {
self.env.client_pool()
self.session.env().client_pool()
}

fn get_config(&self) -> &BatchConfig {
self.env.batch_config()
self.session.env().batch_config()
}

fn dml_manager(&self) -> risingwave_dml::dml_manager::DmlManagerRef {
unimplemented!("not supported in local mode")
}

fn source_metrics(&self) -> Arc<SourceMetrics> {
self.env.source_metrics()
self.session.env().source_metrics()
}

fn store_mem_usage(&self, _val: usize) {
Expand Down

0 comments on commit c82698a

Please sign in to comment.