Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(session_config): system wide session config by alter system set #16062

Merged
merged 28 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,27 @@ service SystemParamsService {
rpc SetSystemParam(SetSystemParamRequest) returns (SetSystemParamResponse);
}

message GetSessionParamsRequest {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we first unify the terms? Or is there any difference between "session config" and "session param"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to use session config in frontend. But I guess it's ok to just use session params everywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But let me rename it in the next pr.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, why renaming in next

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably won't want to review 200 lines of rename in this pr...


message GetSessionParamsResponse {
string params = 1;
}

message SetSessionParamRequest {
string param = 1;
// None means set to default value.
optional string value = 2;
}

message SetSessionParamResponse {
string params = 1;
}

service SessionParamService {
rpc GetSessionParams(GetSessionParamsRequest) returns (GetSessionParamsResponse);
rpc SetSessionParam(SetSessionParamRequest) returns (SetSessionParamResponse);
}

message GetServingVnodeMappingsRequest {}

message GetServingVnodeMappingsResponse {
Expand Down
38 changes: 29 additions & 9 deletions src/common/proc_macro/src/session_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
let mut get_match_branches = vec![];
let mut reset_match_branches = vec![];
let mut show_all_list = vec![];
let mut list_all_list = vec![];
let mut alias_to_entry_name_branches = vec![];
let mut entry_name_list = vec![];

for field in fields {
let field_ident = field.ident.expect_or_abort("Field need to be named");
Expand Down Expand Up @@ -224,20 +226,26 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
#entry_name => Ok(self.#reset_func_name(reporter)),
});

let var_info = quote! {
VariableInfo {
name: #entry_name.to_string(),
setting: self.#field_ident.to_string(),
description : #description.to_string(),
},
};
list_all_list.push(var_info.clone());
if !flags.contains(&"NO_SHOW_ALL") {
show_all_list.push(quote! {
VariableInfo {
name: #entry_name.to_string(),
setting: self.#field_ident.to_string(),
description : #description.to_string(),
},

});
show_all_list.push(var_info);
}
entry_name_list.push(entry_name);
}

let struct_ident = input.ident;
quote! {
use std::collections::HashSet;
use std::sync::LazyLock;
static PARAM_NAMES: LazyLock<HashSet<&'static str>> = LazyLock::new(|| HashSet::from([#(#entry_name_list, )*]));

impl Default for #struct_ident {
#[allow(clippy::useless_conversion)]
fn default() -> Self {
Expand Down Expand Up @@ -292,12 +300,24 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
}
}

/// Show all parameters.
/// Show all parameters except those specified `NO_SHOW_ALL`.
pub fn show_all(&self) -> Vec<VariableInfo> {
vec![
#(#show_all_list)*
]
}

/// List all parameters
pub fn list_all(&self) -> Vec<VariableInfo> {
Comment on lines +319 to +327
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference of these two methods is subtle. Is NO_SHOW_ALL essentially hidden? 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, which set of parameters should be attributed with NO_SHOW_ALL and why do they need this? Is there any documentation from Postgres that we can refer to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vec![
#(#list_all_list)*
]
}

/// Check if `SessionConfig` has a parameter.
pub fn has_param(key_name: &str) -> bool {
PARAM_NAMES.contains(key_name)
}
}
}
}
2 changes: 1 addition & 1 deletion src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub const RW_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Placeholder for unknown git sha.
pub const UNKNOWN_GIT_SHA: &str = "unknown";

// The single source of truth of the pg parameters, Used in ConfigMap and current_cluster_version.
// The single source of truth of the pg parameters, Used in SessionConfig and current_cluster_version.
// The version of PostgreSQL that Risingwave claims to be.
pub const PG_VERSION: &str = "13.14.0";
/// The version of PostgreSQL that Risingwave claims to be.
Expand Down
36 changes: 32 additions & 4 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ mod visibility_mode;
use chrono_tz::Tz;
pub use over_window::OverWindowCachePolicy;
pub use query_mode::QueryMode;
use risingwave_common_proc_macro::SessionConfig;
use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use thiserror::Error;

use self::non_zero64::ConfigNonZeroU64;
Expand All @@ -50,12 +52,14 @@ pub enum SessionConfigError {

type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;

#[serde_as]
/// This is the Session Config of RisingWave.
#[derive(SessionConfig)]
pub struct ConfigMap {
#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
pub struct SessionConfig {
/// If `RW_IMPLICIT_FLUSH` is on, then every INSERT/UPDATE/DELETE statement will block
/// until the entire dataflow is refreshed. In other words, every related table & MV will
/// be able to see the write.
#[serde(rename = "rw_implicit_flush")]
#[parameter(default = false, rename = "rw_implicit_flush")]
implicit_flush: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 renames here 😄. How about just renaming the field to rw_implicit_flush

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename this will cause too many unrelated changes to this pr. Maybe later.


Expand All @@ -67,6 +71,7 @@ pub struct ConfigMap {
/// A temporary config variable to force query running in either local or distributed mode.
/// The default value is auto which means let the system decide to run batch queries in local
/// or distributed mode automatically.
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = QueryMode::default())]
query_mode: QueryMode,

Expand All @@ -82,20 +87,24 @@ pub struct ConfigMap {

/// It is typically set by an application upon connection to the server.
/// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE>
#[serde(rename = "datestyle")]
#[parameter(default = "", rename = "datestyle")]
date_style: String,

/// Force the use of lookup join instead of hash join when possible for local batch execution.
#[serde(rename = "rw_batch_enable_lookup_join")]
#[parameter(default = true, rename = "rw_batch_enable_lookup_join")]
batch_enable_lookup_join: bool,

/// Enable usage of sortAgg instead of hash agg when order property is satisfied in batch
/// execution
#[serde(rename = "rw_batch_enable_sort_agg")]
#[parameter(default = true, rename = "rw_batch_enable_sort_agg")]
batch_enable_sort_agg: bool,

/// Enable distributed DML, so an insert, delete, and update statement can be executed in a distributed way (e.g. running in multiple compute nodes).
/// No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.
#[serde(rename = "batch_enable_distributed_dml")]
#[parameter(default = false, rename = "batch_enable_distributed_dml")]
batch_enable_distributed_dml: bool,

Expand All @@ -106,19 +115,23 @@ pub struct ConfigMap {
/// Sets the order in which schemas are searched when an object (table, data type, function, etc.)
/// is referenced by a simple name with no schema specified.
/// See <https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SEARCH-PATH>
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = SearchPath::default())]
search_path: SearchPath,

/// If `VISIBILITY_MODE` is all, we will support querying data without checkpoint.
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = VisibilityMode::default())]
visibility_mode: VisibilityMode,

/// See <https://www.postgresql.org/docs/current/transaction-iso.html>
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = IsolationLevel::default())]
transaction_isolation: IsolationLevel,

/// Select as of specific epoch.
/// Sets the historical epoch for querying data. If 0, querying latest data.
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = ConfigNonZeroU64::default())]
query_epoch: ConfigNonZeroU64,

Expand All @@ -128,14 +141,17 @@ pub struct ConfigMap {

/// If `STREAMING_PARALLELISM` is non-zero, CREATE MATERIALIZED VIEW/TABLE/INDEX will use it as
/// streaming parallelism.
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = ConfigNonZeroU64::default())]
streaming_parallelism: ConfigNonZeroU64,

/// Enable delta join for streaming queries. Defaults to false.
#[serde(rename = "rw_streaming_enable_delta_join")]
st1page marked this conversation as resolved.
Show resolved Hide resolved
#[parameter(default = false, rename = "rw_streaming_enable_delta_join")]
streaming_enable_delta_join: bool,

/// Enable bushy join for streaming queries. Defaults to true.
#[serde(rename = "rw_streaming_enable_bushy_join")]
#[parameter(default = true, rename = "rw_streaming_enable_bushy_join")]
streaming_enable_bushy_join: bool,

Expand All @@ -144,39 +160,47 @@ pub struct ConfigMap {
streaming_use_arrangement_backfill: bool,

/// Allow `jsonb` in stream key
#[serde(rename = "rw_streaming_allow_jsonb_in_stream_key")]
#[parameter(default = false, rename = "rw_streaming_allow_jsonb_in_stream_key")]
streaming_allow_jsonb_in_stream_key: bool,

/// Enable join ordering for streaming and batch queries. Defaults to true.
#[serde(rename = "rw_enable_join_ordering")]
#[parameter(default = true, rename = "rw_enable_join_ordering")]
enable_join_ordering: bool,

/// Enable two phase agg optimization. Defaults to true.
/// Setting this to true will always set `FORCE_TWO_PHASE_AGG` to false.
#[serde(rename = "rw_enable_two_phase_agg")]
#[parameter(default = true, flags = "SETTER", rename = "rw_enable_two_phase_agg")]
enable_two_phase_agg: bool,

/// Force two phase agg optimization whenever there's a choice between
/// optimizations. Defaults to false.
/// Setting this to true will always set `ENABLE_TWO_PHASE_AGG` to false.
#[serde(rename = "rw_force_two_phase_agg")]
#[parameter(default = false, flags = "SETTER", rename = "rw_force_two_phase_agg")]
force_two_phase_agg: bool,

/// Enable sharing of common sub-plans.
/// This means that DAG structured query plans can be constructed,
#[serde(rename = "rw_enable_share_plan")]
#[parameter(default = true, rename = "rw_enable_share_plan")]
/// rather than only tree structured query plans.
enable_share_plan: bool,

/// Enable split distinct agg
#[serde(rename = "rw_force_split_distinct_agg")]
#[parameter(default = false, rename = "rw_force_split_distinct_agg")]
force_split_distinct_agg: bool,

/// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
#[serde(rename = "intervalstyle")]
#[parameter(default = "", rename = "intervalstyle")]
interval_style: String,

/// If `BATCH_PARALLELISM` is non-zero, batch queries will use this parallelism.
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = ConfigNonZeroU64::default())]
batch_parallelism: ConfigNonZeroU64,

Expand All @@ -197,6 +221,7 @@ pub struct ConfigMap {
client_encoding: String,

/// Enable decoupling sink and internal streaming graph or not
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = SinkDecouple::default())]
sink_decouple: SinkDecouple,

Expand Down Expand Up @@ -231,11 +256,14 @@ pub struct ConfigMap {
standard_conforming_strings: String,

/// Set streaming rate limit (rows per second) for each parallelism for mv backfilling
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = ConfigNonZeroU64::default())]
streaming_rate_limit: ConfigNonZeroU64,

/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
#[serde_as(as = "DisplayFromStr")]
#[serde(rename = "rw_streaming_over_window_cache_policy")]
#[parameter(default = OverWindowCachePolicy::default(), rename = "rw_streaming_over_window_cache_policy")]
streaming_over_window_cache_policy: OverWindowCachePolicy,

Expand Down Expand Up @@ -275,7 +303,7 @@ fn check_bytea_output(val: &str) -> Result<(), String> {
}
}

impl ConfigMap {
impl SessionConfig {
pub fn set_force_two_phase_agg(
&mut self,
val: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/session_config/search_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub const USER_NAME_WILD_CARD: &str = "\"$user\"";
/// valid schema in `search_path`.
///
/// 3. when we `create` a `index` or `sink`, it will use the schema of the associated table.
#[derive(Clone)]
#[derive(Clone, Debug, PartialEq)]
pub struct SearchPath {
origin_str: String,
/// The path will implicitly includes `rw_catalog` and `pg_catalog` if user does specify them.
Expand Down
16 changes: 11 additions & 5 deletions src/common/src/session_config/transaction_isolation_level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
use std::fmt::Formatter;
use std::str::FromStr;

use crate::error::{bail_not_implemented, NotImplemented};

#[derive(Copy, Default, Debug, Clone, PartialEq, Eq)]
// Some variants are never constructed so allow dead code here.
#[allow(dead_code)]
Expand All @@ -29,10 +27,18 @@ pub enum IsolationLevel {
}

impl FromStr for IsolationLevel {
type Err = NotImplemented;
type Err = &'static str;

fn from_str(_s: &str) -> Result<Self, Self::Err> {
bail_not_implemented!(issue = 10736, "isolation level");
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply bail will cause deserialization fail

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should ignore the case.

"read committed" => Ok(Self::ReadCommitted),
"read uncommitted" => Ok(Self::ReadUncommitted),
"repeatable read" => Ok(Self::RepeatableRead),
"serializable" => Ok(Self::Serializable),
_ => Err(
"expect one of [read committed, read uncommitted, repeatable read, serializable]",
),
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::session_config::{ConfigMap, SearchPath};
use risingwave_common::session_config::{SearchPath, SessionConfig};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_sqlparser::ast::{
Expand Down Expand Up @@ -107,7 +107,7 @@ pub struct Binder {
/// and so on.
next_share_id: ShareId,

session_config: Arc<RwLock<ConfigMap>>,
session_config: Arc<RwLock<SessionConfig>>,

search_path: SearchPath,
/// The type of binding statement.
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_common::catalog::{
MAX_SYS_CATALOG_NUM, SYS_CATALOG_START_ID,
};
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::ConfigMap;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::types::DataType;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
Expand Down Expand Up @@ -110,7 +110,7 @@ pub struct SysCatalogReaderImpl {
// Read auth context.
auth_context: Arc<AuthContext>,
// Read config.
config: Arc<RwLock<ConfigMap>>,
config: Arc<RwLock<SessionConfig>>,
// Read system params.
system_params: SystemParamsReaderRef,
}
Expand All @@ -122,7 +122,7 @@ impl SysCatalogReaderImpl {
worker_node_manager: WorkerNodeManagerRef,
meta_client: Arc<dyn FrontendMetaClient>,
auth_context: Arc<AuthContext>,
config: Arc<RwLock<ConfigMap>>,
config: Arc<RwLock<SessionConfig>>,
system_params: SystemParamsReaderRef,
) -> Self {
Self {
Expand Down
Loading
Loading