diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 3933ddf6b9ac1..ee92ccb90a7fd 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -52,7 +52,7 @@ steps: run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true - timeout_in_minutes: 15 + timeout_in_minutes: 20 retry: *auto-retry - label: "docslt" diff --git a/src/common/proc_macro/src/lib.rs b/src/common/proc_macro/src/lib.rs index cf6d3a202e0b3..19f23545e3681 100644 --- a/src/common/proc_macro/src/lib.rs +++ b/src/common/proc_macro/src/lib.rs @@ -24,6 +24,7 @@ use syn::parse_macro_input; mod config; mod estimate_size; +mod session_config; /// Sections in the configuration file can use `#[derive(OverrideConfig)]` to generate the /// implementation of overwriting configs from the file. @@ -247,3 +248,18 @@ pub fn derive_estimate_size(input: TokenStream) -> TokenStream { } } } + +/// To add a new parameter, you can add a field with `#[parameter]` in the struct +/// A default value is required by setting the `default` option. +/// The field name will be the parameter name. You can overwrite the parameter name by setting the `rename` option. +/// To check the input parameter, you can use `check_hook` option. +/// +/// `flags` options include +/// - `SETTER`: to manually write a `set_your_parameter_name` function, in which you should call `set_your_parameter_name_inner`. 
+/// - `REPORT`: to report the parameter through `ConfigReporter` +#[proc_macro_derive(SessionConfig, attributes(parameter))] +#[proc_macro_error] +pub fn session_config(input: TokenStream) -> TokenStream { + let input = parse_macro_input!(input); + session_config::derive_config(input).into() +} diff --git a/src/common/proc_macro/src/session_config.rs b/src/common/proc_macro/src/session_config.rs new file mode 100644 index 0000000000000..ea29e0a20fec8 --- /dev/null +++ b/src/common/proc_macro/src/session_config.rs @@ -0,0 +1,262 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bae::FromAttributes; +use proc_macro2::TokenStream; +use proc_macro_error::{abort, OptionExt, ResultExt}; +use quote::{format_ident, quote, quote_spanned}; +use syn::DeriveInput; + +#[derive(FromAttributes)] +struct Parameter { + pub rename: Option<syn::LitStr>, + pub default: syn::Expr, + pub flags: Option<syn::LitStr>, + pub check_hook: Option<syn::Expr>, +} + +pub(crate) fn derive_config(input: DeriveInput) -> TokenStream { + let syn::Data::Struct(syn::DataStruct { fields, .. 
}) = input.data else { + abort!(input, "Only struct is supported"); + }; + + let mut default_fields = vec![]; + let mut struct_impl_set = vec![]; + let mut struct_impl_get = vec![]; + let mut struct_impl_reset = vec![]; + let mut set_match_branches = vec![]; + let mut get_match_branches = vec![]; + let mut show_all_list = vec![]; + + for field in fields { + let field_ident = field.ident.expect_or_abort("Field need to be named"); + let ty = field.ty; + + let mut doc_list = vec![]; + for attr in &field.attrs { + if attr.path.is_ident("doc") { + let meta = attr.parse_meta().expect_or_abort("Failed to parse meta"); + if let syn::Meta::NameValue(val) = meta { + if let syn::Lit::Str(desc) = val.lit { + doc_list.push(desc.value().trim().to_string()); + } + } + } + } + + let description: TokenStream = format!("r#\"{}\"#", doc_list.join(" ")).parse().unwrap(); + + let attr = + Parameter::from_attributes(&field.attrs).expect_or_abort("Failed to parse attribute"); + let Parameter { + rename, + default, + flags, + check_hook: check_hook_name, + } = attr; + + let entry_name = if let Some(rename) = rename { + if !(rename.value().is_ascii() && rename.value().to_ascii_lowercase() == rename.value()) + { + abort!(rename, "Expect `rename` to be an ascii lower case string"); + } + quote! {#rename} + } else { + let ident = format_ident!("{}", field_ident.to_string().to_lowercase()); + quote! {stringify!(#ident)} + }; + + let flags = flags.map(|f| f.value()).unwrap_or_default(); + let flags: Vec<_> = flags.split('|').map(|str| str.trim()).collect(); + + default_fields.push(quote_spanned! 
{ + field_ident.span()=> + #field_ident: #default.into(), + }); + + let set_func_name = format_ident!("set_{}_str", field_ident); + let set_t_func_name = format_ident!("set_{}", field_ident); + let set_t_inner_func_name = format_ident!("set_{}_inner", field_ident); + let set_t_func_doc: TokenStream = + format!("r#\"Set parameter {} by a typed value.\"#", entry_name) + .parse() + .unwrap(); + let set_func_doc: TokenStream = format!("r#\"Set parameter {} by a string.\"#", entry_name) + .parse() + .unwrap(); + + let gen_set_func_name = if flags.contains(&"SETTER") { + set_t_inner_func_name.clone() + } else { + set_t_func_name.clone() + }; + + let check_hook = if let Some(check_hook_name) = check_hook_name { + quote! { + #check_hook_name(&val).map_err(|_e| { + ErrorCode::InvalidConfigValue { + config_entry: #entry_name.to_string(), + config_value: val.to_string(), + } + })?; + } + } else { + quote! {} + }; + + let report_hook = if flags.contains(&"REPORT") { + quote! { + if self.#field_ident != val { + reporter.report_status(#entry_name, val.to_string()); + } + } + } else { + quote! {} + }; + + struct_impl_set.push(quote! { + #[doc = #set_func_doc] + pub fn #set_func_name( + &mut self, + val: &str, + reporter: &mut impl ConfigReporter + ) -> RwResult<()> { + let val_t: #ty = val.parse().map_err(|_e| { + ErrorCode::InvalidConfigValue { + config_entry: #entry_name.to_string(), + config_value: val.to_string(), + } + })?; + + self.#set_t_func_name(val_t, reporter)?; + Ok(()) + } + + #[doc = #set_t_func_doc] + pub fn #gen_set_func_name( + &mut self, + val: #ty, + reporter: &mut impl ConfigReporter + ) -> RwResult<()> { + #check_hook + #report_hook + + self.#field_ident = val; + Ok(()) + } + + }); + + let reset_func_name = format_ident!("reset_{}", field_ident); + struct_impl_reset.push(quote_spanned! 
{ + field_ident.span()=> + + #[allow(clippy::useless_conversion)] + pub fn #reset_func_name(&mut self) { + self.#field_ident = #default.into(); + } + }); + + let get_func_name = format_ident!("{}_str", field_ident); + let get_t_func_name = format_ident!("{}", field_ident); + let get_func_doc: TokenStream = + format!("r#\"Get a value string of parameter {} \"#", entry_name) + .parse() + .unwrap(); + let get_t_func_doc: TokenStream = + format!("r#\"Get a typed value of parameter {} \"#", entry_name) + .parse() + .unwrap(); + + struct_impl_get.push(quote! { + #[doc = #get_func_doc] + pub fn #get_func_name(&self) -> String { + self.#get_t_func_name().to_string() + } + + #[doc = #get_t_func_doc] + pub fn #get_t_func_name(&self) -> #ty { + self.#field_ident.clone() + } + + }); + + get_match_branches.push(quote! { + #entry_name => Ok(self.#get_func_name()), + }); + + set_match_branches.push(quote! { + #entry_name => self.#set_func_name(&value, reporter), + }); + + if !flags.contains(&"NO_SHOW_ALL") { + show_all_list.push(quote! { + VariableInfo { + name: #entry_name.to_string(), + setting: self.#field_ident.to_string(), + description : #description.to_string(), + }, + + }); + } + } + + let struct_ident = input.ident; + quote! { + impl Default for #struct_ident { + #[allow(clippy::useless_conversion)] + fn default() -> Self { + Self { + #(#default_fields)* + } + } + } + + impl #struct_ident { + fn new() -> Self { + Default::default() + } + + + #(#struct_impl_get)* + + #(#struct_impl_set)* + + #(#struct_impl_reset)* + + /// Set a parameter given it's name and value string. + pub fn set(&mut self, key_name: &str, value: String, reporter: &mut impl ConfigReporter) -> RwResult<()> { + match key_name.to_ascii_lowercase().as_ref() { + #(#set_match_branches)* + _ => Err(ErrorCode::UnrecognizedConfigurationParameter(key_name.to_string()).into()), + } + } + + /// Get a parameter by it's name. 
+ pub fn get(&self, key_name: &str) -> RwResult<String> { + match key_name.to_ascii_lowercase().as_ref() { + #(#get_match_branches)* + _ => Err(ErrorCode::UnrecognizedConfigurationParameter(key_name.to_string()).into()), + } + } + + /// Show all parameters. + pub fn show_all(&self) -> Vec<VariableInfo> { + vec![ + #(#show_all_list)* + ] + } + } + } +} diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 7a7e73ad8dd1f..0600684b9ff90 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod non_zero64; mod over_window; mod query_mode; mod search_path; @@ -19,1075 +20,276 @@ pub mod sink_decouple; mod transaction_isolation_level; mod visibility_mode; -use std::num::NonZeroU64; -use std::ops::Deref; - use chrono_tz::Tz; -use educe::{self, Educe}; -use itertools::Itertools; pub use over_window::OverWindowCachePolicy; pub use query_mode::QueryMode; +use risingwave_common_proc_macro::SessionConfig; pub use search_path::{SearchPath, USER_NAME_WILD_CARD}; -use tracing::info; -use crate::error::{ErrorCode, RwError}; +use self::non_zero64::ConfigNonZeroU64; +use crate::error::{ErrorCode, Result as RwResult}; use crate::session_config::sink_decouple::SinkDecouple; use crate::session_config::transaction_isolation_level::IsolationLevel; pub use crate::session_config::visibility_mode::VisibilityMode; -use crate::util::epoch::Epoch; - -// This is a hack, &'static str is not allowed as a const generics argument. -// TODO: refine this using the adt_const_params feature. 
-const CONFIG_KEYS: [&str; 40] = [ - "RW_IMPLICIT_FLUSH", - "CREATE_COMPACTION_GROUP_FOR_MV", - "QUERY_MODE", - "EXTRA_FLOAT_DIGITS", - "APPLICATION_NAME", - "DATESTYLE", - "RW_BATCH_ENABLE_LOOKUP_JOIN", - "MAX_SPLIT_RANGE_GAP", - "SEARCH_PATH", - "TRANSACTION ISOLATION LEVEL", - "QUERY_EPOCH", - "RW_BATCH_ENABLE_SORT_AGG", - "VISIBILITY_MODE", - "TIMEZONE", - "STREAMING_PARALLELISM", - "RW_STREAMING_ENABLE_DELTA_JOIN", - "RW_ENABLE_TWO_PHASE_AGG", - "RW_FORCE_TWO_PHASE_AGG", - "RW_ENABLE_SHARE_PLAN", - "INTERVALSTYLE", - "BATCH_PARALLELISM", - "RW_STREAMING_ENABLE_BUSHY_JOIN", - "RW_ENABLE_JOIN_ORDERING", - "SERVER_VERSION", - "SERVER_VERSION_NUM", - "RW_FORCE_SPLIT_DISTINCT_AGG", - "CLIENT_MIN_MESSAGES", - "CLIENT_ENCODING", - "SINK_DECOUPLE", - "SYNCHRONIZE_SEQSCANS", - "STATEMENT_TIMEOUT", - "LOCK_TIMEOUT", - "ROW_SECURITY", - "STANDARD_CONFORMING_STRINGS", - "STREAMING_RATE_LIMIT", - "CDC_BACKFILL", - "RW_STREAMING_OVER_WINDOW_CACHE_POLICY", - "BACKGROUND_DDL", - "SERVER_ENCODING", - "STREAMING_ENABLE_ARRANGEMENT_BACKFILL", -]; - -// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] = -// "RW_IMPLICIT_FLUSH". 
-const IMPLICIT_FLUSH: usize = 0; -const CREATE_COMPACTION_GROUP_FOR_MV: usize = 1; -const QUERY_MODE: usize = 2; -const EXTRA_FLOAT_DIGITS: usize = 3; -const APPLICATION_NAME: usize = 4; -const DATE_STYLE: usize = 5; -const BATCH_ENABLE_LOOKUP_JOIN: usize = 6; -const MAX_SPLIT_RANGE_GAP: usize = 7; -const SEARCH_PATH: usize = 8; -const TRANSACTION_ISOLATION_LEVEL: usize = 9; -const QUERY_EPOCH: usize = 10; -const BATCH_ENABLE_SORT_AGG: usize = 11; -const VISIBILITY_MODE: usize = 12; -const TIMEZONE: usize = 13; -const STREAMING_PARALLELISM: usize = 14; -const STREAMING_ENABLE_DELTA_JOIN: usize = 15; -const ENABLE_TWO_PHASE_AGG: usize = 16; -const FORCE_TWO_PHASE_AGG: usize = 17; -const RW_ENABLE_SHARE_PLAN: usize = 18; -const INTERVAL_STYLE: usize = 19; -const BATCH_PARALLELISM: usize = 20; -const STREAMING_ENABLE_BUSHY_JOIN: usize = 21; -const RW_ENABLE_JOIN_ORDERING: usize = 22; -const SERVER_VERSION: usize = 23; -const SERVER_VERSION_NUM: usize = 24; -const FORCE_SPLIT_DISTINCT_AGG: usize = 25; -const CLIENT_MIN_MESSAGES: usize = 26; -const CLIENT_ENCODING: usize = 27; -const SINK_DECOUPLE: usize = 28; -const SYNCHRONIZE_SEQSCANS: usize = 29; -const STATEMENT_TIMEOUT: usize = 30; -const LOCK_TIMEOUT: usize = 31; -const ROW_SECURITY: usize = 32; -const STANDARD_CONFORMING_STRINGS: usize = 33; -const STREAMING_RATE_LIMIT: usize = 34; -const CDC_BACKFILL: usize = 35; -const STREAMING_OVER_WINDOW_CACHE_POLICY: usize = 36; -const BACKGROUND_DDL: usize = 37; -const SERVER_ENCODING: usize = 38; -const STREAMING_ENABLE_ARRANGEMENT_BACKFILL: usize = 39; - -trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> { - fn entry_name() -> &'static str; -} - -struct ConfigBool(bool); - -impl Default for ConfigBool { - fn default() -> Self { - ConfigBool(DEFAULT) - } -} - -impl ConfigEntry for ConfigBool { - fn entry_name() -> &'static str { - CONFIG_KEYS[NAME] - } -} - -impl TryFrom<&[&str]> for ConfigBool { - type Error = RwError; - - fn try_from(value: 
&[&str]) -> Result { - if value.len() != 1 { - return Err(ErrorCode::InternalError(format!( - "SET {} takes only one argument", - ::entry_name() - )) - .into()); - } - - let s = value[0]; - if s.eq_ignore_ascii_case("true") - || s.eq_ignore_ascii_case("on") - || s.eq_ignore_ascii_case("yes") - || s.eq_ignore_ascii_case("1") - { - Ok(ConfigBool(true)) - } else if s.eq_ignore_ascii_case("false") - || s.eq_ignore_ascii_case("off") - || s.eq_ignore_ascii_case("no") - || s.eq_ignore_ascii_case("0") - { - Ok(ConfigBool(false)) - } else { - Err(ErrorCode::InvalidConfigValue { - config_entry: Self::entry_name().to_string(), - config_value: s.to_string(), - } - .into()) - } - } -} - -impl Deref for ConfigBool { - type Target = bool; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -#[derive(Default, Clone, PartialEq, Eq)] -struct ConfigString(String); - -impl Deref for ConfigString { - type Target = String; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} -impl TryFrom<&[&str]> for ConfigString { - type Error = RwError; +pub const SESSION_CONFIG_LIST_SEP: &str = ", "; - fn try_from(value: &[&str]) -> Result { - if value.len() != 1 { - return Err(ErrorCode::InternalError(format!( - "SET {} takes only one argument", - Self::entry_name() - )) - .into()); - } - - Ok(Self(value[0].to_string())) - } -} - -impl ConfigEntry for ConfigString { - fn entry_name() -> &'static str { - CONFIG_KEYS[NAME] - } -} - -struct ConfigI32(i32); - -impl Default for ConfigI32 { - fn default() -> Self { - ConfigI32(DEFAULT) - } -} - -impl Deref for ConfigI32 { - type Target = i32; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl ConfigEntry for ConfigI32 { - fn entry_name() -> &'static str { - CONFIG_KEYS[NAME] - } -} - -impl TryFrom<&[&str]> for ConfigI32 { - type Error = RwError; - - fn try_from(value: &[&str]) -> Result { - if value.len() != 1 { - return Err(ErrorCode::InternalError(format!( - "SET {} takes only one argument", - Self::entry_name() - )) - .into()); - 
} - - let s = value[0]; - s.parse::().map(ConfigI32).map_err(|_e| { - ErrorCode::InvalidConfigValue { - config_entry: Self::entry_name().to_string(), - config_value: s.to_string(), - } - .into() - }) - } -} - -struct ConfigU64(u64); - -impl Default for ConfigU64 { - fn default() -> Self { - ConfigU64(DEFAULT) - } -} - -impl Deref for ConfigU64 { - type Target = u64; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl ConfigEntry for ConfigU64 { - fn entry_name() -> &'static str { - CONFIG_KEYS[NAME] - } -} - -impl TryFrom<&[&str]> for ConfigU64 { - type Error = RwError; - - fn try_from(value: &[&str]) -> Result { - if value.len() != 1 { - return Err(ErrorCode::InternalError(format!( - "SET {} takes only one argument", - Self::entry_name() - )) - .into()); - } - - let s = value[0]; - s.parse::().map(ConfigU64).map_err(|_e| { - ErrorCode::InvalidConfigValue { - config_entry: Self::entry_name().to_string(), - config_value: s.to_string(), - } - .into() - }) - } -} - -pub struct VariableInfo { - pub name: String, - pub setting: String, - pub description: String, -} - -type ImplicitFlush = ConfigBool; -type CreateCompactionGroupForMv = ConfigBool; -type ApplicationName = ConfigString; -type ExtraFloatDigit = ConfigI32; -// TODO: We should use more specified type here. 
-type DateStyle = ConfigString; -type BatchEnableLookupJoin = ConfigBool; -type BatchEnableSortAgg = ConfigBool; -type MaxSplitRangeGap = ConfigI32; -type QueryEpoch = ConfigU64; -type Timezone = ConfigString; -type StreamingParallelism = ConfigU64; -type StreamingEnableDeltaJoin = ConfigBool; -type StreamingEnableBushyJoin = ConfigBool; -type EnableTwoPhaseAgg = ConfigBool; -type ForceTwoPhaseAgg = ConfigBool; -type EnableSharePlan = ConfigBool; -type IntervalStyle = ConfigString; -type BatchParallelism = ConfigU64; -type EnableJoinOrdering = ConfigBool; -type ServerVersion = ConfigString; -type ServerVersionNum = ConfigI32; -type ForceSplitDistinctAgg = ConfigBool; -type ClientMinMessages = ConfigString; -type ClientEncoding = ConfigString; -type SynchronizeSeqscans = ConfigBool; -type StatementTimeout = ConfigI32; -type LockTimeout = ConfigI32; -type RowSecurity = ConfigBool; -type StandardConformingStrings = ConfigString; -type StreamingRateLimit = ConfigU64; -type CdcBackfill = ConfigBool; -type BackgroundDdl = ConfigBool; -type ServerEncoding = ConfigString; -type StreamingEnableArrangementBackfill = ConfigBool; - -/// Report status or notice to caller. -pub trait ConfigReporter { - fn report_status(&mut self, key: &str, new_val: String); -} - -// Report nothing. -impl ConfigReporter for () { - fn report_status(&mut self, _key: &str, _new_val: String) {} -} - -#[derive(Educe)] -#[educe(Default)] +/// This is the Session Config of RisingWave. +#[derive(SessionConfig)] pub struct ConfigMap { /// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block /// until the entire dataflow is refreshed. In other words, every related table & MV will /// be able to see the write. - implicit_flush: ImplicitFlush, + #[parameter(default = false, rename = "rw_implicit_flush")] + implicit_flush: bool, /// If `CREATE_COMPACTION_GROUP_FOR_MV` is on, dedicated compaction groups will be created in /// MV creation. 
- create_compaction_group_for_mv: CreateCompactionGroupForMv, + #[parameter(default = false)] + create_compaction_group_for_mv: bool, /// A temporary config variable to force query running in either local or distributed mode. /// The default value is auto which means let the system decide to run batch queries in local /// or distributed mode automatically. + #[parameter(default = QueryMode::default())] query_mode: QueryMode, - /// see - extra_float_digit: ExtraFloatDigit, + /// Sets the number of digits displayed for floating-point values. + /// See + #[parameter(default = 1)] + extra_float_digits: i32, - /// see - application_name: ApplicationName, + /// Sets the application name to be reported in statistics and logs. + /// See + #[parameter(default = "", flags = "REPORT")] + application_name: String, + /// It is typically set by an application upon connection to the server. /// see - date_style: DateStyle, + #[parameter(default = "", rename = "datestyle")] + date_style: String, - /// To force the usage of lookup join instead of hash join in batch execution - batch_enable_lookup_join: BatchEnableLookupJoin, + /// Force the use of lookup join instead of hash join when possible for local batch execution. + #[parameter(default = true, rename = "rw_batch_enable_lookup_join")] + batch_enable_lookup_join: bool, - /// To open the usage of sortAgg instead of hash agg when order property is satisfied in batch + /// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch /// execution - batch_enable_sort_agg: BatchEnableSortAgg, + #[parameter(default = true, rename = "rw_batch_enable_sort_agg")] + batch_enable_sort_agg: bool, - /// It's the max gap allowed to transform small range scan scan into multi point lookup. - max_split_range_gap: MaxSplitRangeGap, + /// The max gap allowed to transform small range scan scan into multi point lookup. 
+ #[parameter(default = 8)] + max_split_range_gap: i32, - /// see + /// Sets the order in which schemas are searched when an object (table, data type, function, etc.) + /// is referenced by a simple name with no schema specified. + /// See + #[parameter(default = SearchPath::default())] search_path: SearchPath, /// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint. + #[parameter(default = VisibilityMode::default())] visibility_mode: VisibilityMode, - /// see + /// See + #[parameter(default = IsolationLevel::default())] transaction_isolation_level: IsolationLevel, - /// select as of specific epoch - query_epoch: QueryEpoch, + /// Select as of specific epoch. + /// Sets the historical epoch for querying data. If 0, querying latest data. + #[parameter(default = ConfigNonZeroU64::default())] + query_epoch: ConfigNonZeroU64, /// Session timezone. Defaults to UTC. - #[educe(Default(expression = "ConfigString::(String::from(\"UTC\"))"))] - timezone: Timezone, + #[parameter(default = "UTC", check_hook = check_timezone)] + timezone: String, /// If `STREAMING_PARALLELISM` is non-zero, CREATE MATERIALIZED VIEW/TABLE/INDEX will use it as /// streaming parallelism. - streaming_parallelism: StreamingParallelism, + #[parameter(default = ConfigNonZeroU64::default())] + streaming_parallelism: ConfigNonZeroU64, /// Enable delta join for streaming queries. Defaults to false. - streaming_enable_delta_join: StreamingEnableDeltaJoin, + #[parameter(default = false, rename = "rw_streaming_enable_delta_join")] + streaming_enable_delta_join: bool, /// Enable bushy join for streaming queries. Defaults to true. - streaming_enable_bushy_join: StreamingEnableBushyJoin, + #[parameter(default = true, rename = "rw_streaming_enable_bushy_join")] + streaming_enable_bushy_join: bool, /// Enable arrangement backfill for streaming queries. Defaults to false. 
- streaming_enable_arrangement_backfill: StreamingEnableArrangementBackfill, + #[parameter(default = false)] + streaming_enable_arrangement_backfill: bool, /// Enable join ordering for streaming and batch queries. Defaults to true. - enable_join_ordering: EnableJoinOrdering, + #[parameter(default = true, rename = "rw_enable_join_ordering")] + enable_join_ordering: bool, /// Enable two phase agg optimization. Defaults to true. /// Setting this to true will always set `FORCE_TWO_PHASE_AGG` to false. - enable_two_phase_agg: EnableTwoPhaseAgg, + #[parameter(default = true, flags = "SETTER", rename = "rw_enable_two_phase_agg")] + enable_two_phase_agg: bool, /// Force two phase agg optimization whenever there's a choice between /// optimizations. Defaults to false. /// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to false. - force_two_phase_agg: ForceTwoPhaseAgg, + #[parameter(default = false, flags = "SETTER", rename = "rw_force_two_phase_agg")] + force_two_phase_agg: bool, /// Enable sharing of common sub-plans. /// This means that DAG structured query plans can be constructed, + #[parameter(default = true, rename = "rw_enable_share_plan")] /// rather than only tree structured query plans. - enable_share_plan: EnableSharePlan, + enable_share_plan: bool, /// Enable split distinct agg - force_split_distinct_agg: ForceSplitDistinctAgg, + #[parameter(default = false, rename = "rw_force_split_distinct_agg")] + force_split_distinct_agg: bool, - /// see - interval_style: IntervalStyle, + /// See + #[parameter(default = "", rename = "intervalstyle")] + interval_style: String, - batch_parallelism: BatchParallelism, + /// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism. + #[parameter(default = ConfigNonZeroU64::default())] + batch_parallelism: ConfigNonZeroU64, /// The version of PostgreSQL that Risingwave claims to be. 
- #[educe(Default(expression = "ConfigString::(String::from(\"9.5.0\"))"))] - server_version: ServerVersion, - server_version_num: ServerVersionNum, + #[parameter(default = "9.5.0")] + server_version: String, + + /// The version of PostgreSQL that Risingwave claims to be. + #[parameter(default = 90500)] + server_version_num: i32, /// see - #[educe(Default( - expression = "ConfigString::(String::from(\"notice\"))" - ))] - client_min_messages: ClientMinMessages, + #[parameter(default = "notice")] + client_min_messages: String, /// see - #[educe(Default(expression = "ConfigString::(String::from(\"UTF8\"))"))] - client_encoding: ClientEncoding, + #[parameter(default = "UTF8", check_hook = check_client_encoding)] + client_encoding: String, /// Enable decoupling sink and internal streaming graph or not + #[parameter(default = SinkDecouple::default())] sink_decouple: SinkDecouple, /// See /// Unused in RisingWave, support for compatibility. - synchronize_seqscans: SynchronizeSeqscans, + #[parameter(default = false)] + synchronize_seqscans: bool, /// Abort any statement that takes more than the specified amount of time. If /// log_min_error_statement is set to ERROR or lower, the statement that timed out will also be /// logged. If this value is specified without units, it is taken as milliseconds. A value of /// zero (the default) disables the timeout. - statement_timeout: StatementTimeout, + #[parameter(default = 0)] + statement_timeout: i32, - /// see + /// See /// Unused in RisingWave, support for compatibility. - lock_timeout: LockTimeout, + #[parameter(default = 0)] + lock_timeout: i32, /// see . /// Unused in RisingWave, support for compatibility. 
- row_security: RowSecurity, + #[parameter(default = true)] + row_security: bool, /// see - #[educe(Default( - expression = "ConfigString::(String::from(\"on\"))" - ))] - standard_conforming_strings: StandardConformingStrings, + #[parameter(default = "on")] + standard_conforming_strings: String, - streaming_rate_limit: StreamingRateLimit, + /// Set streaming rate limit (rows per second) for each parallelism for mv backfilling + #[parameter(default = ConfigNonZeroU64::default())] + streaming_rate_limit: ConfigNonZeroU64, - cdc_backfill: CdcBackfill, + /// Enable backfill for CDC table to allow lock-free and incremental snapshot + #[parameter(default = false)] + cdc_backfill: bool, /// Cache policy for partition cache in streaming over window. /// Can be "full", "recent", "recent_first_n" or "recent_last_n". + #[parameter(default = OverWindowCachePolicy::default(), rename = "rw_streaming_over_window_cache_policy")] streaming_over_window_cache_policy: OverWindowCachePolicy, - background_ddl: BackgroundDdl, + /// Run DDL statements in background + #[parameter(default = false)] + background_ddl: bool, /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time. 
- #[educe(Default(expression = "ConfigString::(String::from(\"UTF8\"))"))] - server_encoding: ServerEncoding, + #[parameter(default = "UTF8")] + server_encoding: String, + + #[parameter(default = "hex", check_hook = check_bytea_output)] + bytea_output: String, } -impl ConfigMap { - pub fn set( - &mut self, - key: &str, - val: Vec, - mut reporter: impl ConfigReporter, - ) -> Result<(), RwError> { - info!(%key, ?val, "set config"); - let val = val.iter().map(AsRef::as_ref).collect_vec(); - if key.eq_ignore_ascii_case(ImplicitFlush::entry_name()) { - self.implicit_flush = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(CreateCompactionGroupForMv::entry_name()) { - self.create_compaction_group_for_mv = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(QueryMode::entry_name()) { - self.query_mode = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(ExtraFloatDigit::entry_name()) { - self.extra_float_digit = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(ApplicationName::entry_name()) { - let new_application_name = val.as_slice().try_into()?; - if self.application_name != new_application_name { - self.application_name = new_application_name.clone(); - reporter.report_status(ApplicationName::entry_name(), new_application_name.0); - } - } else if key.eq_ignore_ascii_case(DateStyle::entry_name()) { - self.date_style = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(BatchEnableLookupJoin::entry_name()) { - self.batch_enable_lookup_join = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(BatchEnableSortAgg::entry_name()) { - self.batch_enable_sort_agg = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(MaxSplitRangeGap::entry_name()) { - self.max_split_range_gap = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(SearchPath::entry_name()) { - self.search_path = val.as_slice().try_into()?; - } else if 
key.eq_ignore_ascii_case(VisibilityMode::entry_name()) { - self.visibility_mode = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) { - self.query_epoch = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(Timezone::entry_name()) { - let raw: Timezone = val.as_slice().try_into()?; - // Check if the provided string is a valid timezone. - Tz::from_str_insensitive(&raw.0).map_err(|_e| ErrorCode::InvalidConfigValue { - config_entry: Timezone::entry_name().to_string(), - config_value: raw.0.to_string(), - })?; - self.timezone = raw; - } else if key.eq_ignore_ascii_case(StreamingParallelism::entry_name()) { - self.streaming_parallelism = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(StreamingEnableDeltaJoin::entry_name()) { - self.streaming_enable_delta_join = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(StreamingEnableBushyJoin::entry_name()) { - self.streaming_enable_bushy_join = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(StreamingEnableArrangementBackfill::entry_name()) { - self.streaming_enable_arrangement_backfill = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(EnableJoinOrdering::entry_name()) { - self.enable_join_ordering = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(EnableTwoPhaseAgg::entry_name()) { - self.enable_two_phase_agg = val.as_slice().try_into()?; - if !*self.enable_two_phase_agg { - self.force_two_phase_agg = ConfigBool(false); - } - } else if key.eq_ignore_ascii_case(ForceTwoPhaseAgg::entry_name()) { - self.force_two_phase_agg = val.as_slice().try_into()?; - if *self.force_two_phase_agg { - self.enable_two_phase_agg = ConfigBool(true); - } - } else if key.eq_ignore_ascii_case(ForceSplitDistinctAgg::entry_name()) { - self.force_split_distinct_agg = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(EnableSharePlan::entry_name()) { - self.enable_share_plan = val.as_slice().try_into()?; 
- } else if key.eq_ignore_ascii_case(IntervalStyle::entry_name()) { - self.interval_style = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(BatchParallelism::entry_name()) { - self.batch_parallelism = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(ClientMinMessages::entry_name()) { - // TODO: validate input and fold to lowercase after #10697 refactor - self.client_min_messages = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(ClientEncoding::entry_name()) { - let enc: ClientEncoding = val.as_slice().try_into()?; - // https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525 - let clean = enc - .as_str() - .replace(|c: char| !c.is_ascii_alphanumeric(), ""); - if !clean.eq_ignore_ascii_case("UTF8") { - return Err(ErrorCode::InvalidConfigValue { - config_entry: ClientEncoding::entry_name().into(), - config_value: enc.0, - } - .into()); - } - // No actual assignment because we only support UTF8. - } else if key.eq_ignore_ascii_case("bytea_output") { - // TODO: We only support hex now. 
- if !val.first().is_some_and(|val| *val == "hex") { - return Err(ErrorCode::InvalidConfigValue { - config_entry: "bytea_output".into(), - config_value: val.first().map(ToString::to_string).unwrap_or_default(), - } - .into()); - } - } else if key.eq_ignore_ascii_case(SinkDecouple::entry_name()) { - self.sink_decouple = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(SynchronizeSeqscans::entry_name()) { - self.synchronize_seqscans = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(StatementTimeout::entry_name()) { - self.statement_timeout = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(LockTimeout::entry_name()) { - self.lock_timeout = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(RowSecurity::entry_name()) { - self.row_security = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(StandardConformingStrings::entry_name()) { - self.standard_conforming_strings = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(StreamingRateLimit::entry_name()) { - self.streaming_rate_limit = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(CdcBackfill::entry_name()) { - self.cdc_backfill = val.as_slice().try_into()? 
- } else if key.eq_ignore_ascii_case(OverWindowCachePolicy::entry_name()) { - self.streaming_over_window_cache_policy = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) { - self.background_ddl = val.as_slice().try_into()?; - } else if key.eq_ignore_ascii_case(ServerEncoding::entry_name()) { - let enc: ServerEncoding = val.as_slice().try_into()?; - // https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525 - let clean = enc - .as_str() - .replace(|c: char| !c.is_ascii_alphanumeric(), ""); - if !clean.eq_ignore_ascii_case("UTF8") { - return Err(ErrorCode::InvalidConfigValue { - config_entry: ServerEncoding::entry_name().into(), - config_value: enc.0, - } - .into()); - } - // No actual assignment because we only support UTF8. - } else { - return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()); - } +fn check_timezone(val: &str) -> Result<(), String> { + // Check if the provided string is a valid timezone. 
+ Tz::from_str_insensitive(val).map_err(|_e| "Not a valid timezone")?; + Ok(()) +} +fn check_client_encoding(val: &str) -> Result<(), String> { + // https://github.com/postgres/postgres/blob/REL_15_3/src/common/encnames.c#L525 + let clean = val.replace(|c: char| !c.is_ascii_alphanumeric(), ""); + if !clean.eq_ignore_ascii_case("UTF8") { + Err("Only support 'UTF8' for CLIENT_ENCODING".to_string()) + } else { Ok(()) } +} - pub fn get(&self, key: &str) -> Result { - if key.eq_ignore_ascii_case(ImplicitFlush::entry_name()) { - Ok(self.implicit_flush.to_string()) - } else if key.eq_ignore_ascii_case(CreateCompactionGroupForMv::entry_name()) { - Ok(self.create_compaction_group_for_mv.to_string()) - } else if key.eq_ignore_ascii_case(QueryMode::entry_name()) { - Ok(self.query_mode.to_string()) - } else if key.eq_ignore_ascii_case(ExtraFloatDigit::entry_name()) { - Ok(self.extra_float_digit.to_string()) - } else if key.eq_ignore_ascii_case(ApplicationName::entry_name()) { - Ok(self.application_name.to_string()) - } else if key.eq_ignore_ascii_case(DateStyle::entry_name()) { - Ok(self.date_style.to_string()) - } else if key.eq_ignore_ascii_case(BatchEnableLookupJoin::entry_name()) { - Ok(self.batch_enable_lookup_join.to_string()) - } else if key.eq_ignore_ascii_case(BatchEnableSortAgg::entry_name()) { - Ok(self.batch_enable_sort_agg.to_string()) - } else if key.eq_ignore_ascii_case(MaxSplitRangeGap::entry_name()) { - Ok(self.max_split_range_gap.to_string()) - } else if key.eq_ignore_ascii_case(SearchPath::entry_name()) { - Ok(self.search_path.to_string()) - } else if key.eq_ignore_ascii_case(VisibilityMode::entry_name()) { - Ok(self.visibility_mode.to_string()) - } else if key.eq_ignore_ascii_case(IsolationLevel::entry_name()) { - Ok(self.transaction_isolation_level.to_string()) - } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) { - Ok(self.query_epoch.to_string()) - } else if key.eq_ignore_ascii_case(Timezone::entry_name()) { - Ok(self.timezone.to_string()) - } 
else if key.eq_ignore_ascii_case(StreamingParallelism::entry_name()) { - Ok(self.streaming_parallelism.to_string()) - } else if key.eq_ignore_ascii_case(StreamingEnableDeltaJoin::entry_name()) { - Ok(self.streaming_enable_delta_join.to_string()) - } else if key.eq_ignore_ascii_case(StreamingEnableBushyJoin::entry_name()) { - Ok(self.streaming_enable_bushy_join.to_string()) - } else if key.eq_ignore_ascii_case(StreamingEnableArrangementBackfill::entry_name()) { - Ok(self.streaming_enable_arrangement_backfill.to_string()) - } else if key.eq_ignore_ascii_case(EnableJoinOrdering::entry_name()) { - Ok(self.enable_join_ordering.to_string()) - } else if key.eq_ignore_ascii_case(EnableTwoPhaseAgg::entry_name()) { - Ok(self.enable_two_phase_agg.to_string()) - } else if key.eq_ignore_ascii_case(ForceTwoPhaseAgg::entry_name()) { - Ok(self.force_two_phase_agg.to_string()) - } else if key.eq_ignore_ascii_case(EnableSharePlan::entry_name()) { - Ok(self.enable_share_plan.to_string()) - } else if key.eq_ignore_ascii_case(IntervalStyle::entry_name()) { - Ok(self.interval_style.to_string()) - } else if key.eq_ignore_ascii_case(BatchParallelism::entry_name()) { - Ok(self.batch_parallelism.to_string()) - } else if key.eq_ignore_ascii_case(ServerVersion::entry_name()) { - Ok(self.server_version.to_string()) - } else if key.eq_ignore_ascii_case(ServerVersionNum::entry_name()) { - Ok(self.server_version_num.to_string()) - } else if key.eq_ignore_ascii_case(ApplicationName::entry_name()) { - Ok(self.application_name.to_string()) - } else if key.eq_ignore_ascii_case(ForceSplitDistinctAgg::entry_name()) { - Ok(self.force_split_distinct_agg.to_string()) - } else if key.eq_ignore_ascii_case(ClientMinMessages::entry_name()) { - Ok(self.client_min_messages.to_string()) - } else if key.eq_ignore_ascii_case(ClientEncoding::entry_name()) { - Ok(self.client_encoding.to_string()) - } else if key.eq_ignore_ascii_case("bytea_output") { - // TODO: We only support hex now. 
- Ok("hex".to_string()) - } else if key.eq_ignore_ascii_case(SinkDecouple::entry_name()) { - Ok(self.sink_decouple.to_string()) - } else if key.eq_ignore_ascii_case(SynchronizeSeqscans::entry_name()) { - Ok(self.synchronize_seqscans.to_string()) - } else if key.eq_ignore_ascii_case(StatementTimeout::entry_name()) { - Ok(self.statement_timeout.to_string()) - } else if key.eq_ignore_ascii_case(LockTimeout::entry_name()) { - Ok(self.lock_timeout.to_string()) - } else if key.eq_ignore_ascii_case(RowSecurity::entry_name()) { - Ok(self.row_security.to_string()) - } else if key.eq_ignore_ascii_case(StandardConformingStrings::entry_name()) { - Ok(self.standard_conforming_strings.to_string()) - } else if key.eq_ignore_ascii_case(StreamingRateLimit::entry_name()) { - Ok(self.streaming_rate_limit.to_string()) - } else if key.eq_ignore_ascii_case(CdcBackfill::entry_name()) { - Ok(self.cdc_backfill.to_string()) - } else if key.eq_ignore_ascii_case(OverWindowCachePolicy::entry_name()) { - Ok(self.streaming_over_window_cache_policy.to_string()) - } else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) { - Ok(self.background_ddl.to_string()) - } else if key.eq_ignore_ascii_case(ServerEncoding::entry_name()) { - Ok(self.server_encoding.to_string()) - } else { - Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()) - } - } - - pub fn get_all(&self) -> Vec { - vec![ - VariableInfo{ - name : ImplicitFlush::entry_name().to_lowercase(), - setting : self.implicit_flush.to_string(), - description : String::from("If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block until the entire dataflow is refreshed.") - }, - VariableInfo{ - name : CreateCompactionGroupForMv::entry_name().to_lowercase(), - setting : self.create_compaction_group_for_mv.to_string(), - description : String::from("If `CREATE_COMPACTION_GROUP_FOR_MV` is on, dedicated compaction groups will be created in MV creation.") - }, - VariableInfo{ - name : 
QueryMode::entry_name().to_lowercase(), - setting : self.query_mode.to_string(), - description : String::from("A temporary config variable to force query running in either local or distributed mode. If the value is auto, the system will decide for you automatically.") - }, - VariableInfo{ - name : ExtraFloatDigit::entry_name().to_lowercase(), - setting : self.extra_float_digit.to_string(), - description : String::from("Sets the number of digits displayed for floating-point values.") - }, - VariableInfo{ - name : ApplicationName::entry_name().to_lowercase(), - setting : self.application_name.to_string(), - description : String::from("Sets the application name to be reported in statistics and logs.") - }, - VariableInfo{ - name : DateStyle::entry_name().to_lowercase(), - setting : self.date_style.to_string(), - description : String::from("It is typically set by an application upon connection to the server.") - }, - VariableInfo{ - name : BatchEnableLookupJoin::entry_name().to_lowercase(), - setting : self.batch_enable_lookup_join.to_string(), - description : String::from("To enable the usage of lookup join instead of hash join when possible for local batch execution.") - }, - VariableInfo{ - name : BatchEnableSortAgg::entry_name().to_lowercase(), - setting : self.batch_enable_sort_agg.to_string(), - description : String::from("To enable the usage of sort agg instead of hash join when order property is satisfied for batch execution.") - }, - VariableInfo{ - name : MaxSplitRangeGap::entry_name().to_lowercase(), - setting : self.max_split_range_gap.to_string(), - description : String::from("It's the max gap allowed to transform small range scan scan into multi point lookup.") - }, - VariableInfo { - name: SearchPath::entry_name().to_lowercase(), - setting : self.search_path.to_string(), - description : String::from("Sets the order in which schemas are searched when an object (table, data type, function, etc.) 
is referenced by a simple name with no schema specified") - }, - VariableInfo{ - name : VisibilityMode::entry_name().to_lowercase(), - setting : self.visibility_mode.to_string(), - description : String::from("If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.") - }, - VariableInfo{ - name: QueryEpoch::entry_name().to_lowercase(), - setting : self.query_epoch.to_string(), - description : String::from("Sets the historical epoch for querying data. If 0, querying latest data.") - }, - VariableInfo{ - name : Timezone::entry_name().to_lowercase(), - setting : self.timezone.to_string(), - description : String::from("The session timezone. This will affect how timestamps are cast into timestamps with timezone.") - }, - VariableInfo{ - name : StreamingParallelism::entry_name().to_lowercase(), - setting : self.streaming_parallelism.to_string(), - description: String::from("Sets the parallelism for streaming. If 0, use default value.") - }, - VariableInfo{ - name : StreamingEnableDeltaJoin::entry_name().to_lowercase(), - setting : self.streaming_enable_delta_join.to_string(), - description: String::from("Enable delta join in streaming queries.") - }, - VariableInfo{ - name : StreamingEnableBushyJoin::entry_name().to_lowercase(), - setting : self.streaming_enable_bushy_join.to_string(), - description: String::from("Enable bushy join in streaming queries.") - }, - VariableInfo{ - name : StreamingEnableArrangementBackfill::entry_name().to_lowercase(), - setting : self.streaming_enable_arrangement_backfill.to_string(), - description: String::from("Enable arrangement backfill in streaming queries.") - }, - VariableInfo{ - name : EnableJoinOrdering::entry_name().to_lowercase(), - setting : self.enable_join_ordering.to_string(), - description: String::from("Enable join ordering for streaming and batch queries.") - }, - VariableInfo{ - name : EnableTwoPhaseAgg::entry_name().to_lowercase(), - setting : self.enable_two_phase_agg.to_string(), - description: 
String::from("Enable two phase aggregation.") - }, - VariableInfo{ - name : ForceTwoPhaseAgg::entry_name().to_lowercase(), - setting : self.force_two_phase_agg.to_string(), - description: String::from("Force two phase aggregation.") - }, - VariableInfo{ - name : EnableSharePlan::entry_name().to_lowercase(), - setting : self.enable_share_plan.to_string(), - description: String::from("Enable sharing of common sub-plans. This means that DAG structured query plans can be constructed, rather than only tree structured query plans.") - }, - VariableInfo{ - name : IntervalStyle::entry_name().to_lowercase(), - setting : self.interval_style.to_string(), - description : String::from("It is typically set by an application upon connection to the server.") - }, - VariableInfo{ - name : BatchParallelism::entry_name().to_lowercase(), - setting : self.batch_parallelism.to_string(), - description: String::from("Sets the parallelism for batch. If 0, use default value.") - }, - VariableInfo{ - name : ServerVersion::entry_name().to_lowercase(), - setting : self.server_version.to_string(), - description : String::from("The version of the server.") - }, - VariableInfo{ - name : ServerVersionNum::entry_name().to_lowercase(), - setting : self.server_version_num.to_string(), - description : String::from("The version number of the server.") - }, - VariableInfo{ - name : ForceSplitDistinctAgg::entry_name().to_lowercase(), - setting : self.force_split_distinct_agg.to_string(), - description : String::from("Enable split the distinct aggregation.") - }, - VariableInfo{ - name : ClientMinMessages::entry_name().to_lowercase(), - setting : self.client_min_messages.to_string(), - description : String::from("Sets the message levels that are sent to the client.") - }, - VariableInfo{ - name : ClientEncoding::entry_name().to_lowercase(), - setting : self.client_encoding.to_string(), - description : String::from("Sets the client's character set encoding.") - }, - VariableInfo{ - name: 
"bytea_output".to_string(), - setting: "hex".to_string(), - description: "Sets the output format for bytea.".to_string(), - }, - VariableInfo{ - name: SinkDecouple::entry_name().to_lowercase(), - setting: self.sink_decouple.to_string(), - description: String::from("Enable decoupling sink and internal streaming graph or not") - }, - VariableInfo{ - name: SynchronizeSeqscans::entry_name().to_lowercase(), - setting: self.synchronize_seqscans.to_string(), - description: String::from("Unused in RisingWave") - }, - VariableInfo{ - name: StatementTimeout::entry_name().to_lowercase(), - setting: self.statement_timeout.to_string(), - description: String::from("Sets the maximum allowed duration of any statement, currently just a mock variable and not adopted in RW"), - }, - VariableInfo{ - name: LockTimeout::entry_name().to_lowercase(), - setting: self.lock_timeout.to_string(), - description: String::from("Unused in RisingWave"), - }, - VariableInfo{ - name: RowSecurity::entry_name().to_lowercase(), - setting: self.row_security.to_string(), - description: String::from("Unused in RisingWave"), - }, - VariableInfo{ - name: StandardConformingStrings::entry_name().to_lowercase(), - setting: self.standard_conforming_strings.to_string(), - description: String::from("Unused in RisingWave"), - }, - VariableInfo{ - name: StreamingRateLimit::entry_name().to_lowercase(), - setting: self.streaming_rate_limit.to_string(), - description: String::from("Set streaming rate limit (rows per second) for each parallelism for mv backfilling"), - }, - VariableInfo{ - name: CdcBackfill::entry_name().to_lowercase(), - setting: self.cdc_backfill.to_string(), - description: String::from("Enable backfill for CDC table to allow lock-free and incremental snapshot"), - }, - VariableInfo{ - name: OverWindowCachePolicy::entry_name().to_lowercase(), - setting: self.streaming_over_window_cache_policy.to_string(), - description: String::from(r#"Cache policy for partition cache in streaming over window. 
Can be "full", "recent", "recent_first_n" or "recent_last_n"."#), - }, - VariableInfo{ - name: BackgroundDdl::entry_name().to_lowercase(), - setting: self.background_ddl.to_string(), - description: String::from("Run DDL statements in background"), - }, - VariableInfo{ - name : ServerEncoding::entry_name().to_lowercase(), - setting : self.server_encoding.to_string(), - description : String::from("Sets the server character set encoding.") - }, - ] - } - - pub fn get_implicit_flush(&self) -> bool { - *self.implicit_flush - } - - pub fn get_create_compaction_group_for_mv(&self) -> bool { - *self.create_compaction_group_for_mv - } - - pub fn get_query_mode(&self) -> QueryMode { - self.query_mode - } - - pub fn get_extra_float_digit(&self) -> i32 { - *self.extra_float_digit - } - - pub fn get_application_name(&self) -> &str { - &self.application_name - } - - pub fn get_date_style(&self) -> &str { - &self.date_style - } - - pub fn get_batch_enable_lookup_join(&self) -> bool { - *self.batch_enable_lookup_join - } - - pub fn get_batch_enable_sort_agg(&self) -> bool { - *self.batch_enable_sort_agg +fn check_bytea_output(val: &str) -> Result<(), String> { + if val == "hex" { + Ok(()) + } else { + Err("Only support 'hex' for BYTEA_OUTPUT".to_string()) } +} - pub fn get_max_split_range_gap(&self) -> u64 { - if *self.max_split_range_gap < 0 { - 0 +impl ConfigMap { + pub fn set_force_two_phase_agg( + &mut self, + val: bool, + reporter: &mut impl ConfigReporter, + ) -> RwResult<()> { + self.set_force_two_phase_agg_inner(val, reporter)?; + if self.force_two_phase_agg { + self.set_enable_two_phase_agg(true, reporter) } else { - *self.max_split_range_gap as u64 + Ok(()) } } - pub fn get_search_path(&self) -> SearchPath { - self.search_path.clone() - } - - pub fn get_visible_mode(&self) -> VisibilityMode { - self.visibility_mode - } - - pub fn get_query_epoch(&self) -> Option { - if self.query_epoch.0 != 0 { - return Some((self.query_epoch.0).into()); - } - None - } - - pub fn 
get_timezone(&self) -> &str { - &self.timezone - } - - pub fn get_streaming_parallelism(&self) -> Option { - if self.streaming_parallelism.0 != 0 { - return Some(self.streaming_parallelism.0); - } - None - } - - pub fn get_streaming_enable_delta_join(&self) -> bool { - *self.streaming_enable_delta_join - } - - pub fn get_streaming_enable_bushy_join(&self) -> bool { - *self.streaming_enable_bushy_join - } - - pub fn get_streaming_enable_arrangement_backfill(&self) -> bool { - *self.streaming_enable_arrangement_backfill - } - - pub fn get_enable_join_ordering(&self) -> bool { - *self.enable_join_ordering - } - - pub fn get_enable_two_phase_agg(&self) -> bool { - *self.enable_two_phase_agg - } - - pub fn get_force_split_distinct_agg(&self) -> bool { - *self.force_split_distinct_agg - } - - pub fn get_force_two_phase_agg(&self) -> bool { - *self.force_two_phase_agg - } - - pub fn get_enable_share_plan(&self) -> bool { - *self.enable_share_plan - } - - pub fn get_interval_style(&self) -> &str { - &self.interval_style - } - - pub fn get_batch_parallelism(&self) -> Option { - if self.batch_parallelism.0 != 0 { - return Some(NonZeroU64::new(self.batch_parallelism.0).unwrap()); - } - None - } - - pub fn get_client_min_message(&self) -> &str { - &self.client_min_messages - } - - pub fn get_client_encoding(&self) -> &str { - &self.client_encoding - } - - pub fn get_sink_decouple(&self) -> SinkDecouple { - self.sink_decouple - } - - pub fn get_standard_conforming_strings(&self) -> &str { - &self.standard_conforming_strings - } - - pub fn get_streaming_rate_limit(&self) -> Option { - if self.streaming_rate_limit.0 != 0 { - return Some(self.streaming_rate_limit.0 as u32); + pub fn set_enable_two_phase_agg( + &mut self, + val: bool, + reporter: &mut impl ConfigReporter, + ) -> RwResult<()> { + self.set_enable_two_phase_agg_inner(val, reporter)?; + if !self.force_two_phase_agg { + self.set_force_two_phase_agg(false, reporter) + } else { + Ok(()) } - None - } - - pub fn 
get_cdc_backfill(&self) -> bool { - self.cdc_backfill.0 } +} - pub fn get_streaming_over_window_cache_policy(&self) -> OverWindowCachePolicy { - self.streaming_over_window_cache_policy - } +pub struct VariableInfo { + pub name: String, + pub setting: String, + pub description: String, +} - pub fn get_background_ddl(&self) -> bool { - self.background_ddl.0 - } +/// Report status or notice to caller. +pub trait ConfigReporter { + fn report_status(&mut self, key: &str, new_val: String); +} - pub fn get_server_encoding(&self) -> &str { - &self.server_encoding - } +// Report nothing. +impl ConfigReporter for () { + fn report_status(&mut self, _key: &str, _new_val: String) {} } diff --git a/src/common/src/session_config/non_zero64.rs b/src/common/src/session_config/non_zero64.rs new file mode 100644 index 0000000000000..2bec6eb5f462f --- /dev/null +++ b/src/common/src/session_config/non_zero64.rs @@ -0,0 +1,53 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::num::NonZeroU64; +use std::str::FromStr; + +/// When set this config as `0`, the value is `None`, otherwise the value is +/// `Some(val)` +#[derive(Copy, Default, Debug, Clone, PartialEq, Eq)] +pub struct ConfigNonZeroU64(pub Option); + +impl FromStr for ConfigNonZeroU64 { + type Err = (); + + fn from_str(s: &str) -> Result { + let parsed = s.parse::().map_err(|_| ())?; + if parsed == 0 { + Ok(Self(None)) + } else { + Ok(Self(NonZeroU64::new(parsed))) + } + } +} + +impl std::fmt::Display for ConfigNonZeroU64 { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let ConfigNonZeroU64(Some(inner)) = self { + write!(f, "{}", inner) + } else { + write!(f, "0") + } + } +} + +impl ConfigNonZeroU64 { + pub fn map(self, f: F) -> Option + where + F: FnOnce(NonZeroU64) -> U, + { + self.0.map(f) + } +} diff --git a/src/common/src/session_config/over_window.rs b/src/common/src/session_config/over_window.rs index 832d7a1e6c288..d3c4e23433f3a 100644 --- a/src/common/src/session_config/over_window.rs +++ b/src/common/src/session_config/over_window.rs @@ -15,14 +15,10 @@ use std::str::FromStr; use enum_as_inner::EnumAsInner; -use parse_display::{Display, FromStr}; +use parse_display::Display; use risingwave_pb::stream_plan::PbOverWindowCachePolicy; -use super::{ConfigEntry, CONFIG_KEYS, STREAMING_OVER_WINDOW_CACHE_POLICY}; -use crate::error::ErrorCode::{self, InvalidConfigValue}; -use crate::error::RwError; - -#[derive(Copy, Default, Debug, Clone, PartialEq, Eq, FromStr, Display, EnumAsInner)] +#[derive(Copy, Default, Debug, Clone, PartialEq, Eq, Display, EnumAsInner)] #[display(style = "snake_case")] pub enum OverWindowCachePolicy { /// Cache all entries. 
@@ -36,32 +32,18 @@ pub enum OverWindowCachePolicy { RecentLastN, } -impl TryFrom<&[&str]> for OverWindowCachePolicy { - type Error = RwError; +impl FromStr for OverWindowCachePolicy { + type Err = (); - fn try_from(value: &[&str]) -> Result { - if value.len() != 1 { - return Err(ErrorCode::InternalError(format!( - "SET {} takes only one argument", - Self::entry_name() - )) - .into()); + fn from_str(s: &str) -> Result { + let s = s.to_ascii_lowercase().replace('-', "_"); + match s.as_str() { + "full" => Ok(Self::Full), + "recent" => Ok(Self::Recent), + "recent_first_n" => Ok(Self::RecentFirstN), + "recent_last_n" => Ok(Self::RecentLastN), + _ => Err(()), } - - let s = value[0].to_ascii_lowercase().replace('-', "_"); - OverWindowCachePolicy::from_str(&s).map_err(|_| { - InvalidConfigValue { - config_entry: Self::entry_name().to_string(), - config_value: s.to_string(), - } - .into() - }) - } -} - -impl ConfigEntry for OverWindowCachePolicy { - fn entry_name() -> &'static str { - CONFIG_KEYS[STREAMING_OVER_WINDOW_CACHE_POLICY] } } @@ -93,33 +75,33 @@ mod tests { #[test] fn parse_over_window_cache_policy() { assert_eq!( - OverWindowCachePolicy::try_from(["full"].as_slice()).unwrap(), + OverWindowCachePolicy::from_str("full").unwrap(), OverWindowCachePolicy::Full ); assert_eq!( - OverWindowCachePolicy::try_from(["recent"].as_slice()).unwrap(), + OverWindowCachePolicy::from_str("recent").unwrap(), OverWindowCachePolicy::Recent ); assert_eq!( - OverWindowCachePolicy::try_from(["RECENT"].as_slice()).unwrap(), + OverWindowCachePolicy::from_str("RECENT").unwrap(), OverWindowCachePolicy::Recent ); assert_eq!( - OverWindowCachePolicy::try_from(["recent_first_n"].as_slice()).unwrap(), + OverWindowCachePolicy::from_str("recent_first_n").unwrap(), OverWindowCachePolicy::RecentFirstN ); assert_eq!( - OverWindowCachePolicy::try_from(["recent_last_n"].as_slice()).unwrap(), + OverWindowCachePolicy::from_str("recent_last_n").unwrap(), OverWindowCachePolicy::RecentLastN ); assert_eq!( 
- OverWindowCachePolicy::try_from(["recent-last-n"].as_slice()).unwrap(), + OverWindowCachePolicy::from_str("recent-last-n").unwrap(), OverWindowCachePolicy::RecentLastN ); assert_eq!( - OverWindowCachePolicy::try_from(["recent_last_N"].as_slice()).unwrap(), + OverWindowCachePolicy::from_str("recent_last_N").unwrap(), OverWindowCachePolicy::RecentLastN ); - assert!(OverWindowCachePolicy::try_from(["foo"].as_slice()).is_err()); + assert!(OverWindowCachePolicy::from_str("foo").is_err()); } } diff --git a/src/common/src/session_config/query_mode.rs b/src/common/src/session_config/query_mode.rs index 71b1f2e66e9dd..6879a73bfed9b 100644 --- a/src/common/src/session_config/query_mode.rs +++ b/src/common/src/session_config/query_mode.rs @@ -15,10 +15,7 @@ //! Contains configurations that could be accessed via "set" command. use std::fmt::Formatter; - -use super::{ConfigEntry, CONFIG_KEYS, QUERY_MODE}; -use crate::error::ErrorCode::{self, InvalidConfigValue}; -use crate::error::RwError; +use std::str::FromStr; #[derive(Copy, Default, Debug, Clone, PartialEq, Eq)] pub enum QueryMode { @@ -30,25 +27,10 @@ pub enum QueryMode { Distributed, } -impl ConfigEntry for QueryMode { - fn entry_name() -> &'static str { - CONFIG_KEYS[QUERY_MODE] - } -} - -impl TryFrom<&[&str]> for QueryMode { - type Error = RwError; +impl FromStr for QueryMode { + type Err = (); - fn try_from(value: &[&str]) -> Result { - if value.len() != 1 { - return Err(ErrorCode::InternalError(format!( - "SET {} takes only one argument", - Self::entry_name() - )) - .into()); - } - - let s = value[0]; + fn from_str(s: &str) -> Result { if s.eq_ignore_ascii_case("local") { Ok(Self::Local) } else if s.eq_ignore_ascii_case("distributed") { @@ -56,10 +38,7 @@ impl TryFrom<&[&str]> for QueryMode { } else if s.eq_ignore_ascii_case("auto") { Ok(Self::Auto) } else { - Err(InvalidConfigValue { - config_entry: Self::entry_name().to_string(), - config_value: s.to_string(), - })? 
+ Err(()) } } } @@ -80,30 +59,18 @@ mod tests { #[test] fn parse_query_mode() { + assert_eq!(QueryMode::from_str("auto").unwrap(), QueryMode::Auto); + assert_eq!(QueryMode::from_str("Auto").unwrap(), QueryMode::Auto); + assert_eq!(QueryMode::from_str("local").unwrap(), QueryMode::Local); + assert_eq!(QueryMode::from_str("Local").unwrap(), QueryMode::Local); assert_eq!( - QueryMode::try_from(["auto"].as_slice()).unwrap(), - QueryMode::Auto - ); - assert_eq!( - QueryMode::try_from(["Auto"].as_slice()).unwrap(), - QueryMode::Auto - ); - assert_eq!( - QueryMode::try_from(["local"].as_slice()).unwrap(), - QueryMode::Local - ); - assert_eq!( - QueryMode::try_from(["Local"].as_slice()).unwrap(), - QueryMode::Local - ); - assert_eq!( - QueryMode::try_from(["distributed"].as_slice()).unwrap(), + QueryMode::from_str("distributed").unwrap(), QueryMode::Distributed ); assert_eq!( - QueryMode::try_from(["diStributed"].as_slice()).unwrap(), + QueryMode::from_str("diStributed").unwrap(), QueryMode::Distributed ); - assert!(QueryMode::try_from(["ab"].as_slice()).is_err()); + assert!(QueryMode::from_str("ab").is_err()); } } diff --git a/src/common/src/session_config/search_path.rs b/src/common/src/session_config/search_path.rs index fe2dde70a823b..c176503824d43 100644 --- a/src/common/src/session_config/search_path.rs +++ b/src/common/src/session_config/search_path.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use super::{ConfigEntry, CONFIG_KEYS, SEARCH_PATH}; +use std::str::FromStr; + +use super::SESSION_CONFIG_LIST_SEP; use crate::catalog::{DEFAULT_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME}; use crate::error::RwError; @@ -50,30 +52,25 @@ impl SearchPath { impl Default for SearchPath { fn default() -> Self { [USER_NAME_WILD_CARD, DEFAULT_SCHEMA_NAME] - .as_slice() - .try_into() + .join(SESSION_CONFIG_LIST_SEP) + .parse() .unwrap() } } -impl ConfigEntry for SearchPath { - fn entry_name() -> &'static str { - CONFIG_KEYS[SEARCH_PATH] - } -} - -impl TryFrom<&[&str]> for SearchPath { - type Error = RwError; +impl FromStr for SearchPath { + type Err = RwError; - fn try_from(value: &[&str]) -> Result { + fn from_str(s: &str) -> Result { + let paths = s.split(SESSION_CONFIG_LIST_SEP).map(|path| path.trim()); let mut real_path = vec![]; - for p in value { + for p in paths { let p = p.trim(); if !p.is_empty() { real_path.push(p.to_string()); } } - let string = real_path.join(", "); + let string = real_path.join(SESSION_CONFIG_LIST_SEP); let mut path = real_path.clone(); let rw_catalog = RW_CATALOG_SCHEMA_NAME.to_string(); diff --git a/src/common/src/session_config/sink_decouple.rs b/src/common/src/session_config/sink_decouple.rs index 7a3c1925ebd8e..0129654b138da 100644 --- a/src/common/src/session_config/sink_decouple.rs +++ b/src/common/src/session_config/sink_decouple.rs @@ -12,9 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::error::ErrorCode::InvalidConfigValue; -use crate::error::{ErrorCode, RwError}; -use crate::session_config::{ConfigEntry, CONFIG_KEYS, SINK_DECOUPLE}; +use std::str::FromStr; #[derive(Copy, Default, Debug, Clone, PartialEq, Eq)] pub enum SinkDecouple { @@ -27,38 +25,19 @@ pub enum SinkDecouple { Disable, } -impl<'a> TryFrom<&'a [&'a str]> for SinkDecouple { - type Error = RwError; +impl FromStr for SinkDecouple { + type Err = (); - fn try_from(value: &'a [&'a str]) -> Result { - if value.len() != 1 { - return Err(ErrorCode::InternalError(format!( - "SET {} takes only one argument", - Self::entry_name() - )) - .into()); - } - - let s = value[0]; + fn from_str(s: &str) -> Result { match s.to_ascii_lowercase().as_str() { "true" | "enable" => Ok(Self::Enable), "false" | "disable" => Ok(Self::Disable), "default" => Ok(Self::Default), - _ => Err(InvalidConfigValue { - config_entry: Self::entry_name().to_string(), - config_value: s.to_string(), - } - .into()), + _ => Err(()), } } } -impl ConfigEntry for SinkDecouple { - fn entry_name() -> &'static str { - CONFIG_KEYS[SINK_DECOUPLE] - } -} - impl ToString for SinkDecouple { fn to_string(&self) -> String { match self { diff --git a/src/common/src/session_config/transaction_isolation_level.rs b/src/common/src/session_config/transaction_isolation_level.rs index 7f50f74866c91..af558e8525dd9 100644 --- a/src/common/src/session_config/transaction_isolation_level.rs +++ b/src/common/src/session_config/transaction_isolation_level.rs @@ -13,9 +13,7 @@ // limitations under the License. use std::fmt::Formatter; - -use crate::error::{ErrorCode, RwError}; -use crate::session_config::{ConfigEntry, CONFIG_KEYS, TRANSACTION_ISOLATION_LEVEL}; +use std::str::FromStr; #[derive(Copy, Default, Debug, Clone, PartialEq, Eq)] // Some variants are never constructed so allow dead code here. 
@@ -28,20 +26,11 @@ pub enum IsolationLevel { Serializable, } -impl ConfigEntry for IsolationLevel { - fn entry_name() -> &'static str { - CONFIG_KEYS[TRANSACTION_ISOLATION_LEVEL] - } -} - -impl TryFrom<&[&str]> for IsolationLevel { - type Error = RwError; +impl FromStr for IsolationLevel { + type Err = (); - fn try_from(_value: &[&str]) -> Result { - Err( - ErrorCode::InternalError("Support set transaction isolation level first".to_string()) - .into(), - ) + fn from_str(_s: &str) -> Result { + Err(()) } } diff --git a/src/common/src/session_config/visibility_mode.rs b/src/common/src/session_config/visibility_mode.rs index 78e559ae369b9..b8aa4f6faef3a 100644 --- a/src/common/src/session_config/visibility_mode.rs +++ b/src/common/src/session_config/visibility_mode.rs @@ -15,11 +15,7 @@ //! Contains configurations that could be accessed via "set" command. use std::fmt::Formatter; - -use super::{ConfigEntry, CONFIG_KEYS}; -use crate::error::ErrorCode::{self, InvalidConfigValue}; -use crate::error::RwError; -use crate::session_config::VISIBILITY_MODE; +use std::str::FromStr; #[derive(Copy, Default, Debug, Clone, PartialEq, Eq)] pub enum VisibilityMode { @@ -32,25 +28,10 @@ pub enum VisibilityMode { Checkpoint, } -impl ConfigEntry for VisibilityMode { - fn entry_name() -> &'static str { - CONFIG_KEYS[VISIBILITY_MODE] - } -} - -impl TryFrom<&[&str]> for VisibilityMode { - type Error = RwError; - - fn try_from(value: &[&str]) -> Result { - if value.len() != 1 { - return Err(ErrorCode::InternalError(format!( - "SET {} takes only one argument", - Self::entry_name() - )) - .into()); - } +impl FromStr for VisibilityMode { + type Err = (); - let s = value[0]; + fn from_str(s: &str) -> Result { if s.eq_ignore_ascii_case("all") { Ok(Self::All) } else if s.eq_ignore_ascii_case("checkpoint") { @@ -58,10 +39,7 @@ impl TryFrom<&[&str]> for VisibilityMode { } else if s.eq_ignore_ascii_case("default") { Ok(Self::Default) } else { - Err(InvalidConfigValue { - config_entry: 
Self::entry_name().to_string(), - config_value: s.to_string(), - })? + Err(()) } } } @@ -83,25 +61,25 @@ mod tests { #[test] fn parse_query_mode() { assert_eq!( - VisibilityMode::try_from(["all"].as_slice()).unwrap(), + VisibilityMode::from_str("all").unwrap(), VisibilityMode::All ); assert_eq!( - VisibilityMode::try_from(["All"].as_slice()).unwrap(), + VisibilityMode::from_str("All").unwrap(), VisibilityMode::All ); assert_eq!( - VisibilityMode::try_from(["checkpoint"].as_slice()).unwrap(), + VisibilityMode::from_str("checkpoint").unwrap(), VisibilityMode::Checkpoint ); assert_eq!( - VisibilityMode::try_from(["checkPoint"].as_slice()).unwrap(), + VisibilityMode::from_str("checkPoint").unwrap(), VisibilityMode::Checkpoint ); assert_eq!( - VisibilityMode::try_from(["default"].as_slice()).unwrap(), + VisibilityMode::from_str("default").unwrap(), VisibilityMode::Default ); - assert!(VisibilityMode::try_from(["ab"].as_slice()).is_err()); + assert!(VisibilityMode::from_str("ab").is_err()); } } diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index a89f67a73ec4b..ea065af3746f6 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -265,7 +265,7 @@ impl TestCase { if let Some(ref config_map) = self.with_config_map() { for (key, val) in config_map { - session.set_config(key, vec![val.to_owned()]).unwrap(); + session.set_config(key, val.to_owned()).unwrap(); } } diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 96435d4e9f656..6d30d85e8a9ce 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -1181,7 +1181,7 @@ impl Binder { let mut session_config = binder.session_config.write(); // TODO: report session config changes if necessary. 
- session_config.set(setting_name, vec![new_value.to_string()], ())?; + session_config.set(setting_name, new_value.to_string(), &mut())?; Ok(ExprImpl::literal_varchar(new_value.to_string())) }))), diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 00c82a8981da4..a7468768ba10e 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -210,7 +210,7 @@ impl Binder { next_values_id: 0, next_share_id: 0, session_config: session.shared_config(), - search_path: session.config().get_search_path(), + search_path: session.config().search_path(), bind_for, shared_views: HashMap::new(), included_relations: HashSet::new(), diff --git a/src/frontend/src/handler/alter_owner.rs b/src/frontend/src/handler/alter_owner.rs index 349dda495e907..6737c46af8441 100644 --- a/src/frontend/src/handler/alter_owner.rs +++ b/src/frontend/src/handler/alter_owner.rs @@ -61,7 +61,7 @@ pub async fn handle_alter_owner( let db_name = session.database(); let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/alter_relation_rename.rs b/src/frontend/src/handler/alter_relation_rename.rs index dc7fca6e010a1..26578f707424d 100644 --- a/src/frontend/src/handler/alter_relation_rename.rs +++ b/src/frontend/src/handler/alter_relation_rename.rs @@ -32,7 +32,7 @@ pub async fn handle_rename_table( let (schema_name, real_table_name) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?; let new_table_name = Binder::resolve_table_name(new_table_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let 
schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); @@ -76,7 +76,7 @@ pub async fn handle_rename_index( let (schema_name, real_index_name) = Binder::resolve_schema_qualified_name(db_name, index_name.clone())?; let new_index_name = Binder::resolve_index_name(new_index_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); @@ -107,7 +107,7 @@ pub async fn handle_rename_view( let (schema_name, real_view_name) = Binder::resolve_schema_qualified_name(db_name, view_name.clone())?; let new_view_name = Binder::resolve_view_name(new_view_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); @@ -137,7 +137,7 @@ pub async fn handle_rename_sink( let (schema_name, real_sink_name) = Binder::resolve_schema_qualified_name(db_name, sink_name.clone())?; let new_sink_name = Binder::resolve_sink_name(new_sink_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); @@ -167,7 +167,7 @@ pub async fn handle_rename_source( let (schema_name, real_source_name) = Binder::resolve_schema_qualified_name(db_name, source_name.clone())?; let new_source_name = Binder::resolve_source_name(new_source_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git 
a/src/frontend/src/handler/alter_set_schema.rs b/src/frontend/src/handler/alter_set_schema.rs index 4a6b8f6543af9..27b0ea772aab5 100644 --- a/src/frontend/src/handler/alter_set_schema.rs +++ b/src/frontend/src/handler/alter_set_schema.rs @@ -38,7 +38,7 @@ pub async fn handle_alter_set_schema( let db_name = session.database(); let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 385a1010b50c9..95e9c3bf7e597 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -41,7 +41,7 @@ pub async fn handle_alter_source_column( let db_name = session.database(); let (schema_name, real_source_name) = Binder::resolve_schema_qualified_name(db_name, source_name.clone())?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index fa64794710025..2d51bcc48f8cc 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -48,7 +48,7 @@ pub async fn handle_alter_table_column( let db_name = session.database(); let (schema_name, real_table_name) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let 
schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); @@ -225,8 +225,10 @@ pub async fn handle_alter_table_column( let graph = StreamFragmentGraph { parallelism: session .config() - .get_streaming_parallelism() - .map(|parallelism| Parallelism { parallelism }), + .streaming_parallelism() + .map(|parallelism| Parallelism { + parallelism: parallelism.get(), + }), ..build_graph(plan) }; diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index 2cc7bb3b49df8..006230552ea02 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -53,7 +53,7 @@ pub(crate) fn gen_create_index_plan( ) -> Result<(PlanRef, PbTable, PbIndex)> { let db_name = session.database(); let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); @@ -429,10 +429,13 @@ pub async fn handle_create_index( distributed_by, )?; let mut graph = build_graph(plan); - graph.parallelism = session - .config() - .get_streaming_parallelism() - .map(|parallelism| Parallelism { parallelism }); + graph.parallelism = + session + .config() + .streaming_parallelism() + .map(|parallelism| Parallelism { + parallelism: parallelism.get(), + }); (graph, index_table, index) }; diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index f90c67397a9d5..f42bb1d9bb284 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -118,7 +118,7 @@ pub fn gen_create_mv_plan( let materialize = plan_root.gen_materialize_plan(table_name, definition, emit_on_window_close)?; let mut table = materialize.table().to_prost(schema_id, database_id); - if 
session.config().get_create_compaction_group_for_mv() { + if session.config().create_compaction_group_for_mv() { table.properties.insert( String::from("independent_compaction_group"), String::from("1"), @@ -185,10 +185,13 @@ It only indicates the physical clustering of the data, which may improve the per gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?; let context = plan.plan_base().ctx().clone(); let mut graph = build_graph(plan); - graph.parallelism = session - .config() - .get_streaming_parallelism() - .map(|parallelism| Parallelism { parallelism }); + graph.parallelism = + session + .config() + .streaming_parallelism() + .map(|parallelism| Parallelism { + parallelism: parallelism.get(), + }); // Set the timezone for the stream environment let env = graph.env.as_mut().unwrap(); env.timezone = context.get_session_timezone(); @@ -208,7 +211,7 @@ It only indicates the physical clustering of the data, which may improve the per table.name.clone(), )); - let run_in_background = session.config().get_background_ddl(); + let run_in_background = session.config().background_ddl(); let create_type = if run_in_background { CreateType::Background } else { diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 5d520e4eb79c5..8a30a3d681cc1 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -208,10 +208,13 @@ pub async fn handle_create_sink( ); } let mut graph = build_graph(plan); - graph.parallelism = session - .config() - .get_streaming_parallelism() - .map(|parallelism| Parallelism { parallelism }); + graph.parallelism = + session + .config() + .streaming_parallelism() + .map(|parallelism| Parallelism { + parallelism: parallelism.get(), + }); (sink, graph) }; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 2cbc46b941fdd..cf2ed281805be 100644 --- a/src/frontend/src/handler/create_source.rs +++ 
b/src/frontend/src/handler/create_source.rs @@ -1111,7 +1111,7 @@ pub async fn handle_create_source( // gated the feature with a session variable let create_cdc_source_job = - is_cdc_connector(&with_properties) && session.config().get_cdc_backfill(); + is_cdc_connector(&with_properties) && session.config().cdc_backfill(); let (columns_from_resolve_source, source_info) = bind_columns_from_source( &session, @@ -1225,10 +1225,13 @@ pub async fn handle_create_source( // generate stream graph for cdc source job let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?; let mut graph = build_graph(stream_plan); - graph.parallelism = session - .config() - .get_streaming_parallelism() - .map(|parallelism| Parallelism { parallelism }); + graph.parallelism = + session + .config() + .streaming_parallelism() + .map(|parallelism| Parallelism { + parallelism: parallelism.get(), + }); graph }; catalog_writer @@ -1308,7 +1311,7 @@ pub mod tests { let frontend = LocalFrontend::new(Default::default()).await; let session = frontend.session_ref(); session - .set_config("cdc_backfill", vec!["true".to_string()]) + .set_config("cdc_backfill", "true".to_string()) .unwrap(); frontend diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index fe6f8a865c19b..6eceaa5825266 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -540,7 +540,7 @@ pub(crate) async fn gen_create_table_plan_with_source( } let cdc_table_type = CdcTableType::from_properties(&properties); - if cdc_table_type.can_backfill() && context.session_ctx().config().get_cdc_backfill() { + if cdc_table_type.can_backfill() && context.session_ctx().config().cdc_backfill() { // debezium connector will only consume changelogs from latest offset on this mode properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into()); @@ -1050,10 +1050,13 @@ pub async fn handle_create_table( .await?; let mut graph = 
build_graph(plan); - graph.parallelism = session - .config() - .get_streaming_parallelism() - .map(|parallelism| Parallelism { parallelism }); + graph.parallelism = + session + .config() + .streaming_parallelism() + .map(|parallelism| Parallelism { + parallelism: parallelism.get(), + }); (graph, source, table, job_type) }; diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 9f1fe54a4d151..60be14084f8b4 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -108,10 +108,13 @@ pub async fn handle_create_as( Some(col_id_gen.into_version()), )?; let mut graph = build_graph(plan); - graph.parallelism = session - .config() - .get_streaming_parallelism() - .map(|parallelism| Parallelism { parallelism }); + graph.parallelism = + session + .config() + .streaming_parallelism() + .map(|parallelism| Parallelism { + parallelism: parallelism.get(), + }); (graph, source, table) }; diff --git a/src/frontend/src/handler/drop_connection.rs b/src/frontend/src/handler/drop_connection.rs index fe6bdc6649f13..d05caabd5d322 100644 --- a/src/frontend/src/handler/drop_connection.rs +++ b/src/frontend/src/handler/drop_connection.rs @@ -30,7 +30,7 @@ pub async fn handle_drop_connection( let db_name = session.database(); let (schema_name, connection_name) = Binder::resolve_schema_qualified_name(db_name, connection_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/drop_function.rs b/src/frontend/src/handler/drop_function.rs index c3bda771ec7cd..56ac4d787a716 100644 --- a/src/frontend/src/handler/drop_function.rs +++ b/src/frontend/src/handler/drop_function.rs @@ -39,7 +39,7 @@ pub async fn handle_drop_function( let db_name = session.database(); let 
(schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, func_desc.name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/drop_index.rs b/src/frontend/src/handler/drop_index.rs index 20987fd26950d..6208f17f5c752 100644 --- a/src/frontend/src/handler/drop_index.rs +++ b/src/frontend/src/handler/drop_index.rs @@ -33,7 +33,7 @@ pub async fn handle_drop_index( let session = handler_args.session; let db_name = session.database(); let (schema_name, index_name) = Binder::resolve_schema_qualified_name(db_name, index_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/drop_mv.rs b/src/frontend/src/handler/drop_mv.rs index 50b462c612e2b..6481c83c7cde4 100644 --- a/src/frontend/src/handler/drop_mv.rs +++ b/src/frontend/src/handler/drop_mv.rs @@ -32,7 +32,7 @@ pub async fn handle_drop_mv( let session = handler_args.session; let db_name = session.database(); let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs index 6b1a864e03964..5554f69ac9fbb 100644 --- a/src/frontend/src/handler/drop_sink.rs +++ b/src/frontend/src/handler/drop_sink.rs @@ -30,7 +30,7 @@ pub async fn handle_drop_sink( let session = handler_args.session; let 
db_name = session.database(); let (schema_name, sink_name) = Binder::resolve_schema_qualified_name(db_name, sink_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/drop_source.rs b/src/frontend/src/handler/drop_source.rs index 2347879c27c0a..94fb1dbd42ef9 100644 --- a/src/frontend/src/handler/drop_source.rs +++ b/src/frontend/src/handler/drop_source.rs @@ -30,7 +30,7 @@ pub async fn handle_drop_source( let session = handler_args.session; let db_name = session.database(); let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/drop_table.rs b/src/frontend/src/handler/drop_table.rs index 9c49e372a9cc7..6bc4a850de9a2 100644 --- a/src/frontend/src/handler/drop_table.rs +++ b/src/frontend/src/handler/drop_table.rs @@ -31,7 +31,7 @@ pub async fn handle_drop_table( let session = handler_args.session; let db_name = session.database(); let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/drop_view.rs b/src/frontend/src/handler/drop_view.rs index 4682978b72235..90c0778435be3 100644 --- a/src/frontend/src/handler/drop_view.rs +++ b/src/frontend/src/handler/drop_view.rs @@ -30,7 +30,7 @@ pub async fn handle_drop_view( let 
session = handler_args.session; let db_name = session.database(); let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index e1774d8d5e820..78441c774f383 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -173,7 +173,7 @@ async fn do_handle_explain( batch_plan_fragmenter = Some(BatchPlanFragmenter::new( worker_node_manager_reader, session.env().catalog_reader().clone(), - session.config().get_batch_parallelism(), + session.config().batch_parallelism().0, plan.clone(), )?); } diff --git a/src/frontend/src/handler/handle_privilege.rs b/src/frontend/src/handler/handle_privilege.rs index 07b87fa3bc3d2..71a94aa65b179 100644 --- a/src/frontend/src/handler/handle_privilege.rs +++ b/src/frontend/src/handler/handle_privilege.rs @@ -62,7 +62,7 @@ fn make_prost_privilege( } GrantObjects::Mviews(tables) => { let db_name = session.database(); - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; for name in tables { @@ -85,7 +85,7 @@ fn make_prost_privilege( } GrantObjects::Tables(tables) => { let db_name = session.database(); - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; for name in tables { @@ -108,7 +108,7 @@ fn make_prost_privilege( } GrantObjects::Sources(sources) => { let db_name = session.database(); - let search_path = session.config().get_search_path(); + let search_path = session.config().search_path(); let user_name = 
&session.auth_context().user_name; for name in sources { diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index e11562bccb467..4e533cd98cf0d 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -188,7 +188,7 @@ fn gen_batch_query_plan( } (true, false) => QueryMode::Distributed, (false, true) => QueryMode::Local, - (false, false) => match session.config().get_query_mode() { + (false, false) => match session.config().query_mode() { QueryMode::Auto => determine_query_mode(batch_plan.clone()), QueryMode::Local => QueryMode::Local, QueryMode::Distributed => QueryMode::Distributed, @@ -285,7 +285,7 @@ fn gen_batch_plan_fragmenter( let plan_fragmenter = BatchPlanFragmenter::new( worker_node_manager_reader, session.env().catalog_reader().clone(), - session.config().get_batch_parallelism(), + session.config().batch_parallelism().0, plan, )?; @@ -401,7 +401,7 @@ async fn execute( // it sent. This is achieved by the `callback` in `PgResponse`. let callback = async move { // Implicitly flush the writes. 
- if session.config().get_implicit_flush() && stmt_type.is_dml() { + if session.config().implicit_flush() && stmt_type.is_dml() { do_flush(&session).await?; } diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 1be30c2d470eb..04f4be646e13b 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -73,7 +73,7 @@ where session: Arc, ) -> Self { let session_data = StaticSessionData { - timezone: session.config().get_timezone().into(), + timezone: session.config().timezone(), }; Self { chunk_stream, diff --git a/src/frontend/src/handler/variable.rs b/src/frontend/src/handler/variable.rs index 7d108ae35128e..112107e725318 100644 --- a/src/frontend/src/handler/variable.rs +++ b/src/frontend/src/handler/variable.rs @@ -17,7 +17,7 @@ use pgwire::pg_protocol::ParameterStatus; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Row; use risingwave_common::error::{ErrorCode, Result}; -use risingwave_common::session_config::ConfigReporter; +use risingwave_common::session_config::{ConfigReporter, SESSION_CONFIG_LIST_SEP}; use risingwave_common::system_param::is_mutable; use risingwave_common::types::{DataType, ScalarRefImpl}; use risingwave_sqlparser::ast::{Ident, SetTimeZoneValue, SetVariableValue, Value}; @@ -26,20 +26,25 @@ use super::RwPgResponse; use crate::handler::HandlerArgs; use crate::utils::infer_stmt_row_desc::infer_show_variable; +fn set_var_to_guc_str(value: &SetVariableValue) -> String { + match value { + SetVariableValue::Literal(Value::DoubleQuotedString(s)) + | SetVariableValue::Literal(Value::SingleQuotedString(s)) => s.clone(), + SetVariableValue::List(list) => list + .iter() + .map(set_var_to_guc_str) + .join(SESSION_CONFIG_LIST_SEP), + _ => value.to_string(), + } +} + pub fn handle_set( handler_args: HandlerArgs, name: Ident, - value: Vec, + value: SetVariableValue, ) -> Result { // Strip double and single quotes - let string_vals = value - .into_iter() - .map(|v| match v 
{ - SetVariableValue::Literal(Value::DoubleQuotedString(s)) - | SetVariableValue::Literal(Value::SingleQuotedString(s)) => s, - _ => v.to_string(), - }) - .collect_vec(); + let string_val = set_var_to_guc_str(&value); let mut status = ParameterStatus::default(); @@ -60,7 +65,7 @@ pub fn handle_set( // We remark that the name of session parameter is always case-insensitive. handler_args.session.set_config_report( &name.real_value().to_lowercase(), - string_vals, + string_val, Reporter { status: &mut status, }, @@ -85,7 +90,7 @@ pub(super) fn handle_set_time_zone( _ => Ok(value.to_string()), }?; - handler_args.session.set_config("timezone", vec![tz_info])?; + handler_args.session.set_config("timezone", tz_info)?; Ok(PgResponse::empty_result(StatementType::SET_VARIABLE)) } @@ -114,7 +119,7 @@ pub(super) async fn handle_show( fn handle_show_all(handler_args: HandlerArgs) -> Result> { let config_reader = handler_args.session.config(); - let all_variables = config_reader.get_all(); + let all_variables = config_reader.show_all(); let rows = all_variables .iter() diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index cfae8a04504a2..df5ee593af8c8 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -516,7 +516,7 @@ impl LogicalOptimizer { // If share plan is disable, we need to remove all the share operator generated by the // binder, e.g. CTE and View. However, we still need to share source to ensure self // source join can return correct result. - let enable_share_plan = ctx.session_ctx().config().get_enable_share_plan(); + let enable_share_plan = ctx.session_ctx().config().enable_share_plan(); if enable_share_plan { // Common sub-plan sharing. 
plan = plan.common_subplan_sharing(); @@ -547,7 +547,7 @@ impl LogicalOptimizer { // Predicate Push-down plan = Self::predicate_pushdown(plan, explain_trace, &ctx); - if plan.ctx().session_ctx().config().get_enable_join_ordering() { + if plan.ctx().session_ctx().config().enable_join_ordering() { // Merge inner joins and intermediate filters into multijoin // This rule assumes that filters have already been pushed down near to // their relevant joins. @@ -558,7 +558,7 @@ impl LogicalOptimizer { .ctx() .session_ctx() .config() - .get_streaming_enable_bushy_join() + .streaming_enable_bushy_join() { plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_ORDERING); } else { @@ -583,7 +583,7 @@ impl LogicalOptimizer { plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW); plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW); - let force_split_distinct_agg = ctx.session_ctx().config().get_force_split_distinct_agg(); + let force_split_distinct_agg = ctx.session_ctx().config().force_split_distinct_agg(); // TODO: better naming of the OptimizationStage // Convert distinct aggregates. plan = if force_split_distinct_agg { @@ -643,7 +643,7 @@ impl LogicalOptimizer { let mut last_total_rule_applied_before_predicate_pushdown = ctx.total_rule_applied(); plan = Self::predicate_pushdown(plan, explain_trace, &ctx); - if plan.ctx().session_ctx().config().get_enable_join_ordering() { + if plan.ctx().session_ctx().config().enable_join_ordering() { // Merge inner joins and intermediate filters into multijoin // This rule assumes that filters have already been pushed down near to // their relevant joins. 
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 70d5ae1769c83..3c1b987893380 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -308,7 +308,7 @@ impl PlanRoot { ApplyOrder::BottomUp, )); - if ctx.session_ctx().config().get_streaming_enable_delta_join() { + if ctx.session_ctx().config().streaming_enable_delta_join() { // TODO: make it a logical optimization. // Rewrite joins with index to delta join plan = plan.optimize_by_rules(&OptimizationStage::new( diff --git a/src/frontend/src/optimizer/optimizer_context.rs b/src/frontend/src/optimizer/optimizer_context.rs index 032ace1e04a63..0f0bee2d7a8be 100644 --- a/src/frontend/src/optimizer/optimizer_context.rs +++ b/src/frontend/src/optimizer/optimizer_context.rs @@ -69,7 +69,7 @@ impl OptimizerContext { /// Create a new [`OptimizerContext`] from the given [`HandlerArgs`] and [`ExplainOptions`]. pub fn new(mut handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self { let session_timezone = RefCell::new(SessionTimezone::new( - handler_args.session.config().get_timezone().to_owned(), + handler_args.session.config().timezone().to_owned(), )); let overwrite_options = OverwriteOptions::new(&mut handler_args); Self { diff --git a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs index bf20fa9a76cc5..0ca1822397c52 100644 --- a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs @@ -47,7 +47,7 @@ impl BatchSimpleAgg { .ctx() .session_ctx() .config() - .get_enable_two_phase_agg() + .enable_two_phase_agg() } pub(crate) fn can_two_phase_agg(&self) -> bool { diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index 46d9a56456e37..8cfaaff070554 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ 
b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -93,7 +93,7 @@ impl Agg { } fn two_phase_agg_forced(&self) -> bool { - self.ctx().session_ctx().config().get_force_two_phase_agg() + self.ctx().session_ctx().config().force_two_phase_agg() } pub fn two_phase_agg_enabled(&self) -> bool { @@ -144,11 +144,7 @@ impl Agg { } pub fn new(agg_calls: Vec, group_key: IndexSet, input: PlanRef) -> Self { - let enable_two_phase = input - .ctx() - .session_ctx() - .config() - .get_enable_two_phase_agg(); + let enable_two_phase = input.ctx().session_ctx().config().enable_two_phase_agg(); Self { agg_calls, group_key, diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 244c0459cfdcd..3d648d00e5c13 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -1101,11 +1101,7 @@ impl ToBatch for LogicalAgg { }; let agg_plan = if self.group_key().is_empty() { BatchSimpleAgg::new(new_logical).into() - } else if self - .ctx() - .session_ctx() - .config() - .get_batch_enable_sort_agg() + } else if self.ctx().session_ctx().config().batch_enable_sort_agg() && new_logical.input_provides_order_on_group_keys() { BatchSortAgg::new(new_logical).into() diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index e8a0e24a84cd2..dade239c7758f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1321,7 +1321,7 @@ impl ToBatch for LogicalJoin { )) .into()); } - if config.get_batch_enable_lookup_join() { + if config.batch_enable_lookup_join() { if let Some(lookup_join) = self.to_batch_lookup_join_with_index_selection( predicate.clone(), logical_join.clone(), diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 37b16c954e434..b6bb98d342ffc 100644 
--- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -441,11 +441,7 @@ impl LogicalScan { } else { let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges( self.core.table_desc.clone(), - self.base - .ctx() - .session_ctx() - .config() - .get_max_split_range_gap(), + self.base.ctx().session_ctx().config().max_split_range_gap() as u64, )?; let mut scan = self.clone(); scan.core.predicate = predicate; // We want to keep `required_col_idx` unchanged, so do not call `clone_with_predicate`. diff --git a/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs b/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs index 9fc356709cc14..f2d8be84e275c 100644 --- a/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs @@ -328,11 +328,7 @@ impl LogicalSysScan { } else { let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges( self.core.table_desc.clone(), - self.base - .ctx() - .session_ctx() - .config() - .get_max_split_range_gap(), + self.base.ctx().session_ctx().config().max_split_range_gap() as u64, )?; let mut scan = self.clone(); scan.core.predicate = predicate; // We want to keep `required_col_idx` unchanged, so do not call `clone_with_predicate`. 
diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs index 77296a89f4add..4a97c24263821 100644 --- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs @@ -128,7 +128,7 @@ impl StreamNode for StreamOverWindow { .ctx() .session_ctx() .config() - .get_streaming_over_window_cache_policy(); + .streaming_over_window_cache_policy(); PbNodeBody::OverWindow(OverWindowNode { calls, diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index b54019fc7dbb6..ea2144cdfebb1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -430,7 +430,7 @@ impl StreamNode for StreamSink { PbNodeBody::Sink(SinkNode { sink_desc: Some(self.sink_desc.to_proto()), table: Some(table.to_internal_table_prost()), - log_store_type: match self.base.ctx().session_ctx().config().get_sink_decouple() { + log_store_type: match self.base.ctx().session_ctx().config().sink_decouple() { SinkDecouple::Default => { let enable_sink_decouple = match_sink_name_str!( diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index f2ba8ff976150..3823733b74824 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -147,8 +147,8 @@ impl LocalQueryExecution { let catalog_reader = self.front_env.catalog_reader().clone(); let auth_context = self.session.auth_context().clone(); let db_name = self.session.database().to_string(); - let search_path = self.session.config().get_search_path().clone(); - let time_zone = self.session.config().get_timezone().to_owned(); + let search_path = self.session.config().search_path(); + let time_zone = self.session.config().timezone(); let exec = async move { let mut data_stream = self.run().map(|r| r.map_err(|e| Box::new(e) as 
BoxedError)); diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 6ed7f5c864e30..dc22d5635e58e 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -607,17 +607,17 @@ impl SessionImpl { self.config_map.read() } - pub fn set_config(&self, key: &str, value: Vec) -> Result<()> { - self.config_map.write().set(key, value, ()) + pub fn set_config(&self, key: &str, value: String) -> Result<()> { + self.config_map.write().set(key, value, &mut ()) } pub fn set_config_report( &self, key: &str, - value: Vec, - reporter: impl ConfigReporter, + value: String, + mut reporter: impl ConfigReporter, ) -> Result<()> { - self.config_map.write().set(key, value, reporter) + self.config_map.write().set(key, value, &mut reporter) } pub fn session_id(&self) -> SessionId { @@ -655,7 +655,7 @@ impl SessionImpl { let (schema_name, relation_name) = { let (schema_name, relation_name) = Binder::resolve_schema_qualified_name(db_name, name)?; - let search_path = self.config().get_search_path(); + let search_path = self.config().search_path(); let user_name = &self.auth_context().user_name; let schema_name = match schema_name { Some(schema_name) => schema_name, @@ -682,7 +682,7 @@ impl SessionImpl { let (schema_name, connection_name) = { let (schema_name, connection_name) = Binder::resolve_schema_qualified_name(db_name, name)?; - let search_path = self.config().get_search_path(); + let search_path = self.config().search_path(); let user_name = &self.auth_context().user_name; let schema_name = match schema_name { Some(schema_name) => schema_name, @@ -704,7 +704,7 @@ impl SessionImpl { ) -> Result<(DatabaseId, SchemaId)> { let db_name = self.database(); - let search_path = self.config().get_search_path(); + let search_path = self.config().search_path(); let user_name = &self.auth_context().user_name; let catalog_reader = self.env().catalog_reader().read_guard(); @@ -732,7 +732,7 @@ impl SessionImpl { connection_name: &str, ) -> Result> { let db_name 
= self.database(); - let search_path = self.config().get_search_path(); + let search_path = self.config().search_path(); let user_name = &self.auth_context().user_name; let catalog_reader = self.env().catalog_reader().read_guard(); @@ -841,7 +841,7 @@ impl SessionImpl { } pub fn is_barrier_read(&self) -> bool { - match self.config().get_visible_mode() { + match self.config().visibility_mode() { VisibilityMode::Default => self.env.batch_config.enable_barrier_read, VisibilityMode::All => true, VisibilityMode::Checkpoint => false, @@ -1129,7 +1129,7 @@ impl Session for SessionImpl { } } - fn set_config(&self, key: &str, value: Vec) -> std::result::Result<(), BoxedError> { + fn set_config(&self, key: &str, value: String) -> std::result::Result<(), BoxedError> { Self::set_config(self, key, value).map_err(Into::into) } diff --git a/src/frontend/src/session/transaction.rs b/src/frontend/src/session/transaction.rs index 5b704266033df..99692fd09d52e 100644 --- a/src/frontend/src/session/transaction.rs +++ b/src/frontend/src/session/transaction.rs @@ -17,6 +17,7 @@ use std::sync::{Arc, Weak}; use parking_lot::{MappedMutexGuard, Mutex, MutexGuard}; use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::util::epoch::Epoch; use super::SessionImpl; use crate::catalog::catalog_service::CatalogWriter; @@ -201,7 +202,7 @@ impl SessionImpl { self.txn_ctx() .snapshot .get_or_insert_with(|| { - let query_epoch = self.config().get_query_epoch(); + let query_epoch = self.config().query_epoch().map(|epoch| Epoch(epoch.get())); if let Some(query_epoch) = query_epoch { ReadSnapshot::Other(query_epoch) diff --git a/src/frontend/src/utils/overwrite_options.rs b/src/frontend/src/utils/overwrite_options.rs index 14e838e689d7b..bc3514527add2 100644 --- a/src/frontend/src/utils/overwrite_options.rs +++ b/src/frontend/src/utils/overwrite_options.rs @@ -35,7 +35,10 @@ impl OverwriteOptions { // FIXME(tabVersion): validate the value Some(x.parse::().unwrap()) } else { - 
args.session.config().get_streaming_rate_limit() + args.session + .config() + .streaming_rate_limit() + .map(|limit| limit.get() as u32) } }; let ttl = args diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index db610c8f8f504..a4867ba9d5ae8 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1232,7 +1232,7 @@ pub enum Statement { SetVariable { local: bool, variable: Ident, - value: Vec, + value: SetVariableValue, }, /// `SHOW ` /// @@ -1691,7 +1691,6 @@ impl fmt::Display for Statement { f, "{name} = {value}", name = variable, - value = display_comma_separated(value) ) } Statement::ShowVariable { variable } => { @@ -2655,6 +2654,7 @@ impl fmt::Display for CreateFunctionUsing { pub enum SetVariableValue { Ident(Ident), Literal(Value), + List(Vec), Default, } @@ -2664,6 +2664,11 @@ impl fmt::Display for SetVariableValue { match self { Ident(ident) => write!(f, "{}", ident), Literal(literal) => write!(f, "{}", literal), + List(list) => write!( + f, + "{}", + list.iter().map(|value| value.to_string()).join(", ") + ), Default => write!(f, "DEFAULT"), } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index a8a20db439f11..439aed4a18e7f 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3224,20 +3224,32 @@ impl Parser { } fn parse_set_variable(&mut self) -> Result { - let token = self.peek_token(); - match (self.parse_value(), token.token) { - (Ok(value), _) => Ok(SetVariableValue::Literal(value)), - (Err(_), Token::Word(w)) => { - if w.keyword == Keyword::DEFAULT { - Ok(SetVariableValue::Default) - } else { - Ok(SetVariableValue::Ident(w.to_ident()?)) + let mut values = vec![]; + loop { + let token = self.peek_token(); + let value = match (self.parse_value(), token.token) { + (Ok(value), _) => SetVariableValue::Literal(value), + (Err(_), Token::Word(w)) => { + if w.keyword == Keyword::DEFAULT { + SetVariableValue::Default + } else { + 
SetVariableValue::Ident(w.to_ident()?) + } } - } - (Err(_), unexpected) => { - self.expected("variable value", unexpected.with_location(token.location)) + (Err(_), unexpected) => { + self.expected("variable value", unexpected.with_location(token.location))? + } + }; + values.push(value); + if !self.consume_token(&Token::Comma) { + break; } } + if values.len() == 1 { + Ok(values[0].clone()) + } else { + Ok(SetVariableValue::List(values)) + } } pub fn parse_number_value(&mut self) -> Result { @@ -4015,19 +4027,12 @@ impl Parser { } let variable = self.parse_identifier()?; if self.consume_token(&Token::Eq) || self.parse_keyword(Keyword::TO) { - let mut values = vec![]; - loop { - let value = self.parse_set_variable()?; - values.push(value); - if self.consume_token(&Token::Comma) { - continue; - } - return Ok(Statement::SetVariable { - local: modifier == Some(Keyword::LOCAL), - variable, - value: values, - }); - } + let value = self.parse_set_variable()?; + Ok(Statement::SetVariable { + local: modifier == Some(Keyword::LOCAL), + variable, + value, + }) } else if variable.value == "CHARACTERISTICS" { self.expect_keywords(&[Keyword::AS, Keyword::TRANSACTION])?; Ok(Statement::SetTransaction { diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs index 7a2a737582ac9..94eb2d53fbfa5 100644 --- a/src/sqlparser/tests/sqlparser_postgres.rs +++ b/src/sqlparser/tests/sqlparser_postgres.rs @@ -392,7 +392,7 @@ fn parse_set() { Statement::SetVariable { local: false, variable: "a".into(), - value: vec![SetVariableValue::Ident("b".into())], + value: SetVariableValue::Ident("b".into()), } ); @@ -402,9 +402,7 @@ fn parse_set() { Statement::SetVariable { local: false, variable: "a".into(), - value: vec![SetVariableValue::Literal(Value::SingleQuotedString( - "b".into() - ))], + value: SetVariableValue::Literal(Value::SingleQuotedString("b".into())), } ); @@ -414,7 +412,7 @@ fn parse_set() { Statement::SetVariable { local: false, variable: 
"a".into(), - value: vec![SetVariableValue::Literal(number("0"))], + value: SetVariableValue::Literal(number("0")), } ); @@ -424,7 +422,7 @@ fn parse_set() { Statement::SetVariable { local: false, variable: "a".into(), - value: vec![SetVariableValue::Default], + value: SetVariableValue::Default, } ); @@ -434,7 +432,7 @@ fn parse_set() { Statement::SetVariable { local: true, variable: "a".into(), - value: vec![SetVariableValue::Ident("b".into())], + value: SetVariableValue::Ident("b".into()), } ); diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index b7a68b8a50722..cb250b7a6e16c 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -405,7 +405,7 @@ where let application_name = msg.config.get("application_name"); if let Some(application_name) = application_name { session - .set_config("application_name", vec![application_name.clone()]) + .set_config("application_name", application_name.clone()) .map_err(PsqlError::StartupError)?; } diff --git a/src/utils/pgwire/src/pg_server.rs b/src/utils/pgwire/src/pg_server.rs index 8743f387d6646..8967ab6e8f3ba 100644 --- a/src/utils/pgwire/src/pg_server.rs +++ b/src/utils/pgwire/src/pg_server.rs @@ -106,7 +106,7 @@ pub trait Session: Send + Sync { fn id(&self) -> SessionId; - fn set_config(&self, key: &str, value: Vec) -> Result<(), BoxedError>; + fn set_config(&self, key: &str, value: String) -> Result<(), BoxedError>; fn transaction_status(&self) -> TransactionStatus; @@ -351,7 +351,7 @@ mod tests { (0, 0) } - fn set_config(&self, _key: &str, _value: Vec) -> Result<(), BoxedError> { + fn set_config(&self, _key: &str, _value: String) -> Result<(), BoxedError> { Ok(()) }