-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(session_config): system wide session config by alter system set #16062
Changes from 6 commits
5c4feaa
ca6f6d0
32ba5e9
144e252
7c08141
67342c9
d2f19bd
ca0122d
03e628c
9994716
eb0afeb
0cbefc8
2f08d79
07703b0
e372d1c
856b117
feac7df
e7d1c76
5cd809f
4ec9547
0414e4e
415f3af
7c9e5fd
70e15c8
4139c1a
770b324
88b79aa
1010558
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,7 +40,9 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream { | |
let mut get_match_branches = vec![]; | ||
let mut reset_match_branches = vec![]; | ||
let mut show_all_list = vec![]; | ||
let mut list_all_list = vec![]; | ||
let mut alias_to_entry_name_branches = vec![]; | ||
let mut entry_name_list = vec![]; | ||
|
||
for field in fields { | ||
let field_ident = field.ident.expect_or_abort("Field need to be named"); | ||
|
@@ -224,20 +226,26 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream { | |
#entry_name => Ok(self.#reset_func_name(reporter)), | ||
}); | ||
|
||
let var_info = quote! { | ||
VariableInfo { | ||
name: #entry_name.to_string(), | ||
setting: self.#field_ident.to_string(), | ||
description : #description.to_string(), | ||
}, | ||
}; | ||
list_all_list.push(var_info.clone()); | ||
if !flags.contains(&"NO_SHOW_ALL") { | ||
show_all_list.push(quote! { | ||
VariableInfo { | ||
name: #entry_name.to_string(), | ||
setting: self.#field_ident.to_string(), | ||
description : #description.to_string(), | ||
}, | ||
|
||
}); | ||
show_all_list.push(var_info); | ||
} | ||
entry_name_list.push(entry_name); | ||
} | ||
|
||
let struct_ident = input.ident; | ||
quote! { | ||
use std::collections::HashSet; | ||
use std::sync::LazyLock; | ||
static PARAM_NAMES: LazyLock<HashSet<&'static str>> = LazyLock::new(|| HashSet::from([#(#entry_name_list, )*])); | ||
|
||
impl Default for #struct_ident { | ||
#[allow(clippy::useless_conversion)] | ||
fn default() -> Self { | ||
|
@@ -292,12 +300,24 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream { | |
} | ||
} | ||
|
||
/// Show all parameters. | ||
/// Show all parameters except those specified `NO_SHOW_ALL`. | ||
pub fn show_all(&self) -> Vec<VariableInfo> { | ||
vec![ | ||
#(#show_all_list)* | ||
] | ||
} | ||
|
||
/// List all parameters | ||
pub fn list_all(&self) -> Vec<VariableInfo> { | ||
Comment on lines
+319
to
+327
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The difference of these two methods is subtle. Is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW, which set of parameters should be attributed with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
vec![ | ||
#(#list_all_list)* | ||
] | ||
} | ||
|
||
/// Check if `SessionConfig` has a parameter. | ||
pub fn has_param(key_name: &str) -> bool { | ||
PARAM_NAMES.contains(key_name) | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,8 +23,10 @@ mod visibility_mode; | |
use chrono_tz::Tz; | ||
pub use over_window::OverWindowCachePolicy; | ||
pub use query_mode::QueryMode; | ||
use risingwave_common_proc_macro::SessionConfig; | ||
use risingwave_common_proc_macro::{ConfigDoc, SessionConfig}; | ||
pub use search_path::{SearchPath, USER_NAME_WILD_CARD}; | ||
use serde::{Deserialize, Serialize}; | ||
use serde_with::{serde_as, DisplayFromStr}; | ||
use thiserror::Error; | ||
|
||
use self::non_zero64::ConfigNonZeroU64; | ||
|
@@ -50,12 +52,14 @@ pub enum SessionConfigError { | |
|
||
type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>; | ||
|
||
#[serde_as] | ||
/// This is the Session Config of RisingWave. | ||
#[derive(SessionConfig)] | ||
pub struct ConfigMap { | ||
#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)] | ||
pub struct SessionConfig { | ||
/// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block | ||
/// until the entire dataflow is refreshed. In other words, every related table & MV will | ||
/// be able to see the write. | ||
#[serde(rename = "rw_implicit_flush")] | ||
#[parameter(default = false, rename = "rw_implicit_flush")] | ||
implicit_flush: bool, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 2 renames here 😄. How about just renaming the field to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rename this will cause too many unrelated changes to this pr. Maybe later. |
||
|
||
|
@@ -67,6 +71,7 @@ pub struct ConfigMap { | |
/// A temporary config variable to force query running in either local or distributed mode. | ||
/// The default value is auto which means let the system decide to run batch queries in local | ||
/// or distributed mode automatically. | ||
#[serde_as(as = "DisplayFromStr")] | ||
#[parameter(default = QueryMode::default())] | ||
query_mode: QueryMode, | ||
|
||
|
@@ -82,20 +87,24 @@ pub struct ConfigMap { | |
|
||
/// It is typically set by an application upon connection to the server. | ||
/// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE> | ||
#[serde(rename = "datestyle")] | ||
#[parameter(default = "", rename = "datestyle")] | ||
date_style: String, | ||
|
||
/// Force the use of lookup join instead of hash join when possible for local batch execution. | ||
#[serde(rename = "rw_batch_enable_lookup_join")] | ||
#[parameter(default = true, rename = "rw_batch_enable_lookup_join")] | ||
batch_enable_lookup_join: bool, | ||
|
||
/// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch | ||
/// execution | ||
#[serde(rename = "rw_batch_enable_sort_agg")] | ||
#[parameter(default = true, rename = "rw_batch_enable_sort_agg")] | ||
batch_enable_sort_agg: bool, | ||
|
||
/// Enable distributed DML, so an insert, delete, and update statement can be executed in a distributed way (e.g. running in multiple compute nodes). | ||
/// No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens. | ||
#[serde(rename = "batch_enable_distributed_dml")] | ||
#[parameter(default = false, rename = "batch_enable_distributed_dml")] | ||
batch_enable_distributed_dml: bool, | ||
|
||
|
@@ -106,19 +115,23 @@ pub struct ConfigMap { | |
/// Sets the order in which schemas are searched when an object (table, data type, function, etc.) | ||
/// is referenced by a simple name with no schema specified. | ||
/// See <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH> | ||
#[serde_as(as = "DisplayFromStr")] | ||
#[parameter(default = SearchPath::default())] | ||
search_path: SearchPath, | ||
|
||
/// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint. | ||
#[serde_as(as = "DisplayFromStr")] | ||
#[parameter(default = VisibilityMode::default())] | ||
visibility_mode: VisibilityMode, | ||
|
||
/// See <https://www.postgresql.org/docs/current/transaction-iso.html> | ||
#[serde_as(as = "DisplayFromStr")] | ||
#[parameter(default = IsolationLevel::default())] | ||
transaction_isolation: IsolationLevel, | ||
|
||
/// Select as of specific epoch. | ||
/// Sets the historical epoch for querying data. If 0, querying latest data. | ||
#[serde_as(as = "DisplayFromStr")] | ||
#[parameter(default = ConfigNonZeroU64::default())] | ||
query_epoch: ConfigNonZeroU64, | ||
|
||
|
@@ -128,14 +141,17 @@ pub struct ConfigMap { | |
|
||
/// If `STREAMING_PARALLELISM` is non-zero, CREATE MATERIALIZED VIEW/TABLE/INDEX will use it as | ||
/// streaming parallelism. | ||
#[serde_as(as = "DisplayFromStr")] | ||
#[parameter(default = ConfigNonZeroU64::default())] | ||
streaming_parallelism: ConfigNonZeroU64, | ||
|
||
/// Enable delta join for streaming queries. Defaults to false. | ||
#[serde(rename = "rw_streaming_enable_delta_join")] | ||
st1page marked this conversation as resolved.
Show resolved
Hide resolved
|
||
#[parameter(default = false, rename = "rw_streaming_enable_delta_join")] | ||
streaming_enable_delta_join: bool, | ||
|
||
/// Enable bushy join for streaming queries. Defaults to true. | ||
#[serde(rename = "rw_streaming_enable_bushy_join")] | ||
#[parameter(default = true, rename = "rw_streaming_enable_bushy_join")] | ||
streaming_enable_bushy_join: bool, | ||
|
||
|
@@ -144,39 +160,47 @@ pub struct ConfigMap { | |
streaming_use_arrangement_backfill: bool, | ||
|
||
/// Allow `jsonb` in stream key | ||
#[serde(rename = "rw_streaming_allow_jsonb_in_stream_key")] | ||
#[parameter(default = false, rename = "rw_streaming_allow_jsonb_in_stream_key")] | ||
streaming_allow_jsonb_in_stream_key: bool, | ||
|
||
/// Enable join ordering for streaming and batch queries. Defaults to true. | ||
#[serde(rename = "rw_enable_join_ordering")] | ||
#[parameter(default = true, rename = "rw_enable_join_ordering")] | ||
enable_join_ordering: bool, | ||
|
||
/// Enable two phase agg optimization. Defaults to true. | ||
/// Setting this to true will always set `FORCE_TWO_PHASE_AGG` to false. | ||
#[serde(rename = "rw_enable_two_phase_agg")] | ||
#[parameter(default = true, flags = "SETTER", rename = "rw_enable_two_phase_agg")] | ||
enable_two_phase_agg: bool, | ||
|
||
/// Force two phase agg optimization whenever there's a choice between | ||
/// optimizations. Defaults to false. | ||
/// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to false. | ||
#[serde(rename = "rw_force_two_phase_agg")] | ||
#[parameter(default = false, flags = "SETTER", rename = "rw_force_two_phase_agg")] | ||
force_two_phase_agg: bool, | ||
|
||
/// Enable sharing of common sub-plans. | ||
/// This means that DAG structured query plans can be constructed, | ||
#[serde(rename = "rw_enable_share_plan")] | ||
#[parameter(default = true, rename = "rw_enable_share_plan")] | ||
/// rather than only tree structured query plans. | ||
enable_share_plan: bool, | ||
|
||
/// Enable split distinct agg | ||
#[serde(rename = "rw_force_split_distinct_agg")] | ||
#[parameter(default = false, rename = "rw_force_split_distinct_agg")] | ||
force_split_distinct_agg: bool, | ||
|
||
/// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE> | ||
#[serde(rename = "intervalstyle")] | ||
#[parameter(default = "", rename = "intervalstyle")] | ||
interval_style: String, | ||
|
||
/// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism. | ||
#[serde_as(as = "DisplayFromStr")] | ||
#[parameter(default = ConfigNonZeroU64::default())] | ||
batch_parallelism: ConfigNonZeroU64, | ||
|
||
|
@@ -197,6 +221,7 @@ pub struct ConfigMap { | |
client_encoding: String, | ||
|
||
/// Enable decoupling sink and internal streaming graph or not | ||
#[serde_as(as = "DisplayFromStr")] | ||
#[parameter(default = SinkDecouple::default())] | ||
sink_decouple: SinkDecouple, | ||
|
||
|
@@ -231,11 +256,14 @@ pub struct ConfigMap { | |
standard_conforming_strings: String, | ||
|
||
/// Set streaming rate limit (rows per second) for each parallelism for mv backfilling | ||
#[serde_as(as = "DisplayFromStr")] | ||
#[parameter(default = ConfigNonZeroU64::default())] | ||
streaming_rate_limit: ConfigNonZeroU64, | ||
|
||
/// Cache policy for partition cache in streaming over window. | ||
/// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`". | ||
#[serde_as(as = "DisplayFromStr")] | ||
#[serde(rename = "rw_streaming_over_window_cache_policy")] | ||
#[parameter(default = OverWindowCachePolicy::default(), rename = "rw_streaming_over_window_cache_policy")] | ||
streaming_over_window_cache_policy: OverWindowCachePolicy, | ||
|
||
|
@@ -275,7 +303,7 @@ fn check_bytea_output(val: &str) -> Result<(), String> { | |
} | ||
} | ||
|
||
impl ConfigMap { | ||
impl SessionConfig { | ||
pub fn set_force_two_phase_agg( | ||
&mut self, | ||
val: bool, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,8 +15,6 @@ | |
use std::fmt::Formatter; | ||
use std::str::FromStr; | ||
|
||
use crate::error::{bail_not_implemented, NotImplemented}; | ||
|
||
#[derive(Copy, Default, Debug, Clone, PartialEq, Eq)] | ||
// Some variants are never constructed so allow dead code here. | ||
#[allow(dead_code)] | ||
|
@@ -29,10 +27,18 @@ pub enum IsolationLevel { | |
} | ||
|
||
impl FromStr for IsolationLevel { | ||
type Err = NotImplemented; | ||
type Err = &'static str; | ||
|
||
fn from_str(_s: &str) -> Result<Self, Self::Err> { | ||
bail_not_implemented!(issue = 10736, "isolation level"); | ||
fn from_str(s: &str) -> Result<Self, Self::Err> { | ||
match s { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Simply bail will cause deserialization fail There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should ignore the case. |
||
"read committed" => Ok(Self::ReadCommitted), | ||
"read uncommitted" => Ok(Self::ReadUncommitted), | ||
"repeatable read" => Ok(Self::RepeatableRead), | ||
"serializable" => Ok(Self::Serializable), | ||
_ => Err( | ||
"expect one of [read committed, read uncommitted, repeatable read, serializable]", | ||
), | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we first unify the terms? Or is there any difference between "session config" and "session param"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to use session config in frontend. But I guess it's ok to just use session params everywhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But let me rename it in the next pr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm, why renaming in next
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably won't want to review 200 lines of rename in this pr...