feat: allow overwrite stream_rate_control in with clause (#13009)
Signed-off-by: tabVersion <[email protected]>
tabVersion authored Nov 16, 2023
1 parent 059a840 commit 942a526
Showing 13 changed files with 147 additions and 26 deletions.
23 changes: 23 additions & 0 deletions e2e_test/ddl/throttle.slt
@@ -0,0 +1,23 @@
# streaming_rate_limit also applies to create sink and create source; see
# e2e_test/source/basic/kafka.slt and e2e_test/sink/kafka/create_sink.slt for those cases

statement ok
create table t1 (v1 int);

# tracked in https://github.com/risingwavelabs/risingwave/issues/13474
# create with duplicate streaming_rate_limit
# statement error
# create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1;

# create with unknown fields
statement error unexpected options in WITH clause
create materialized view mv1 with (streaming_rate_limit = 1000, unknown_field = 2000) as select * from t1;

statement ok
create materialized view mv1 with (streaming_rate_limit = 1000) as select * from t1;

statement ok
drop materialized view mv1;

statement ok
drop table t1;
29 changes: 29 additions & 0 deletions e2e_test/sink/kafka/create_sink.slt
@@ -134,9 +134,38 @@ create sink multiple_pk from t_kafka with (
primary_key = 'id,v_varchar'
);

# throttle option
statement ok
create sink multiple_pk_throttle from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar',
streaming_rate_limit = 200
);

statement ok
create sink multiple_pk_throttle_1
as select * from t_kafka
with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar',
streaming_rate_limit = 200
);

statement ok
drop sink multiple_pk;

statement ok
drop sink multiple_pk_throttle;

statement ok
drop sink multiple_pk_throttle_1;

statement error Sink primary key column not found: invalid.
create sink invalid_pk_column from t_kafka with (
connector = 'kafka',
13 changes: 13 additions & 0 deletions e2e_test/source/basic/kafka.slt
@@ -372,6 +372,16 @@ create table s28 (id bytea, PRIMARY KEY(_rw_key)) with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE BYTES

# throttle option
statement ok
create table s29 (id bytea, PRIMARY KEY(_rw_key)) with (
connector = 'kafka',
topic = 'kafka_source_format_bytes',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
streaming_rate_limit = 200
) FORMAT PLAIN ENCODE BYTES

statement ok
CREATE TABLE mongo_customers (
_id BIGINT PRIMARY KEY,
@@ -842,6 +852,9 @@ drop table s27
statement ok
drop table s28

statement ok
drop table s29

statement ok
DROP TABLE mongo_customers;

10 changes: 9 additions & 1 deletion src/frontend/src/handler/create_mv.rs
@@ -16,7 +16,8 @@ use either::Either;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::acl::AclMode;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_pb::catalog::{CreateType, PbTable};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
@@ -165,6 +166,13 @@ pub async fn handle_create_mv(

let (mut table, graph) = {
let context = OptimizerContext::from_handler_args(handler_args);
if !context.with_options().is_empty() {
// Known options have already been taken out via `remove` (see `OverwriteOptions::new`),
// so any options left in the WITH clause are unknown and must be rejected.
return Err(RwError::from(ProtocolError(format!(
"unexpected options in WITH clause: {:?}",
context.with_options().keys()
))));
}

let has_order_by = !query.order_by.is_empty();
if has_order_by {
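The check above works because `OverwriteOptions::new` (introduced in `src/frontend/src/utils/overwrite_options.rs` below) has already drained every recognized key out of the WITH clause, so anything still present is unknown. A minimal sketch of that drain-then-reject pattern — a hypothetical standalone function, not the commit's actual types:

```rust
use std::collections::BTreeMap;

// Hypothetical sketch of the drain-then-reject pattern: recognized keys
// are consumed with `remove` first, so any leftover key is unknown.
fn take_rate_limit(options: &mut BTreeMap<String, String>) -> Result<Option<u32>, String> {
    let rate_limit = options
        .remove("streaming_rate_limit")
        .map(|v| v.parse::<u32>().map_err(|e| e.to_string()))
        .transpose()?;
    // Nothing else claimed the remaining keys: reject them all.
    if !options.is_empty() {
        return Err(format!(
            "unexpected options in WITH clause: {:?}",
            options.keys().collect::<Vec<_>>()
        ));
    }
    Ok(rate_limit)
}
```

This is the behavior exercised by the `unexpected options in WITH clause` case in `e2e_test/ddl/throttle.slt` above.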
14 changes: 12 additions & 2 deletions src/frontend/src/optimizer/optimizer_context.rs
@@ -24,7 +24,7 @@ use crate::expr::{CorrelatedId, SessionTimezone};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::PlanNodeId;
use crate::session::SessionImpl;
use crate::WithOptions;
use crate::utils::{OverwriteOptions, WithOptions};

const RESERVED_ID_NUM: u16 = 10000;

@@ -50,6 +50,9 @@ pub struct OptimizerContext {
session_timezone: RefCell<SessionTimezone>,
/// Store expr display id.
next_expr_display_id: RefCell<usize>,
/// Configs that can be overwritten in the `WITH` clause;
/// if not specified there, fall back to the value of the session variable.
overwrite_options: OverwriteOptions,
}

// Still not sure if we need to introduce "on_optimization_finish" or other common callback methods,
@@ -71,10 +74,11 @@ impl OptimizerContext {
}

/// Create a new [`OptimizerContext`] from the given [`HandlerArgs`] and [`ExplainOptions`].
pub fn new(handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self {
pub fn new(mut handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self {
let session_timezone = RefCell::new(SessionTimezone::new(
handler_args.session.config().get_timezone().to_owned(),
));
let overwrite_options = OverwriteOptions::new(&mut handler_args);
Self {
session_ctx: handler_args.session,
next_plan_node_id: RefCell::new(RESERVED_ID_NUM.into()),
@@ -87,6 +91,7 @@
with_options: handler_args.with_options,
session_timezone,
next_expr_display_id: RefCell::new(RESERVED_ID_NUM.into()),
overwrite_options,
}
}

@@ -106,6 +111,7 @@
with_options: Default::default(),
session_timezone: RefCell::new(SessionTimezone::new("UTC".into())),
next_expr_display_id: RefCell::new(0),
overwrite_options: OverwriteOptions::default(),
}
.into()
}
@@ -189,6 +195,10 @@ impl OptimizerContext {
&self.with_options
}

pub fn overwrite_options(&self) -> &OverwriteOptions {
&self.overwrite_options
}

pub fn session_ctx(&self) -> &Arc<SessionImpl> {
&self.session_ctx
}
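In effect, `OptimizerContext` now carries the resolved options, and the precedence is: a value given in the statement's WITH clause wins; otherwise the session variable applies. A tiny sketch of that resolution order (a hypothetical free function; both inputs are `Option<u32>`, as in `OverwriteOptions`):

```rust
// Sketch of the precedence introduced here: the WITH-clause value, when
// present, overrides the session-level STREAMING_RATE_LIMIT setting.
fn resolve_rate_limit(with_clause: Option<u32>, session_var: Option<u32>) -> Option<u32> {
    with_clause.or(session_var)
}

fn main() {
    assert_eq!(resolve_rate_limit(Some(200), Some(1000)), Some(200)); // WITH clause wins
    assert_eq!(resolve_rate_limit(None, Some(1000)), Some(1000)); // session fallback
    assert_eq!(resolve_rate_limit(None, None), None); // unthrottled
}
```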
7 changes: 1 addition & 6 deletions src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
@@ -111,12 +111,7 @@ impl StreamNode for StreamFsFetch {
.map(|c| c.to_protobuf())
.collect_vec(),
properties: source_catalog.properties.clone().into_iter().collect(),
rate_limit: self
.base
.ctx()
.session_ctx()
.config()
.get_streaming_rate_limit(),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
});
NodeBody::StreamFsFetch(StreamFsFetchNode {
node_inner: source_inner,
7 changes: 1 addition & 6 deletions src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -87,12 +87,7 @@ impl StreamNode for StreamSource {
.map(|c| c.to_protobuf())
.collect_vec(),
properties: source_catalog.properties.clone().into_iter().collect(),
rate_limit: self
.base
.ctx()
.session_ctx()
.config()
.get_streaming_rate_limit(),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
});
PbNodeBody::Source(SourceNode { source_inner })
}
8 changes: 2 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -283,6 +283,7 @@ impl StreamTableScan {
})
.collect_vec();

// TODO: snapshot read of upstream mview
let batch_plan_node = BatchPlanNode {
table_desc: Some(self.core.table_desc.to_protobuf()),
column_ids: upstream_column_ids.clone(),
@@ -301,12 +302,7 @@
// The table desc used by backfill executor
table_desc: Some(self.core.table_desc.to_protobuf()),
state_table: Some(catalog),
rate_limit: self
.base
.ctx()
.session_ctx()
.config()
.get_streaming_rate_limit(),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
..Default::default()
});

3 changes: 2 additions & 1 deletion src/frontend/src/utils/mod.rs
@@ -31,8 +31,9 @@ mod index_set;
pub use index_set::*;
pub(crate) mod group_by;
pub mod infer_stmt_row_desc;

pub mod overwrite_options;
pub use group_by::*;
pub use overwrite_options::*;

use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef};

52 changes: 52 additions & 0 deletions src/frontend/src/utils/overwrite_options.rs
@@ -0,0 +1,52 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::handler::HandlerArgs;

#[derive(Debug, Clone, Default)]
pub struct OverwriteOptions {
pub streaming_rate_limit: Option<u32>,
// ttl has been deprecated
pub ttl: Option<u32>,
}

impl OverwriteOptions {
const STREAMING_RATE_LIMIT_KEY: &'static str = "streaming_rate_limit";
const TTL_KEY: &'static str = "ttl";

pub fn new(args: &mut HandlerArgs) -> Self {
let streaming_rate_limit = {
if let Some(x) = args
.with_options
.inner_mut()
.remove(Self::STREAMING_RATE_LIMIT_KEY)
{
// FIXME(tabVersion): validate the value
Some(x.parse::<u32>().unwrap())
} else {
args.session.config().get_streaming_rate_limit()
}
};
let ttl = args
.with_options
.inner_mut()
.remove(Self::TTL_KEY)
// FIXME(tabVersion): validate the value
.map(|x| x.parse::<u32>().unwrap());
Self {
streaming_rate_limit,
ttl,
}
}
}
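The two FIXMEs above mean a non-numeric value such as `streaming_rate_limit = 'fast'` would currently panic on `unwrap` rather than return a user-facing error. A hedged sketch of a validating parse — a hypothetical helper, not part of this commit; wiring it in would also require `new` to return a `Result`:

```rust
// Hypothetical validating parse: surfaces a descriptive error instead of
// panicking when a WITH-clause value is not a valid u32.
fn parse_u32_option(key: &str, raw: &str) -> Result<u32, String> {
    raw.parse::<u32>()
        .map_err(|e| format!("invalid value {:?} for option `{}`: {}", raw, key, e))
}

fn main() {
    assert_eq!(parse_u32_option("streaming_rate_limit", "200"), Ok(200));
    assert!(parse_u32_option("streaming_rate_limit", "fast").is_err());
}
```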
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/source/fs_fetch.rs
@@ -97,7 +97,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
)
.boxed();

let rate_limit = source.get_rate_limit().cloned().ok();
let rate_limit = source.rate_limit.map(|x| x as _);
Ok(FlowControlExecutor::new(executor, rate_limit).boxed())
}
}
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/source/trad_source.rs
@@ -194,7 +194,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
}
}
};
let rate_limit = source.get_rate_limit().cloned().ok();
let rate_limit = source.rate_limit.map(|x| x as _);
Ok(FlowControlExecutor::new(executor, rate_limit).boxed())
} else {
// If there is no external stream source, then no data should be persisted. We pass a
3 changes: 1 addition & 2 deletions src/stream/src/from_proto/stream_scan.rs
@@ -218,7 +218,6 @@ impl ExecutorBuilder for StreamScanExecutorBuilder {
}
StreamScanType::Unspecified => unreachable!(),
};
let rate_limit = node.get_rate_limit().cloned().ok();
Ok(FlowControlExecutor::new(executor, rate_limit).boxed())
Ok(FlowControlExecutor::new(executor, node.rate_limit.map(|x| x as _)).boxed())
}
}
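The three executor-side changes (fs_fetch, trad_source, stream_scan) all switch from the generated `get_rate_limit().cloned().ok()` accessor to reading the optional proto field directly with `rate_limit.map(|x| x as _)`, letting the compiler infer the integer type `FlowControlExecutor` expects. A self-contained sketch of that conversion, assuming the target type is `usize`:

```rust
// Sketch of the Option conversion idiom; the protobuf field is an
// Option<u32>, and `as _` lets the target integer type be inferred.
fn to_executor_rate_limit(proto_rate_limit: Option<u32>) -> Option<usize> {
    proto_rate_limit.map(|x| x as usize)
}

fn main() {
    assert_eq!(to_executor_rate_limit(Some(200)), Some(200usize));
    assert_eq!(to_executor_rate_limit(None), None); // no throttling configured
}
```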
