Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(session_config): system wide session config by alter system set #16062

Merged
merged 28 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions e2e_test/ddl/alter_session_params.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
statement ok
set RW_STREAMING_ENABLE_DELTA_JOIN to true;

statement error session param query_mode cannot be altered system wide
alter system set query_mode to auto;

connection other1
query T
show RW_STREAMING_ENABLE_DELTA_JOIN;
----
false

statement ok
set RW_STREAMING_ENABLE_DELTA_JOIN to false;

statement ok
alter system set rw_streaming_enable_delta_join to true;
yuhao-su marked this conversation as resolved.
Show resolved Hide resolved

query T
show RW_STREAMING_ENABLE_DELTA_JOIN;
----
false

sleep 1s

connection other2
query T
show RW_STREAMING_ENABLE_DELTA_JOIN;
----
true

statement ok
alter system set RW_STREAMING_ENABLE_DELTA_JOIN to default;
12 changes: 0 additions & 12 deletions e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,3 @@ Caused by these errors (recent errors listed first):
1: Failed to get/set session config
2: Invalid value `maybe` for `rw_implicit_flush`
3: Invalid bool


statement error
set transaction_isolation to 'read committed';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is for testing the error ui for setting an invalid value to a session variable. If you find this not applicable to transaction_isolation, please replace it with another session variable instead of simply removing it.

----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Failed to get/set session config
2: Invalid value `read committed` for `transaction_isolation`
3: Feature is not yet implemented: isolation level
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/10736
24 changes: 24 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ message MetaSnapshot {
repeated catalog.Connection connections = 17;
repeated catalog.Subscription subscriptions = 19;
repeated user.UserInfo users = 8;
GetSessionParamsResponse session_params = 20;
// for streaming
repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
repeated common.WorkerNode nodes = 10;
Expand Down Expand Up @@ -432,6 +433,7 @@ message SubscribeResponse {
catalog.Schema schema = 5;
catalog.Function function = 6;
user.UserInfo user = 11;
SetSessionParamRequest session_param = 26;
// for streaming
FragmentParallelUnitMapping parallel_unit_mapping = 12;
common.WorkerNode node = 13;
Expand Down Expand Up @@ -596,6 +598,28 @@ service SystemParamsService {
rpc SetSystemParam(SetSystemParamRequest) returns (SetSystemParamResponse);
}

message GetSessionParamsRequest {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we first unify the terms? Or is there any difference between "session config" and "session param"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to use session config in frontend. But I guess it's ok to just use session params everywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But let me rename it in the next pr.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, why renaming in next

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably won't want to review 200 lines of rename in this pr...


message GetSessionParamsResponse {
string params = 1;
}

message SetSessionParamRequest {
string param = 1;
// None means set to default value.
optional string value = 2;
}

message SetSessionParamResponse {
string param = 1;
}

// Used for alter system wide default parameters
service SessionParamService {
rpc GetSessionParams(GetSessionParamsRequest) returns (GetSessionParamsResponse);
rpc SetSessionParam(SetSessionParamRequest) returns (SetSessionParamResponse);
}

message GetServingVnodeMappingsRequest {}

message GetServingVnodeMappingsResponse {
Expand Down
2 changes: 1 addition & 1 deletion src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ where
}
Info::HummockSnapshot(_) => true,
Info::MetaBackupManifestId(_) => true,
Info::SystemParams(_) => true,
Info::SystemParams(_) | Info::SessionParam(_) => true,
Info::ServingParallelUnitMappings(_) => true,
Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(),
Info::HummockStats(_) => true,
Expand Down
1 change: 1 addition & 0 deletions src/common/proc_macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ pub fn derive_estimate_size(input: TokenStream) -> TokenStream {
/// `flags` options include
/// - `SETTER`: to manually write a `set_your_parameter_name` function, in which you should call `set_your_parameter_name_inner`.
/// - `REPORT`: to report the parameter through `ConfigReporter`
/// - `NO_ALTER_SYS`: disallow the parameter to be set by `alter system set`
#[proc_macro_derive(SessionConfig, attributes(parameter))]
#[proc_macro_error]
pub fn session_config(input: TokenStream) -> TokenStream {
Expand Down
90 changes: 67 additions & 23 deletions src/common/proc_macro/src/session_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
let mut get_match_branches = vec![];
let mut reset_match_branches = vec![];
let mut show_all_list = vec![];
let mut list_all_list = vec![];
let mut alias_to_entry_name_branches = vec![];
let mut entry_name_flags = vec![];

for field in fields {
let field_ident = field.ident.expect_or_abort("Field need to be named");
Expand Down Expand Up @@ -149,7 +151,7 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
&mut self,
val: &str,
reporter: &mut impl ConfigReporter
) -> SessionConfigResult<()> {
) -> SessionConfigResult<String> {
let val_t = #parse(val).map_err(|e| {
SessionConfigError::InvalidValue {
entry: #entry_name,
Expand All @@ -158,21 +160,20 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
}
})?;

self.#set_t_func_name(val_t, reporter)?;
Ok(())
self.#set_t_func_name(val_t, reporter).map(|val| val.to_string())
}

#[doc = #set_t_func_doc]
pub fn #gen_set_func_name(
&mut self,
val: #ty,
reporter: &mut impl ConfigReporter
) -> SessionConfigResult<()> {
) -> SessionConfigResult<#ty> {
#check_hook
#report_hook

self.#field_ident = val;
Ok(())
self.#field_ident = val.clone();
Ok(val)
}

});
Expand All @@ -181,10 +182,11 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
struct_impl_reset.push(quote! {

#[allow(clippy::useless_conversion)]
pub fn #reset_func_name(&mut self, reporter: &mut impl ConfigReporter) {
pub fn #reset_func_name(&mut self, reporter: &mut impl ConfigReporter) -> String {
let val = #default;
#report_hook
self.#field_ident = val.into();
self.#field_ident.to_string()
}
});

Expand Down Expand Up @@ -224,20 +226,42 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
#entry_name => Ok(self.#reset_func_name(reporter)),
});

if !flags.contains(&"NO_SHOW_ALL") {
show_all_list.push(quote! {
VariableInfo {
name: #entry_name.to_string(),
setting: self.#field_ident.to_string(),
description : #description.to_string(),
},
let var_info = quote! {
VariableInfo {
name: #entry_name.to_string(),
setting: self.#field_ident.to_string(),
description : #description.to_string(),
},
};
list_all_list.push(var_info.clone());

});
let no_show_all = flags.contains(&"NO_SHOW_ALL");
let no_show_all_flag: TokenStream = no_show_all.to_string().parse().unwrap();
if !no_show_all {
show_all_list.push(var_info);
}

let no_alter_sys_flag: TokenStream =
flags.contains(&"NO_ALTER_SYS").to_string().parse().unwrap();
yuhao-su marked this conversation as resolved.
Show resolved Hide resolved

entry_name_flags.push(
quote! {
(#entry_name, ParamFlags {no_show_all: #no_show_all_flag, no_alter_sys: #no_alter_sys_flag})
}
);
}

let struct_ident = input.ident;
quote! {
use std::collections::HashMap;
use std::sync::LazyLock;
static PARAM_NAME_FLAGS: LazyLock<HashMap<&'static str, ParamFlags>> = LazyLock::new(|| HashMap::from([#(#entry_name_flags, )*]));

struct ParamFlags {
no_show_all: bool,
no_alter_sys: bool,
}

impl Default for #struct_ident {
#[allow(clippy::useless_conversion)]
fn default() -> Self {
Expand All @@ -252,11 +276,11 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
Default::default()
}

fn alias_to_entry_name(key_name: &str) -> &str {
pub fn alias_to_entry_name(key_name: &str) -> String {
match key_name {
#(#alias_to_entry_name_branches)*
_ => key_name,
}
}.to_ascii_lowercase()
}

#(#struct_impl_get)*
Expand All @@ -266,9 +290,9 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
#(#struct_impl_reset)*

/// Set a parameter given it's name and value string.
pub fn set(&mut self, key_name: &str, value: String, reporter: &mut impl ConfigReporter) -> SessionConfigResult<()> {
pub fn set(&mut self, key_name: &str, value: String, reporter: &mut impl ConfigReporter) -> SessionConfigResult<String> {
let key_name = Self::alias_to_entry_name(key_name);
match key_name.to_ascii_lowercase().as_ref() {
match key_name.as_ref() {
#(#set_match_branches)*
_ => Err(SessionConfigError::UnrecognizedEntry(key_name.to_string())),
}
Expand All @@ -277,27 +301,47 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
/// Get a parameter by it's name.
pub fn get(&self, key_name: &str) -> SessionConfigResult<String> {
let key_name = Self::alias_to_entry_name(key_name);
match key_name.to_ascii_lowercase().as_ref() {
match key_name.as_ref() {
#(#get_match_branches)*
_ => Err(SessionConfigError::UnrecognizedEntry(key_name.to_string())),
}
}

/// Reset a parameter by it's name.
pub fn reset(&mut self, key_name: &str, reporter: &mut impl ConfigReporter) -> SessionConfigResult<()> {
pub fn reset(&mut self, key_name: &str, reporter: &mut impl ConfigReporter) -> SessionConfigResult<String> {
let key_name = Self::alias_to_entry_name(key_name);
match key_name.to_ascii_lowercase().as_ref() {
match key_name.as_ref() {
#(#reset_match_branches)*
_ => Err(SessionConfigError::UnrecognizedEntry(key_name.to_string())),
}
}

/// Show all parameters.
/// Show all parameters except those specified `NO_SHOW_ALL`.
pub fn show_all(&self) -> Vec<VariableInfo> {
vec![
#(#show_all_list)*
]
}

/// List all parameters
pub fn list_all(&self) -> Vec<VariableInfo> {
Comment on lines +319 to +327
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference of these two methods is subtle. Is NO_SHOW_ALL essentially hidden? 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, which set of parameters should be attributed with NO_SHOW_ALL and why do they need this? Is there any documentation from Postgres that we can refer to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vec![
#(#list_all_list)*
]
}

/// Check if `SessionConfig` has a parameter.
pub fn contains_param(key_name: &str) -> bool {
let key_name = Self::alias_to_entry_name(key_name);
PARAM_NAME_FLAGS.contains_key(key_name.as_str())
}

/// Check if `SessionConfig` has a parameter.
pub fn check_no_alter_sys(key_name: &str) -> SessionConfigResult<bool> {
let key_name = Self::alias_to_entry_name(key_name);
let flags = PARAM_NAME_FLAGS.get(key_name.as_str()).ok_or_else(|| SessionConfigError::UnrecognizedEntry(key_name.to_string()))?;
Ok(flags.no_alter_sys)
}
}
}
}
2 changes: 1 addition & 1 deletion src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub const RW_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Placeholder for unknown git sha.
pub const UNKNOWN_GIT_SHA: &str = "unknown";

// The single source of truth of the pg parameters, Used in ConfigMap and current_cluster_version.
// The single source of truth of the pg parameters, Used in SessionConfig and current_cluster_version.
// The version of PostgreSQL that Risingwave claims to be.
pub const PG_VERSION: &str = "13.14.0";
/// The version of PostgreSQL that Risingwave claims to be.
Expand Down
Loading
Loading