diff --git a/e2e_test/iceberg/main.py b/e2e_test/iceberg/main.py index 03066033c6d2..c64ce725f6c3 100644 --- a/e2e_test/iceberg/main.py +++ b/e2e_test/iceberg/main.py @@ -129,6 +129,7 @@ def drop_table(args, drop_sqls): verify_sql = test_case.get("verify_sql") print(f"verify_sql:{verify_sql}") verify_data = test_case.get("verify_data") + verify_slt = test_case.get("verify_slt") cmp_sqls = test_case.get("cmp_sqls") drop_sqls = test_case["drop_sqls"] config = configparser.ConfigParser() @@ -146,6 +147,8 @@ def drop_table(args, drop_sqls): verify_result(config, verify_sql, verify_schema, verify_data) if cmp_sqls is not None and cmp_sqls != "" and len(cmp_sqls) == 2: compare_sql(config, cmp_sqls) + if verify_slt is not None and verify_slt != "": + execute_slt(config, verify_slt) if drop_sqls is not None and drop_sqls != "": drop_table(config, drop_sqls) diff --git a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table_verify.slt b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table_verify.slt new file mode 100644 index 000000000000..74629053344b --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table_verify.slt @@ -0,0 +1,25 @@ +statement ok +CREATE SOURCE iceberg_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + catalog.type = 'storage', + warehouse.path = 's3://icebergdata/demo', + database.name = 'demo_db', + table.name = 'no_partition_append_only_table', +); + +query I +SELECT id from iceberg_source ORDER by id; +---- +1 +2 +3 +4 +5 + +statement ok +DROP SOURCE iceberg_source diff --git a/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table_verify.slt b/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table_verify.slt new file mode 100644 index 000000000000..4e6beb709f92 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table_verify.slt @@ -0,0 +1,25 @@ +statement ok +CREATE SOURCE iceberg_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + catalog.type = 'storage', + warehouse.path = 's3://icebergdata/demo', + database.name = 'demo_db', + table.name = 'partition_append_only_table', +); + +query I +SELECT id from iceberg_source ORDER by id; +---- +1 +2 +3 +4 +5 + +statement ok +DROP SOURCE iceberg_source diff --git a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table_verify.slt b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table_verify.slt new file mode 100644 index 000000000000..9d03d99aada1 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table_verify.slt @@ -0,0 +1,25 @@ +statement ok +CREATE SOURCE iceberg_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + catalog.type = 'storage', + warehouse.path = 's3://icebergdata/demo', + database.name = 'demo_db', + table.name = 'range_partition_append_only_table', +); + +query I +SELECT id from iceberg_source ORDER by id; +---- +1 +2 +3 +4 +5 + +statement ok +DROP SOURCE iceberg_source diff --git a/e2e_test/iceberg/test_case/no_partition_append_only.toml b/e2e_test/iceberg/test_case/no_partition_append_only.toml index 58f5900586d0..7d2952c50875 100644 --- 
a/e2e_test/iceberg/test_case/no_partition_append_only.toml +++ b/e2e_test/iceberg/test_case/no_partition_append_only.toml @@ -33,6 +33,8 @@ verify_data = """ 5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none """ +verify_slt = 'test_case/iceberg_sink_no_partition_append_only_table_verify.slt' + drop_sqls = [ 'DROP TABLE IF EXISTS demo_db.no_partition_append_only_table', 'DROP SCHEMA IF EXISTS demo_db' diff --git a/e2e_test/iceberg/test_case/partition_append_only.toml b/e2e_test/iceberg/test_case/partition_append_only.toml index 9fd55ee0b004..b36dfb98e369 100644 --- a/e2e_test/iceberg/test_case/partition_append_only.toml +++ b/e2e_test/iceberg/test_case/partition_append_only.toml @@ -35,6 +35,8 @@ verify_data = """ 5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none """ +verify_slt = 'test_case/iceberg_sink_partition_append_only_table_verify.slt' + drop_sqls = [ 'DROP TABLE IF EXISTS demo_db.partition_append_only_table', 'DROP SCHEMA IF EXISTS demo_db' diff --git a/e2e_test/iceberg/test_case/range_partition_append_only.toml b/e2e_test/iceberg/test_case/range_partition_append_only.toml index d8c12d744584..d54407f91dc1 100644 --- a/e2e_test/iceberg/test_case/range_partition_append_only.toml +++ b/e2e_test/iceberg/test_case/range_partition_append_only.toml @@ -35,6 +35,8 @@ verify_data = """ 5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none """ +verify_slt = 'test_case/iceberg_sink_range_partition_append_only_table_verify.slt' + drop_sqls = [ 'DROP TABLE IF EXISTS demo_db.range_partition_append_only_table', 'DROP SCHEMA IF EXISTS demo_db' diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 399a312758e3..10aa371c50ae 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -254,10 +254,7 @@ public void prepareUpsert(SinkRow row) { break; case UPDATE_INSERT: if (!updateFlag) { - throw Status.FAILED_PRECONDITION - .withDescription( - "an UPDATE_DELETE should precede an UPDATE_INSERT") - .asRuntimeException(); + LOG.warn("Missing an UPDATE_DELETE preceding an UPDATE_INSERT"); } jdbcDialect.bindUpsertStatement(upsertStatement, conn, tableSchema, row); updateFlag = false; @@ -364,10 +361,7 @@ public void beginEpoch(long epoch) {} @Override public Optional barrier(boolean isCheckpoint) { if (updateFlag) { - throw Status.FAILED_PRECONDITION - .withDescription( - "expected UPDATE_INSERT to complete an UPDATE operation, got `sync`") - .asRuntimeException(); + LOG.warn("expected an UPDATE_INSERT to complete an UPDATE operation, got `sync`"); } return Optional.empty(); } diff --git a/proto/catalog.proto b/proto/catalog.proto index 5b4f5ae40ff4..c9eedf93f41b 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -83,8 +83,8 @@ message StreamSourceInfo { // Options specified by user in the FORMAT ENCODE clause. map<string, string> format_encode_options = 14; - // Handle the source relies on any sceret. The key is the propertity name and the value is the secret id. - map<string, uint32> secret_ref = 16; + // Whether the source relies on any secret. The key is the property name and the value is the secret id and type.
+ map<string, SecretRef> secret_ref = 16; } message Source { @@ -180,8 +180,8 @@ message Sink { // Whether it should use background ddl or block until backfill finishes. CreateType create_type = 24; - // Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id. - map<string, uint32> secret_ref = 25; + // Whether the sink relies on any secret. The key is the property name and the value is the secret id and type. + map<string, SecretRef> secret_ref = 25; } message Subscription { @@ -450,3 +450,14 @@ message Secret { uint32 owner = 5; uint32 schema_id = 6; } + +message SecretRef { + enum RefAsType { + UNSPECIFIED = 0; + TEXT = 1; + // AS FILE + FILE = 2; + } + uint32 secret_id = 1; + RefAsType ref_as = 2; +} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index e317fabb6539..ba3e21253b43 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1029,8 +1029,12 @@ for_all_params!(define_system_config); /// The subsections `[storage.object_store]`. #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] pub struct ObjectStoreConfig { - #[serde(default = "default::object_store_config::object_store_set_atomic_write_dir")] - pub object_store_set_atomic_write_dir: bool, + // alias is for backward compatibility + #[serde( + default = "default::object_store_config::set_atomic_write_dir", + alias = "object_store_set_atomic_write_dir" + )] + pub set_atomic_write_dir: bool, /// Retry and timeout configuration /// Description retry strategy driven by exponential back-off @@ -1045,25 +1049,36 @@ pub struct ObjectStoreConfig { impl ObjectStoreConfig { pub fn set_atomic_write_dir(&mut self) { - self.object_store_set_atomic_write_dir = true; + self.set_atomic_write_dir = true; } } /// The subsections `[storage.object_store.s3]`. #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] pub struct S3ObjectStoreConfig { - #[serde(default = "default::object_store_config::s3::object_store_keepalive_ms")] - pub object_store_keepalive_ms: Option<u64>, - #[serde(default = "default::object_store_config::s3::object_store_recv_buffer_size")] - pub object_store_recv_buffer_size: Option<usize>, - #[serde(default = "default::object_store_config::s3::object_store_send_buffer_size")] - pub object_store_send_buffer_size: Option<usize>, - #[serde(default = "default::object_store_config::s3::object_store_nodelay")] - pub object_store_nodelay: Option<bool>, - /// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead. + // alias is for backward compatibility + #[serde( + default = "default::object_store_config::s3::keepalive_ms", + alias = "object_store_keepalive_ms" + )] + pub keepalive_ms: Option<u64>, + #[serde( + default = "default::object_store_config::s3::recv_buffer_size", + alias = "object_store_recv_buffer_size" + )] + pub recv_buffer_size: Option<usize>, + #[serde( + default = "default::object_store_config::s3::send_buffer_size", + alias = "object_store_send_buffer_size" + )] + pub send_buffer_size: Option<usize>, #[serde( - default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error" + default = "default::object_store_config::s3::nodelay", + alias = "object_store_nodelay" )] + pub nodelay: Option<bool>, + /// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
+ #[serde(default = "default::object_store_config::s3::developer::retry_unknown_service_error")] pub retry_unknown_service_error: bool, #[serde(default = "default::object_store_config::s3::identity_resolution_timeout_s")] pub identity_resolution_timeout_s: u64, @@ -1076,15 +1091,17 @@ pub struct S3ObjectStoreConfig { pub struct S3ObjectStoreDeveloperConfig { /// Whether to retry s3 sdk error from which no error metadata is provided. #[serde( - default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error" + default = "default::object_store_config::s3::developer::retry_unknown_service_error", + alias = "object_store_retry_unknown_service_error" )] - pub object_store_retry_unknown_service_error: bool, + pub retry_unknown_service_error: bool, /// An array of error codes that should be retried. /// e.g. `["SlowDown", "TooManyRequests"]` #[serde( - default = "default::object_store_config::s3::developer::object_store_retryable_service_error_codes" + default = "default::object_store_config::s3::developer::retryable_service_error_codes", + alias = "object_store_retryable_service_error_codes" )] - pub object_store_retryable_service_error_codes: Vec, + pub retryable_service_error_codes: Vec, // TODO: the following field will be deprecated after opendal is stablized #[serde(default = "default::object_store_config::s3::developer::use_opendal")] @@ -1913,7 +1930,7 @@ pub mod default { const DEFAULT_REQ_BACKOFF_MAX_DELAY_MS: u64 = 10 * 1000; // 10s const DEFAULT_REQ_MAX_RETRY_ATTEMPTS: usize = 3; - pub fn object_store_set_atomic_write_dir() -> bool { + pub fn set_atomic_write_dir() -> bool { false } @@ -2001,19 +2018,19 @@ pub mod default { const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min - pub fn object_store_keepalive_ms() -> Option { + pub fn keepalive_ms() -> Option { Some(DEFAULT_KEEPALIVE_MS) // 10min } - pub fn object_store_recv_buffer_size() -> Option { + pub fn recv_buffer_size() -> Option { Some(1 << 21) // 2m } - pub fn object_store_send_buffer_size() -> Option { + pub fn send_buffer_size() -> Option { None } - pub fn object_store_nodelay() -> Option { + pub fn nodelay() -> Option { Some(true) } @@ -2026,11 +2043,11 @@ pub mod default { const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3"; - pub fn object_store_retry_unknown_service_error() -> bool { + pub fn retry_unknown_service_error() -> bool { false } - pub fn object_store_retryable_service_error_codes() -> Vec { + pub fn retryable_service_error_codes() -> Vec { vec!["SlowDown".into(), "TooManyRequests".into()] } @@ -2349,4 +2366,99 @@ mod tests { } } } + + #[test] + fn test_object_store_configs_backward_compatibility() { + // Define configs with the old name and make sure it still works + { + let config: RwConfig = toml::from_str( + r#" + [storage.object_store] + object_store_set_atomic_write_dir = true + + [storage.object_store.s3] + object_store_keepalive_ms = 1 + object_store_send_buffer_size = 1 + object_store_recv_buffer_size = 1 + object_store_nodelay = false + + [storage.object_store.s3.developer] + object_store_retry_unknown_service_error = true + object_store_retryable_service_error_codes = ['dummy'] + + + "#, + ) + .unwrap(); + + assert!(config.storage.object_store.set_atomic_write_dir); + assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1)); + assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1)); + assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1)); + assert_eq!(config.storage.object_store.s3.nodelay, Some(false)); + assert!( + 
config + .storage + .object_store + .s3 + .developer + .retry_unknown_service_error + ); + assert_eq!( + config + .storage + .object_store + .s3 + .developer + .retryable_service_error_codes, + vec!["dummy".to_string()] + ); + } + + // Define configs with the new name and make sure it works + { + let config: RwConfig = toml::from_str( + r#" + [storage.object_store] + set_atomic_write_dir = true + + [storage.object_store.s3] + keepalive_ms = 1 + send_buffer_size = 1 + recv_buffer_size = 1 + nodelay = false + + [storage.object_store.s3.developer] + retry_unknown_service_error = true + retryable_service_error_codes = ['dummy'] + + + "#, + ) + .unwrap(); + + assert!(config.storage.object_store.set_atomic_write_dir); + assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1)); + assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1)); + assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1)); + assert_eq!(config.storage.object_store.s3.nodelay, Some(false)); + assert!( + config + .storage + .object_store + .s3 + .developer + .retry_unknown_service_error + ); + assert_eq!( + config + .storage + .object_store + .s3 + .developer + .retryable_service_error_codes, + vec!["dummy".to_string()] + ); + } + } } diff --git a/src/config/example.toml b/src/config/example.toml index b35590c85059..27bbea13ade1 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -190,7 +190,7 @@ recent_filter_layers = 6 recent_filter_rotate_interval_ms = 10000 [storage.object_store] -object_store_set_atomic_write_dir = false +set_atomic_write_dir = false [storage.object_store.retry] req_backoff_interval_ms = 1000 @@ -214,15 +214,15 @@ list_attempt_timeout_ms = 600000 list_retry_attempts = 3 [storage.object_store.s3] -object_store_keepalive_ms = 600000 -object_store_recv_buffer_size = 2097152 -object_store_nodelay = true +keepalive_ms = 600000 +recv_buffer_size = 2097152 +nodelay = true retry_unknown_service_error = false identity_resolution_timeout_s = 5 [storage.object_store.s3.developer] -object_store_retry_unknown_service_error = false -object_store_retryable_service_error_codes = ["SlowDown", "TooManyRequests"] +retry_unknown_service_error = false +retryable_service_error_codes = ["SlowDown", "TooManyRequests"] use_opendal = false [system] diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 87ccc7b96caf..380cce6a8ebe 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -19,6 +19,7 @@ use risingwave_common::catalog::{ ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, TableId, UserId, }; use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_pb::catalog::PbSecretRef; use risingwave_pb::stream_plan::PbSinkDesc; use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType}; @@ -83,7 +84,7 @@ impl SinkDesc { owner: UserId, connection_id: Option, dependent_relations: Vec, - secret_ref: BTreeMap, + secret_ref: BTreeMap, ) -> SinkCatalog { SinkCatalog { id: self.id, diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index f02eb2cdcf9e..206236970d91 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -25,7 +25,7 @@ use risingwave_common::catalog::{ use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::{ - PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus, + PbCreateType, PbSecretRef, PbSink, 
PbSinkFormatDesc, PbSinkType, PbStreamJobStatus, }; use super::{ @@ -339,7 +339,7 @@ pub struct SinkCatalog { pub create_type: CreateType, /// The secret reference for the sink, mapping from property name to secret id. - pub secret_ref: BTreeMap, + pub secret_ref: BTreeMap, } impl SinkCatalog { diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index bdd923f786ec..8102f03355e8 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -443,6 +443,10 @@ impl SinkImpl { pub fn is_sink_into_table(&self) -> bool { matches!(self, SinkImpl::Table(_)) } + + pub fn is_blackhole(&self) -> bool { + matches!(self, SinkImpl::BlackHole(_)) + } } pub fn build_sink(param: SinkParam) -> Result { diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index a7f839527194..93be066d4e72 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -555,12 +555,12 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an *id = *connection_rewrite.get(id).unwrap(); } for secret_id in s.secret_ref.values_mut() { - *secret_id = *secret_rewrite.get(secret_id).unwrap(); + secret_id.secret_id = *secret_rewrite.get(&secret_id.secret_id).unwrap(); } object_dependencies.extend(s.secret_ref.values().map(|id| { object_dependency::ActiveModel { id: NotSet, - oid: Set(*id as _), + oid: Set(id.secret_id as _), used_by: Set(s.id as _), } })); diff --git a/src/frontend/src/binder/bind_param.rs b/src/frontend/src/binder/bind_param.rs index 6c3be04d4ee9..b4bbaf420e0c 100644 --- a/src/frontend/src/binder/bind_param.rs +++ b/src/frontend/src/binder/bind_param.rs @@ -21,7 +21,7 @@ use risingwave_common::types::{Datum, ScalarImpl}; use super::statement::RewriteExprsRecursive; use super::BoundStatement; use crate::error::{ErrorCode, Result}; -use crate::expr::{Expr, ExprImpl, ExprRewriter, Literal}; +use crate::expr::{default_rewrite_expr, Expr, ExprImpl, ExprRewriter, Literal}; /// Rewrites parameter expressions to literals. pub(crate) struct ParamRewriter { @@ -47,22 +47,7 @@ impl ExprRewriter for ParamRewriter { if self.error.is_some() { return expr; } - match expr { - ExprImpl::InputRef(inner) => self.rewrite_input_ref(*inner), - ExprImpl::Literal(inner) => self.rewrite_literal(*inner), - ExprImpl::FunctionCall(inner) => self.rewrite_function_call(*inner), - ExprImpl::FunctionCallWithLambda(inner) => { - self.rewrite_function_call_with_lambda(*inner) - } - ExprImpl::AggCall(inner) => self.rewrite_agg_call(*inner), - ExprImpl::Subquery(inner) => self.rewrite_subquery(*inner), - ExprImpl::CorrelatedInputRef(inner) => self.rewrite_correlated_input_ref(*inner), - ExprImpl::TableFunction(inner) => self.rewrite_table_function(*inner), - ExprImpl::WindowFunction(inner) => self.rewrite_window_function(*inner), - ExprImpl::UserDefinedFunction(inner) => self.rewrite_user_defined_function(*inner), - ExprImpl::Parameter(inner) => self.rewrite_parameter(*inner), - ExprImpl::Now(inner) => self.rewrite_now(*inner), - } + default_rewrite_expr(self, expr) } fn rewrite_subquery(&mut self, mut subquery: crate::expr::Subquery) -> ExprImpl { diff --git a/src/frontend/src/expr/expr_rewriter.rs b/src/frontend/src/expr/expr_rewriter.rs index 4d5b960d654d..6300f9d5e885 100644 --- a/src/frontend/src/expr/expr_rewriter.rs +++ b/src/frontend/src/expr/expr_rewriter.rs @@ -12,33 +12,59 @@ // See the License for the specific language governing permissions and // limitations under the License. 
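Reviewer note on the rewriter refactor that follows: the patch replaces the hand-written dispatch `match` in every `ExprRewriter`/`ExprVisitor` implementation (such as `ParamRewriter` above) with shared `default_rewrite_expr` / `default_visit_expr` helpers that also track recursion depth. A minimal, self-contained sketch of that dispatch-helper pattern, using simplified stand-in types rather than RisingWave's actual `ExprImpl`/`ExprRewriter`:

    // Simplified stand-ins; names are illustrative only.
    #[derive(Debug, Clone)]
    enum Expr {
        Literal(i64),
        Add(Box<Expr>, Box<Expr>),
    }

    trait Rewriter {
        // The trait's entry point delegates to the shared helper by default,
        // so implementors only override the cases they care about.
        fn rewrite_expr(&mut self, expr: Expr) -> Expr {
            default_rewrite_expr(self, expr)
        }
        fn rewrite_literal(&mut self, v: i64) -> Expr {
            Expr::Literal(v)
        }
        fn rewrite_add(&mut self, lhs: Expr, rhs: Expr) -> Expr {
            Expr::Add(
                Box::new(self.rewrite_expr(lhs)),
                Box::new(self.rewrite_expr(rhs)),
            )
        }
    }

    // The shared dispatch helper: one place that matches on every variant,
    // so individual rewriters no longer repeat the boilerplate `match`.
    fn default_rewrite_expr<R: Rewriter + ?Sized>(rewriter: &mut R, expr: Expr) -> Expr {
        match expr {
            Expr::Literal(v) => rewriter.rewrite_literal(v),
            Expr::Add(l, r) => rewriter.rewrite_add(*l, *r),
        }
    }

    // A rewriter that only overrides literals, relying on the default dispatch for the rest.
    struct AddOne;
    impl Rewriter for AddOne {
        fn rewrite_literal(&mut self, v: i64) -> Expr {
            Expr::Literal(v + 1)
        }
    }

    fn main() {
        let expr = Expr::Add(Box::new(Expr::Literal(1)), Box::new(Expr::Literal(2)));
        println!("{:?}", AddOne.rewrite_expr(expr));
    }

The depth tracking done by the real helper via `tracker!().recurse` is orthogonal and omitted here.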
+use risingwave_common::util::recursive::{tracker, Recurse}; + use super::{ AggCall, CorrelatedInputRef, ExprImpl, FunctionCall, FunctionCallWithLambda, InputRef, Literal, - Parameter, Subquery, TableFunction, UserDefinedFunction, WindowFunction, + Parameter, Subquery, TableFunction, UserDefinedFunction, WindowFunction, EXPR_DEPTH_THRESHOLD, + EXPR_TOO_DEEP_NOTICE, }; use crate::expr::Now; +use crate::session::current::notice_to_user; + +/// The default implementation of [`ExprRewriter::rewrite_expr`] that simply dispatches to other +/// methods based on the type of the expression. +/// +/// You can use this function as a helper to reduce boilerplate code when implementing the trait. +// TODO: This is essentially a mimic of `super` pattern from OO languages. Ideally, we should +// adopt the style proposed in https://github.com/risingwavelabs/risingwave/issues/13477. +pub fn default_rewrite_expr( + rewriter: &mut R, + expr: ExprImpl, +) -> ExprImpl { + // TODO: Implementors may choose to not use this function at all, in which case we will fail + // to track the recursion and grow the stack as necessary. The current approach is only a + // best-effort attempt to prevent stack overflow. + tracker!().recurse(|t| { + if t.depth_reaches(EXPR_DEPTH_THRESHOLD) { + notice_to_user(EXPR_TOO_DEEP_NOTICE); + } + + match expr { + ExprImpl::InputRef(inner) => rewriter.rewrite_input_ref(*inner), + ExprImpl::Literal(inner) => rewriter.rewrite_literal(*inner), + ExprImpl::FunctionCall(inner) => rewriter.rewrite_function_call(*inner), + ExprImpl::FunctionCallWithLambda(inner) => { + rewriter.rewrite_function_call_with_lambda(*inner) + } + ExprImpl::AggCall(inner) => rewriter.rewrite_agg_call(*inner), + ExprImpl::Subquery(inner) => rewriter.rewrite_subquery(*inner), + ExprImpl::CorrelatedInputRef(inner) => rewriter.rewrite_correlated_input_ref(*inner), + ExprImpl::TableFunction(inner) => rewriter.rewrite_table_function(*inner), + ExprImpl::WindowFunction(inner) => rewriter.rewrite_window_function(*inner), + ExprImpl::UserDefinedFunction(inner) => rewriter.rewrite_user_defined_function(*inner), + ExprImpl::Parameter(inner) => rewriter.rewrite_parameter(*inner), + ExprImpl::Now(inner) => rewriter.rewrite_now(*inner), + } + }) +} /// By default, `ExprRewriter` simply traverses the expression tree and leaves nodes unchanged. /// Implementations can override a subset of methods and perform transformation on some particular /// types of expression. 
pub trait ExprRewriter { fn rewrite_expr(&mut self, expr: ExprImpl) -> ExprImpl { - match expr { - ExprImpl::InputRef(inner) => self.rewrite_input_ref(*inner), - ExprImpl::Literal(inner) => self.rewrite_literal(*inner), - ExprImpl::FunctionCall(inner) => self.rewrite_function_call(*inner), - ExprImpl::FunctionCallWithLambda(inner) => { - self.rewrite_function_call_with_lambda(*inner) - } - ExprImpl::AggCall(inner) => self.rewrite_agg_call(*inner), - ExprImpl::Subquery(inner) => self.rewrite_subquery(*inner), - ExprImpl::CorrelatedInputRef(inner) => self.rewrite_correlated_input_ref(*inner), - ExprImpl::TableFunction(inner) => self.rewrite_table_function(*inner), - ExprImpl::WindowFunction(inner) => self.rewrite_window_function(*inner), - ExprImpl::UserDefinedFunction(inner) => self.rewrite_user_defined_function(*inner), - ExprImpl::Parameter(inner) => self.rewrite_parameter(*inner), - ExprImpl::Now(inner) => self.rewrite_now(*inner), - } + default_rewrite_expr(self, expr) } fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl { diff --git a/src/frontend/src/expr/expr_visitor.rs b/src/frontend/src/expr/expr_visitor.rs index 4e0484397ab9..64b5c61b565d 100644 --- a/src/frontend/src/expr/expr_visitor.rs +++ b/src/frontend/src/expr/expr_visitor.rs @@ -12,10 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::util::recursive::{tracker, Recurse}; + use super::{ AggCall, CorrelatedInputRef, ExprImpl, FunctionCall, FunctionCallWithLambda, InputRef, Literal, Now, Parameter, Subquery, TableFunction, UserDefinedFunction, WindowFunction, + EXPR_DEPTH_THRESHOLD, EXPR_TOO_DEEP_NOTICE, }; +use crate::session::current::notice_to_user; + +/// The default implementation of [`ExprVisitor::visit_expr`] that simply dispatches to other +/// methods based on the type of the expression. +/// +/// You can use this function as a helper to reduce boilerplate code when implementing the trait. +// TODO: This is essentially a mimic of `super` pattern from OO languages. Ideally, we should +// adopt the style proposed in https://github.com/risingwavelabs/risingwave/issues/13477. +pub fn default_visit_expr(visitor: &mut V, expr: &ExprImpl) { + // TODO: Implementors may choose to not use this function at all, in which case we will fail + // to track the recursion and grow the stack as necessary. The current approach is only a + // best-effort attempt to prevent stack overflow. + tracker!().recurse(|t| { + if t.depth_reaches(EXPR_DEPTH_THRESHOLD) { + notice_to_user(EXPR_TOO_DEEP_NOTICE); + } + + match expr { + ExprImpl::InputRef(inner) => visitor.visit_input_ref(inner), + ExprImpl::Literal(inner) => visitor.visit_literal(inner), + ExprImpl::FunctionCall(inner) => visitor.visit_function_call(inner), + ExprImpl::FunctionCallWithLambda(inner) => { + visitor.visit_function_call_with_lambda(inner) + } + ExprImpl::AggCall(inner) => visitor.visit_agg_call(inner), + ExprImpl::Subquery(inner) => visitor.visit_subquery(inner), + ExprImpl::CorrelatedInputRef(inner) => visitor.visit_correlated_input_ref(inner), + ExprImpl::TableFunction(inner) => visitor.visit_table_function(inner), + ExprImpl::WindowFunction(inner) => visitor.visit_window_function(inner), + ExprImpl::UserDefinedFunction(inner) => visitor.visit_user_defined_function(inner), + ExprImpl::Parameter(inner) => visitor.visit_parameter(inner), + ExprImpl::Now(inner) => visitor.visit_now(inner), + } + }) +} /// Traverse an expression tree. 
/// @@ -27,20 +65,7 @@ use super::{ /// subqueries are not traversed. pub trait ExprVisitor { fn visit_expr(&mut self, expr: &ExprImpl) { - match expr { - ExprImpl::InputRef(inner) => self.visit_input_ref(inner), - ExprImpl::Literal(inner) => self.visit_literal(inner), - ExprImpl::FunctionCall(inner) => self.visit_function_call(inner), - ExprImpl::FunctionCallWithLambda(inner) => self.visit_function_call_with_lambda(inner), - ExprImpl::AggCall(inner) => self.visit_agg_call(inner), - ExprImpl::Subquery(inner) => self.visit_subquery(inner), - ExprImpl::CorrelatedInputRef(inner) => self.visit_correlated_input_ref(inner), - ExprImpl::TableFunction(inner) => self.visit_table_function(inner), - ExprImpl::WindowFunction(inner) => self.visit_window_function(inner), - ExprImpl::UserDefinedFunction(inner) => self.visit_user_defined_function(inner), - ExprImpl::Parameter(inner) => self.visit_parameter(inner), - ExprImpl::Now(inner) => self.visit_now(inner), - } + default_visit_expr(self, expr) } fn visit_function_call(&mut self, func_call: &FunctionCall) { func_call diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 03be40f955d7..d14d99766bcc 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -53,8 +53,8 @@ mod utils; pub use agg_call::AggCall; pub use correlated_input_ref::{CorrelatedId, CorrelatedInputRef, Depth}; pub use expr_mutator::ExprMutator; -pub use expr_rewriter::ExprRewriter; -pub use expr_visitor::ExprVisitor; +pub use expr_rewriter::{default_rewrite_expr, ExprRewriter}; +pub use expr_visitor::{default_visit_expr, ExprVisitor}; pub use function_call::{is_row_function, FunctionCall, FunctionCallDisplay}; pub use function_call_with_lambda::FunctionCallWithLambda; pub use input_ref::{input_ref_to_column_indices, InputRef, InputRefDisplay}; @@ -74,6 +74,10 @@ pub use user_defined_function::UserDefinedFunction; pub use utils::*; pub use window_function::WindowFunction; +const EXPR_DEPTH_THRESHOLD: usize = 30; +const EXPR_TOO_DEEP_NOTICE: &str = "Some expression is too complicated. \ +Consider simplifying or splitting the query if you encounter any issues."; + /// the trait of bound expressions pub trait Expr: Into { /// Get the return type of the expr diff --git a/src/frontend/src/optimizer/plan_expr_rewriter/const_eval_rewriter.rs b/src/frontend/src/optimizer/plan_expr_rewriter/const_eval_rewriter.rs index 43b76891f356..0844bdb33a85 100644 --- a/src/frontend/src/optimizer/plan_expr_rewriter/const_eval_rewriter.rs +++ b/src/frontend/src/optimizer/plan_expr_rewriter/const_eval_rewriter.rs @@ -13,7 +13,7 @@ // limitations under the License. use crate::error::RwError; -use crate::expr::{Expr, ExprImpl, ExprRewriter, Literal}; +use crate::expr::{default_rewrite_expr, Expr, ExprImpl, ExprRewriter, Literal}; pub(crate) struct ConstEvalRewriter { pub(crate) error: Option, @@ -31,21 +31,10 @@ impl ExprRewriter for ConstEvalRewriter { expr } } + } else if let ExprImpl::Parameter(_) = expr { + unreachable!("Parameter should not appear here. 
It will be replaced by a literal before this step.") } else { - match expr { - ExprImpl::InputRef(inner) => self.rewrite_input_ref(*inner), - ExprImpl::Literal(inner) => self.rewrite_literal(*inner), - ExprImpl::FunctionCall(inner) => self.rewrite_function_call(*inner), - ExprImpl::FunctionCallWithLambda(inner) => self.rewrite_function_call_with_lambda(*inner), - ExprImpl::AggCall(inner) => self.rewrite_agg_call(*inner), - ExprImpl::Subquery(inner) => self.rewrite_subquery(*inner), - ExprImpl::CorrelatedInputRef(inner) => self.rewrite_correlated_input_ref(*inner), - ExprImpl::TableFunction(inner) => self.rewrite_table_function(*inner), - ExprImpl::WindowFunction(inner) => self.rewrite_window_function(*inner), - ExprImpl::UserDefinedFunction(inner) => self.rewrite_user_defined_function(*inner), - ExprImpl::Parameter(_) => unreachable!("Parameter should not appear here. It will be replaced by a literal before this step."), - ExprImpl::Now(inner) => self.rewrite_now(*inner), - } + default_rewrite_expr(self, expr) } } } diff --git a/src/frontend/src/optimizer/plan_expr_visitor/expr_counter.rs b/src/frontend/src/optimizer/plan_expr_visitor/expr_counter.rs index 68ce5b93b044..b636218338c2 100644 --- a/src/frontend/src/optimizer/plan_expr_visitor/expr_counter.rs +++ b/src/frontend/src/optimizer/plan_expr_visitor/expr_counter.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use crate::expr::{ExprImpl, ExprType, ExprVisitor, FunctionCall}; +use crate::expr::{default_visit_expr, ExprImpl, ExprType, ExprVisitor, FunctionCall}; /// `ExprCounter` is used by `CseRewriter`. #[derive(Default)] @@ -35,20 +35,7 @@ impl ExprVisitor for CseExprCounter { return; } - match expr { - ExprImpl::InputRef(inner) => self.visit_input_ref(inner), - ExprImpl::Literal(inner) => self.visit_literal(inner), - ExprImpl::FunctionCall(inner) => self.visit_function_call(inner), - ExprImpl::FunctionCallWithLambda(inner) => self.visit_function_call_with_lambda(inner), - ExprImpl::AggCall(inner) => self.visit_agg_call(inner), - ExprImpl::Subquery(inner) => self.visit_subquery(inner), - ExprImpl::CorrelatedInputRef(inner) => self.visit_correlated_input_ref(inner), - ExprImpl::TableFunction(inner) => self.visit_table_function(inner), - ExprImpl::WindowFunction(inner) => self.visit_window_function(inner), - ExprImpl::UserDefinedFunction(inner) => self.visit_user_defined_function(inner), - ExprImpl::Parameter(inner) => self.visit_parameter(inner), - ExprImpl::Now(inner) => self.visit_now(inner), - } + default_visit_expr(self, expr); } fn visit_function_call(&mut self, func_call: &FunctionCall) { diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index d7187357e3fa..2567cbf01c6e 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -696,8 +696,10 @@ impl dyn PlanNode { } } -const PLAN_DEPTH_THRESHOLD: usize = 30; -const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \ +/// Recursion depth threshold for plan node visitor to send notice to user. +pub const PLAN_DEPTH_THRESHOLD: usize = 30; +/// Notice message for plan node visitor to send to user when the depth threshold is reached. +pub const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. 
\ Consider simplifying or splitting the query if you encounter any issues."; impl dyn PlanNode { diff --git a/src/frontend/src/optimizer/plan_rewriter/mod.rs b/src/frontend/src/optimizer/plan_rewriter/mod.rs index 81c0809bae86..360c61d3121b 100644 --- a/src/frontend/src/optimizer/plan_rewriter/mod.rs +++ b/src/frontend/src/optimizer/plan_rewriter/mod.rs @@ -56,11 +56,20 @@ macro_rules! def_rewriter { pub trait PlanRewriter { paste! { fn rewrite(&mut self, plan: PlanRef) -> PlanRef{ - match plan.node_type() { - $( - PlanNodeType::[<$convention $name>] => self.[](plan.downcast_ref::<[<$convention $name>]>().unwrap()), - )* - } + use risingwave_common::util::recursive::{tracker, Recurse}; + use crate::session::current::notice_to_user; + + tracker!().recurse(|t| { + if t.depth_reaches(PLAN_DEPTH_THRESHOLD) { + notice_to_user(PLAN_TOO_DEEP_NOTICE); + } + + match plan.node_type() { + $( + PlanNodeType::[<$convention $name>] => self.[](plan.downcast_ref::<[<$convention $name>]>().unwrap()), + )* + } + }) } $( diff --git a/src/frontend/src/optimizer/plan_visitor/mod.rs b/src/frontend/src/optimizer/plan_visitor/mod.rs index 6156454fd3e8..63a0484cfdfd 100644 --- a/src/frontend/src/optimizer/plan_visitor/mod.rs +++ b/src/frontend/src/optimizer/plan_visitor/mod.rs @@ -93,11 +93,20 @@ macro_rules! def_visitor { paste! { fn visit(&mut self, plan: PlanRef) -> Self::Result { - match plan.node_type() { - $( - PlanNodeType::[<$convention $name>] => self.[](plan.downcast_ref::<[<$convention $name>]>().unwrap()), - )* - } + use risingwave_common::util::recursive::{tracker, Recurse}; + use crate::session::current::notice_to_user; + + tracker!().recurse(|t| { + if t.depth_reaches(PLAN_DEPTH_THRESHOLD) { + notice_to_user(PLAN_TOO_DEEP_NOTICE); + } + + match plan.node_type() { + $( + PlanNodeType::[<$convention $name>] => self.[](plan.downcast_ref::<[<$convention $name>]>().unwrap()), + )* + } + }) } $( diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 01993fd24108..8f372b58b17e 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -19,6 +19,7 @@ use risingwave_connector::source::kafka::private_link::{ insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY, }; use risingwave_connector::WithPropertiesExt; +use risingwave_pb::catalog::PbSecretRef; use risingwave_sqlparser::ast::{ CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, CreateSubscriptionStatement, SqlOption, Statement, Value, @@ -119,7 +120,7 @@ impl WithOptions { pub(crate) fn resolve_secret_in_with_options( _with_options: &mut WithOptions, _session: &SessionImpl, -) -> RwResult> { +) -> RwResult> { // todo: implement the function and take `resolve_privatelink_in_with_option` as reference Ok(BTreeMap::new()) diff --git a/src/meta/model_v2/migration/src/m20240525_090457_secret.rs b/src/meta/model_v2/migration/src/m20240525_090457_secret.rs index f16bfca5ec03..ed23085c6657 100644 --- a/src/meta/model_v2/migration/src/m20240525_090457_secret.rs +++ b/src/meta/model_v2/migration/src/m20240525_090457_secret.rs @@ -37,12 +37,22 @@ impl MigrationTrait for Migration { ) .await?; - // Add a new column to the table + // Add a new column to the `sink` table manager .alter_table( MigrationTable::alter() .table(Sink::Table) - .add_column(ColumnDef::new(Sink::SecretRef).json_binary()) + .add_column(ColumnDef::new(Sink::SecretRef).binary()) + .to_owned(), + ) + .await?; + + // Add a new column to the `source` table + manager 
+ .alter_table( + MigrationTable::alter() + .table(Source::Table) + .add_column(ColumnDef::new(Source::SecretRef).binary()) .to_owned(), ) .await?; @@ -60,6 +70,14 @@ impl MigrationTrait for Migration { .to_owned(), ) .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Source::Table) + .drop_column(Source::SecretRef) + .to_owned(), + ) + .await?; Ok(()) } } @@ -77,3 +95,9 @@ enum Sink { Table, SecretRef, } + +#[derive(DeriveIden)] +enum Source { + Table, + SecretRef, +} diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 11c5209bdc56..116cb66cab1d 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -14,7 +14,7 @@ use std::collections::BTreeMap; -use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus}; +use risingwave_pb::catalog::{PbCreateType, PbSecretRef, PbStreamJobStatus}; use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState; use risingwave_pb::stream_plan::PbStreamNode; use sea_orm::entity::prelude::*; @@ -258,6 +258,55 @@ macro_rules! derive_array_from_blob { }; } +macro_rules! derive_btreemap_from_blob { + ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => { + #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)] + pub struct $struct_name(#[sea_orm] Vec); + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct $field_type { + #[prost(btree_map = "string, message")] + inner: BTreeMap<$key_type, $value_type>, + } + impl Eq for $field_type {} + + impl $struct_name { + pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> { + let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap(); + data.inner + } + + fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self { + Self(prost::Message::encode_to_vec(&$field_type { inner: val })) + } + } + + impl From> for $struct_name { + fn from(value: BTreeMap<$key_type, $value_type>) -> Self { + Self::from_protobuf(value) + } + } + + impl std::fmt::Debug for $struct_name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.to_protobuf().fmt(f) + } + } + + impl Default for $struct_name { + fn default() -> Self { + Self(vec![]) + } + } + + impl sea_orm::sea_query::Nullable for $struct_name { + fn null() -> Value { + Value::Bytes(None) + } + } + }; +} + pub(crate) use {derive_array_from_blob, derive_from_blob}; derive_from_json_struct!(I32Array, Vec); @@ -286,7 +335,7 @@ impl From>> for ActorUpstreamActors { } } -derive_from_json_struct!(SecretRef, BTreeMap); +derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap); derive_from_blob!(StreamNode, PbStreamNode); derive_from_blob!(DataType, risingwave_pb::data::PbDataType); diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs index 78d0806f98a5..25d6293b0b12 100644 --- a/src/meta/model_v2/src/sink.rs +++ b/src/meta/model_v2/src/sink.rs @@ -72,7 +72,7 @@ pub struct Model { pub sink_from_name: String, pub sink_format_desc: Option, pub target_table: Option, - // `secret_ref` stores a json string, mapping from property name to secret id. + // `secret_ref` stores the mapping info mapping from property name to secret id and type. 
pub secret_ref: Option, } diff --git a/src/meta/model_v2/src/source.rs b/src/meta/model_v2/src/source.rs index be2d2f7110ca..a90f399e4b8c 100644 --- a/src/meta/model_v2/src/source.rs +++ b/src/meta/model_v2/src/source.rs @@ -19,8 +19,8 @@ use sea_orm::ActiveValue::Set; use serde::{Deserialize, Serialize}; use crate::{ - ColumnCatalogArray, ConnectionId, I32Array, Property, SourceId, StreamSourceInfo, TableId, - WatermarkDescArray, + ColumnCatalogArray, ConnectionId, I32Array, Property, SecretRef, SourceId, StreamSourceInfo, + TableId, WatermarkDescArray, }; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] @@ -39,6 +39,8 @@ pub struct Model { pub optional_associated_table_id: Option, pub connection_id: Option, pub version: i64, + // `secret_ref` stores the mapping info mapping from property name to secret id and type. + pub secret_ref: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -101,6 +103,7 @@ impl From for ActiveModel { optional_associated_table_id: Set(optional_associated_table_id), connection_id: Set(source.connection_id.map(|id| id as _)), version: Set(source.version as _), + secret_ref: Set(None), } } } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 5320d555b94f..fb8810071f16 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -864,7 +864,7 @@ impl CatalogController { /// collected pub async fn load_all_actors(&self) -> MetaResult { let inner = self.inner.read().await; - let actor_info: Vec<(ActorId, WorkerId, FragmentId, i32, Vec)> = Actor::find() + let actor_info: Vec<(ActorId, WorkerId, FragmentId, i32, I32Array)> = Actor::find() .select_only() .column(actor::Column::ActorId) .column(actor::Column::WorkerId) @@ -880,6 +880,7 @@ impl CatalogController { let mut fragment_infos = HashMap::new(); for (actor_id, worker_id, fragment_id, type_mask, state_table_ids) in actor_info { + let state_table_ids = state_table_ids.into_inner(); match fragment_infos.entry(fragment_id as crate::model::FragmentId) { Entry::Occupied(mut entry) => { let info: &mut InflightFragmentInfo = entry.get_mut(); diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 4fb5d086a060..b6997240f772 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -201,9 +201,9 @@ impl From> for PbSource { impl From> for PbSink { fn from(value: ObjectModel) -> Self { - let mut secret_ref_map: BTreeMap = BTreeMap::new(); + let mut secret_ref_map = BTreeMap::new(); if let Some(secret_ref) = value.0.secret_ref { - secret_ref_map = secret_ref.into_inner(); + secret_ref_map = secret_ref.to_protobuf(); } Self { id: value.0.sink_id as _, diff --git a/src/object_store/src/object/opendal_engine/fs.rs b/src/object_store/src/object/opendal_engine/fs.rs index 6274e3f4b6e5..2edaaa44d6bb 100644 --- a/src/object_store/src/object/opendal_engine/fs.rs +++ b/src/object_store/src/object/opendal_engine/fs.rs @@ -34,7 +34,7 @@ impl OpendalObjectStore { // Create fs backend builder. 
let mut builder = Fs::default(); builder.root(&root); - if config.object_store_set_atomic_write_dir { + if config.set_atomic_write_dir { let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR); builder.atomic_write_dir(&atomic_write_dir); } diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 86d62ed48357..5ba90ad93ccb 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -106,11 +106,11 @@ impl OpendalObjectStore { pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult { let mut client_builder = reqwest::ClientBuilder::new(); - if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() { + if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() { client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms)); } - if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() { + if let Some(nodelay) = config.s3.nodelay.as_ref() { client_builder = client_builder.tcp_nodelay(*nodelay); } diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index f1f569cb7d36..3ed5fc01ba40 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -618,19 +618,19 @@ impl S3ObjectStore { let mut http = hyper::client::HttpConnector::new(); // connection config - if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() { + if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() { http.set_keepalive(Some(Duration::from_millis(*keepalive_ms))); } - if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() { + if let Some(nodelay) = config.s3.nodelay.as_ref() { http.set_nodelay(*nodelay); } - if let Some(recv_buffer_size) = config.s3.object_store_recv_buffer_size.as_ref() { + if let Some(recv_buffer_size) = config.s3.recv_buffer_size.as_ref() { http.set_recv_buffer_size(Some(*recv_buffer_size)); } - if let Some(send_buffer_size) = config.s3.object_store_send_buffer_size.as_ref() { + if let Some(send_buffer_size) = config.s3.send_buffer_size.as_ref() { http.set_send_buffer_size(Some(*send_buffer_size)); } @@ -1043,7 +1043,7 @@ where Some(SdkError::ServiceError(e)) => { let retry = match e.err().code() { None => { - if config.s3.developer.object_store_retry_unknown_service_error + if config.s3.developer.retry_unknown_service_error || config.s3.retry_unknown_service_error { tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request."); @@ -1056,7 +1056,7 @@ where if config .s3 .developer - .object_store_retryable_service_error_codes + .retryable_service_error_codes .iter() .any(|s| s.as_str().eq_ignore_ascii_case(code)) { diff --git a/src/prost/build.rs b/src/prost/build.rs index 961dbe196944..4e939f46abb6 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -66,6 +66,7 @@ fn main() -> Result<(), Box> { ".plan_common.ExternalTableDesc", ".hummock.CompactTask", ".catalog.StreamSourceInfo", + ".catalog.SecretRef", ".catalog.Source", ".catalog.Sink", ".catalog.View", @@ -111,6 +112,7 @@ fn main() -> Result<(), Box> { // The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr .type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]") .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]") + .type_attribute("catalog.SecretRef", "#[derive(Eq, Hash)]") .type_attribute("catalog.IndexColumnProperties", "#[derive(Eq, Hash)]") 
.type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]") .type_attribute("data.DataType", "#[derive(Eq, Hash)]") diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 41a55518158d..0336f6f542fa 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -380,7 +380,7 @@ pub async fn compact( .storage_opts .object_store_config .s3 - .object_store_recv_buffer_size + .recv_buffer_size .unwrap_or(6 * 1024 * 1024) as u64, capacity as u64, ) * compact_task.splits.len() as u64; diff --git a/src/storage/src/row_serde/value_serde.rs b/src/storage/src/row_serde/value_serde.rs index c4d4ef8b808f..3a9156ca16d1 100644 --- a/src/storage/src/row_serde/value_serde.rs +++ b/src/storage/src/row_serde/value_serde.rs @@ -97,7 +97,8 @@ impl ValueRowSerdeNew for ColumnAwareSerde { } } - let column_with_default = table_columns.iter().enumerate().filter_map(|(i, c)| { + let partial_columns = value_indices.iter().map(|idx| &table_columns[*idx]); + let column_with_default = partial_columns.enumerate().filter_map(|(i, c)| { if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { snapshot_value, expr, @@ -305,4 +306,59 @@ mod tests { // drop all columns is now allowed assert!(try_drop_invalid_columns(&row_bytes, &HashSet::new()).is_none()); } + + #[test] + fn test_deserialize_partial_columns() { + let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)]; + let row1 = OwnedRow::new(vec![ + Some(Int16(5)), + Some(Utf8("abc".into())), + Some(Utf8("ABC".into())), + ]); + let serializer = column_aware_row_encoding::Serializer::new(&column_ids); + let row_bytes = serializer.serialize(row1); + + let deserializer = column_aware_row_encoding::Deserializer::new( + &[ColumnId::new(2), ColumnId::new(0)], + Arc::from(vec![DataType::Varchar, DataType::Int16].into_boxed_slice()), + std::iter::empty(), + ); + let decoded = deserializer.deserialize(&row_bytes[..]); + assert_eq!( + decoded.unwrap(), + vec![Some(Utf8("ABC".into())), Some(Int16(5))] + ); + } + + #[test] + fn test_deserialize_partial_columns_with_default_columns() { + let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)]; + let row1 = OwnedRow::new(vec![ + Some(Int16(5)), + Some(Utf8("abc".into())), + Some(Utf8("ABC".into())), + ]); + let serializer = column_aware_row_encoding::Serializer::new(&column_ids); + let row_bytes = serializer.serialize(row1); + + // default column of ColumnId::new(3) + let default_columns = vec![(1, Some(Utf8("new column".into())))]; + + let deserializer = column_aware_row_encoding::Deserializer::new( + &[ColumnId::new(2), ColumnId::new(3), ColumnId::new(0)], + Arc::from( + vec![DataType::Varchar, DataType::Varchar, DataType::Int16].into_boxed_slice(), + ), + default_columns.into_iter(), + ); + let decoded = deserializer.deserialize(&row_bytes[..]); + assert_eq!( + decoded.unwrap(), + vec![ + Some(Utf8("ABC".into())), + Some(Utf8("new column".into())), + Some(Int16(5)) + ] + ); + } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 7dc697d84a13..e5f2801b0502 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use std::ops::Bound::{self, Excluded, Included, Unbounded}; -use std::ops::{Index, RangeBounds}; +use std::ops::RangeBounds; use std::sync::Arc; use auto_enums::auto_enum; @@ -30,6 +30,7 @@ use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption}; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{self, OwnedRow, Row, RowExt}; +use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::row_serde::*; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; @@ -82,7 +83,8 @@ pub struct StorageTableInner { /// Mapping from column id to column index for deserializing the row. mapping: Arc, - /// Row deserializer to deserialize the whole value in storage to a row. + /// Row deserializer to deserialize the value in storage to a row. + /// The row can be either complete or partial, depending on whether the row encoding is versioned. row_serde: Arc, /// Indices of primary key. @@ -247,29 +249,45 @@ impl StorageTableInner { } } - let output_row_in_value_indices = value_output_indices - .iter() - .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap()) - .collect_vec(); let output_row_in_key_indices = key_output_indices .iter() .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap()) .collect_vec(); let schema = Schema::new(output_columns.iter().map(Into::into).collect()); - let mapping = ColumnMapping::new(output_row_in_value_indices); - let pk_data_types = pk_indices .iter() .map(|i| table_columns[*i].data_type.clone()) .collect(); let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types); - - let row_serde = { + let (row_serde, mapping) = { if versioned { - ColumnAwareSerde::new(value_indices.into(), table_columns.into()).into() + let value_output_indices_dedup = value_output_indices + .iter() + .unique() + .copied() + .collect::>(); + let output_row_in_value_output_indices_dedup = value_output_indices + .iter() + .map(|&di| { + value_output_indices_dedup + .iter() + .position(|&pi| di == pi) + .unwrap() + }) + .collect_vec(); + let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup); + let serde = + ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into()); + (serde.into(), mapping) } else { - BasicSerde::new(value_indices.into(), table_columns.into()).into() + let output_row_in_value_indices = value_output_indices + .iter() + .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap()) + .collect_vec(); + let mapping = ColumnMapping::new(output_row_in_value_indices); + let serde = BasicSerde::new(value_indices.into(), table_columns.into()); + (serde.into(), mapping) } }; @@ -367,11 +385,10 @@ impl StorageTableInner { if let Some(value) = self.store.get(serialized_pk, epoch, read_options).await? { // Refer to [`StorageTableInnerIterInner::new`] for necessity of `validate_read_epoch`. 
self.store.validate_read_epoch(wait_epoch)?; - let full_row = self.row_serde.deserialize(&value)?; - let result_row_in_value = self - .mapping - .project(OwnedRow::new(full_row)) - .into_owned_row(); + + let row = self.row_serde.deserialize(&value)?; + let result_row_in_value = self.mapping.project(OwnedRow::new(row)); + match &self.key_output_indices { Some(key_output_indices) => { let result_row_in_key = @@ -386,20 +403,23 @@ impl StorageTableInner { .unwrap(); result_row_vec.push( result_row_in_value - .index(*item_position_in_value_indices) - .clone(), + .datum_at(*item_position_in_value_indices) + .to_owned_datum(), ); } else { let item_position_in_pk_indices = key_output_indices.iter().position(|p| idx == p).unwrap(); - result_row_vec - .push(result_row_in_key.index(item_position_in_pk_indices).clone()); + result_row_vec.push( + result_row_in_key + .datum_at(item_position_in_pk_indices) + .to_owned_datum(), + ); } } let result_row = OwnedRow::new(result_row_vec); Ok(Some(result_row)) } - None => Ok(Some(result_row_in_value)), + None => Ok(Some(result_row_in_value.into_owned_row())), } } else { Ok(None) @@ -875,11 +895,8 @@ impl StorageTableInnerIterInner { .await? { let (table_key, value) = (k.user_key.table_key, v); - let full_row = self.row_deserializer.deserialize(value)?; - let result_row_in_value = self - .mapping - .project(OwnedRow::new(full_row)) - .into_owned_row(); + let row = self.row_deserializer.deserialize(value)?; + let result_row_in_value = self.mapping.project(OwnedRow::new(row)); match &self.key_output_indices { Some(key_output_indices) => { let result_row_in_key = match self.pk_serializer.clone() { @@ -901,14 +918,17 @@ impl StorageTableInnerIterInner { .unwrap(); result_row_vec.push( result_row_in_value - .index(*item_position_in_value_indices) - .clone(), + .datum_at(*item_position_in_value_indices) + .to_owned_datum(), ); } else { let item_position_in_pk_indices = key_output_indices.iter().position(|p| idx == p).unwrap(); - result_row_vec - .push(result_row_in_key.index(item_position_in_pk_indices).clone()); + result_row_vec.push( + result_row_in_key + .datum_at(item_position_in_pk_indices) + .to_owned_datum(), + ); } } let row = OwnedRow::new(result_row_vec); @@ -922,7 +942,7 @@ impl StorageTableInnerIterInner { None => { yield KeyedRow { vnode_prefixed_key: table_key.copy_into(), - row: result_row_in_value, + row: result_row_in_value.into_owned_row(), } } } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index e8dfc09e9130..c0ee1330805b 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -183,6 +183,11 @@ impl SinkExecutor { let re_construct_with_sink_pk = need_advance_delete && self.sink_param.sink_type == SinkType::Upsert && !self.sink_param.downstream_pk.is_empty(); + // Don't compact chunk for blackhole sink for better benchmark performance. 
+ let compact_chunk = !self.sink.is_blackhole(); + tracing::info!("Sink info: sink_id: {} actor_id: {}, need_advance_delete: {}, re_construct_with_sink_pk: {}", + sink_id, actor_id, need_advance_delete, re_construct_with_sink_pk); + let processed_input = Self::process_msg( input, self.sink_param.sink_type, @@ -193,6 +198,7 @@ impl SinkExecutor { self.input_data_types, self.sink_param.downstream_pk.clone(), metrics.sink_chunk_buffer_size, + compact_chunk, ); if self.sink.is_sink_into_table() { @@ -298,6 +304,7 @@ impl SinkExecutor { input_data_types: Vec, down_stream_pk: Vec, sink_chunk_buffer_size_metrics: LabelGuardedIntGauge<3>, + compact_chunk: bool, ) { // need to buffer chunks during one barrier if need_advance_delete || re_construct_with_sink_pk { @@ -362,10 +369,12 @@ impl SinkExecutor { for msg in input { match msg? { Message::Watermark(w) => yield Message::Watermark(w), - Message::Chunk(chunk) => { + Message::Chunk(mut chunk) => { // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE // V->V). - let chunk = merge_chunk_row(chunk, &stream_key); + if compact_chunk { + chunk = merge_chunk_row(chunk, &stream_key); + } match sink_type { SinkType::AppendOnly => yield Message::Chunk(chunk), SinkType::ForceAppendOnly => {
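
Reviewer note on the catalog change in `proto/catalog.proto` and `src/ctl/src/cmd_impl/meta/migration.rs`: `secret_ref` moves from `map<string, uint32>` (a bare secret id) to `map<string, SecretRef>` (the id plus how the secret is referenced, `TEXT` or `FILE`). A minimal sketch of the kind of upgrade such a change implies, using hand-rolled stand-ins for the generated `PbSecretRef` types and assuming `TEXT` as the default reference type (the actual migration only rewrites the id of an already-structured map):

    use std::collections::BTreeMap;

    // Stand-ins for the generated protobuf types; field names follow the
    // new `SecretRef` message in `catalog.proto`.
    #[derive(Debug, Clone, Copy, PartialEq)]
    enum RefAsType {
        Unspecified,
        Text,
        File,
    }

    #[derive(Debug, Clone, PartialEq)]
    struct SecretRef {
        secret_id: u32,
        ref_as: RefAsType,
    }

    // Upgrade an old-style map (property name -> secret id) to the new form,
    // remapping ids where required and defaulting the reference type to TEXT.
    fn upgrade_secret_refs(
        old: BTreeMap<String, u32>,
        id_rewrite: &BTreeMap<u32, u32>,
    ) -> BTreeMap<String, SecretRef> {
        old.into_iter()
            .map(|(prop, id)| {
                let secret_id = *id_rewrite.get(&id).unwrap_or(&id);
                (
                    prop,
                    SecretRef {
                        secret_id,
                        ref_as: RefAsType::Text,
                    },
                )
            })
            .collect()
    }

    fn main() {
        let old = BTreeMap::from([("aws.secret.key".to_string(), 7u32)]);
        let rewrite = BTreeMap::from([(7u32, 42u32)]);
        let new = upgrade_secret_refs(old, &rewrite);
        assert_eq!(new["aws.secret.key"].secret_id, 42);
        assert_eq!(new["aws.secret.key"].ref_as, RefAsType::Text);
    }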
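Reviewer note on the `config.rs` rename: the old `object_store_*` keys keep working because every renamed field carries a `#[serde(alias = ...)]` attribute, which is exactly what the new `test_object_store_configs_backward_compatibility` test exercises. A minimal sketch of the mechanism, assuming `serde` and `toml` as dependencies and using a hypothetical struct name:

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct S3Section {
        // The old key name is accepted as an alias of the renamed field.
        #[serde(default, alias = "object_store_keepalive_ms")]
        keepalive_ms: Option<u64>,
    }

    fn main() {
        // Both the legacy key and the new key deserialize into the same field.
        let old: S3Section = toml::from_str("object_store_keepalive_ms = 600000").unwrap();
        let new: S3Section = toml::from_str("keepalive_ms = 600000").unwrap();
        assert_eq!(old.keepalive_ms, new.keepalive_ms);
        assert_eq!(new.keepalive_ms, Some(600000));
    }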
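Reviewer note on `src/meta/model_v2/src/lib.rs`: the new `derive_btreemap_from_blob!` macro stores the `secret_ref` map as a protobuf-encoded binary column (a wrapper message with a single `btree_map` field) instead of a JSON string. A stripped-down sketch of that round trip, assuming the `prost` crate and a scalar value type for brevity (the real macro uses `PbSecretRef` message values):

    use std::collections::BTreeMap;

    use prost::Message;

    // Wrapper message holding the map, mirroring the `$field_type` the macro generates.
    #[derive(Clone, PartialEq, Message)]
    struct SecretRefMap {
        #[prost(btree_map = "string, uint32", tag = "1")]
        inner: BTreeMap<String, u32>,
    }

    // Encode the map into the bytes that would be stored in the binary column...
    fn to_blob(map: BTreeMap<String, u32>) -> Vec<u8> {
        SecretRefMap { inner: map }.encode_to_vec()
    }

    // ...and decode it back when loading the row.
    fn from_blob(blob: &[u8]) -> BTreeMap<String, u32> {
        SecretRefMap::decode(blob).expect("corrupted blob").inner
    }

    fn main() {
        let map = BTreeMap::from([("aws.secret.key".to_string(), 42u32)]);
        let blob = to_blob(map.clone());
        assert_eq!(from_blob(&blob), map);
    }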
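Reviewer note on the recursion guards added to the expression and plan rewriters/visitors: `tracker!().recurse` keeps a per-thread depth counter and lets the callback compare it against `EXPR_DEPTH_THRESHOLD` / `PLAN_DEPTH_THRESHOLD` so the user gets a notice instead of a silent deep recursion. A simplified, self-contained version of such a tracker (not the actual `risingwave_common::util::recursive` API, and without its panic-safety handling):

    use std::cell::Cell;

    thread_local! {
        // Current recursion depth for this thread.
        static DEPTH: Cell<usize> = Cell::new(0);
    }

    const DEPTH_THRESHOLD: usize = 30;

    // Run `f` with the depth counter incremented, warning once the threshold is reached.
    fn recurse<T>(f: impl FnOnce(usize) -> T) -> T {
        let depth = DEPTH.with(|d| {
            d.set(d.get() + 1);
            d.get()
        });
        if depth == DEPTH_THRESHOLD {
            eprintln!("expression is too deep, consider simplifying the query");
        }
        let result = f(depth);
        DEPTH.with(|d| d.set(d.get() - 1));
        result
    }

    fn count_nested(n: usize) -> usize {
        recurse(|depth| if n == 0 { depth } else { count_nested(n - 1) })
    }

    fn main() {
        // 41 nested `recurse` calls in total: the warning fires once, at depth 30.
        assert_eq!(count_nested(40), 41);
    }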