From 03e6283cfee9ac316032cd1b2de3bd1c2a7ec34e Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Mon, 29 Apr 2024 13:36:40 +0800
Subject: [PATCH] feat(frontend): support alter rate limit in mv, source, table with source (#16399)

---
 ci/scripts/e2e-source-test.sh                 |   4 +
 .../basic/alter/rate_limit_source_kafka.slt   | 130 ++++++++++++++++++
 .../basic/alter/rate_limit_table_kafka.slt    |  88 ++++++++++++
 .../rate_limit/alter_rate_limit.slt.ignore    |  35 +++++
 proto/meta.proto                              |   1 +
 src/frontend/src/catalog/root_catalog.rs      |   1 +
 .../src/handler/alter_streaming_rate_limit.rs |  89 ++++++++++++
 src/frontend/src/handler/mod.rs               |  39 ++++++
 src/frontend/src/meta_client.rs               |  21 ++-
 src/frontend/src/test_utils.rs                |  11 +-
 src/meta/service/src/stream_service.rs        |   2 +-
 src/sqlparser/src/ast/ddl.rs                  |  18 +++
 src/sqlparser/src/keywords.rs                 |   1 +
 src/sqlparser/src/parser.rs                   |  45 +++++-
 src/tests/simulation/src/slt.rs               |   1 +
 15 files changed, 480 insertions(+), 6 deletions(-)
 create mode 100644 e2e_test/source/basic/alter/rate_limit_source_kafka.slt
 create mode 100644 e2e_test/source/basic/alter/rate_limit_table_kafka.slt
 create mode 100644 e2e_test/streaming/rate_limit/alter_rate_limit.slt.ignore
 create mode 100644 src/frontend/src/handler/alter_streaming_rate_limit.rs

diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh
index 5b776d91bd35..e91438402a61 100755
--- a/ci/scripts/e2e-source-test.sh
+++ b/ci/scripts/e2e-source-test.sh
@@ -156,6 +156,10 @@ risedev slt './e2e_test/source/basic/*.slt'
 risedev slt './e2e_test/source/basic/old_row_format_syntax/*.slt'
 risedev slt './e2e_test/source/basic/alter/kafka.slt'
 
+echo "--- e2e, kafka alter source rate limit"
+risedev slt './e2e_test/source/basic/alter/rate_limit_source_kafka.slt'
+risedev slt './e2e_test/source/basic/alter/rate_limit_table_kafka.slt'
+
 echo "--- e2e, kafka alter source"
 chmod +x ./scripts/source/prepare_data_after_alter.sh
 ./scripts/source/prepare_data_after_alter.sh 2
diff --git a/e2e_test/source/basic/alter/rate_limit_source_kafka.slt b/e2e_test/source/basic/alter/rate_limit_source_kafka.slt
new file mode 100644
index 000000000000..7d991083345a
--- /dev/null
+++ b/e2e_test/source/basic/alter/rate_limit_source_kafka.slt
@@ -0,0 +1,130 @@
+############## Create kafka seed data
+
+statement ok
+create table kafka_seed_data (v1 int);
+
+statement ok
+insert into kafka_seed_data select * from generate_series(1, 1000);
+
+############## Sink into kafka
+
+statement ok
+create sink kafka_sink
+from
+  kafka_seed_data with (
+    properties.bootstrap.server = 'message_queue:29092',
+    topic = 'kafka_source',
+    type = 'append-only',
+    force_append_only='true',
+    connector = 'kafka'
+);
+
+############## Source from kafka (rate_limit = 0)
+
+# Wait for the topic to be created
+skipif in-memory
+sleep 5s
+
+statement ok
+create source kafka_source (v1 int) with (
+  connector = 'kafka',
+  topic = 'kafka_source',
+  properties.bootstrap.server = 'message_queue:29092',
+  scan.startup.mode = 'earliest',
+) FORMAT PLAIN ENCODE JSON
+
+statement ok
+flush;
+
+############## Check data
+
+skipif in-memory
+sleep 3s
+
+############## Create MV on source
+
+statement ok
+SET STREAMING_RATE_LIMIT=0;
+
+statement ok
+create materialized view rl_mv1 as select count(*) from kafka_source;
+
+statement ok
+create materialized view rl_mv2 as select count(*) from kafka_source;
+
+statement ok
+create materialized view rl_mv3 as select count(*) from kafka_source;
+
+statement ok
+SET STREAMING_RATE_LIMIT=default;
+
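+# Note: resetting the session variable only affects streaming jobs created
+# afterwards; rl_mv1/2/3 above keep their rate limit of 0 until altered.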
+############## MVs should have 0 records, since source has (rate_limit = 0)
+
+statement ok
+flush;
+
+query I
+select * from rl_mv1;
+----
+0
+
+query I
+select * from rl_mv2;
+----
+0
+
+query I
+select * from rl_mv3;
+----
+0
+
+############## Alter Source (rate_limit = 0 --> rate_limit = 1000)
+
+skipif in-memory
+query I
+alter source kafka_source set streaming_rate_limit to 1000;
+
+skipif in-memory
+query I
+alter source kafka_source set streaming_rate_limit to default;
+
+skipif in-memory
+sleep 3s
+
+skipif in-memory
+query I
+select count(*) > 0 from rl_mv1;
+----
+t
+
+skipif in-memory
+query I
+select count(*) > 0 from rl_mv2;
+----
+t
+
+skipif in-memory
+query I
+select count(*) > 0 from rl_mv3;
+----
+t
+
+############## Cleanup
+
+statement ok
+drop materialized view rl_mv1;
+
+statement ok
+drop materialized view rl_mv2;
+
+statement ok
+drop materialized view rl_mv3;
+
+statement ok
+drop source kafka_source;
+
+statement ok
+drop sink kafka_sink;
+
+statement ok
+drop table kafka_seed_data;
\ No newline at end of file
diff --git a/e2e_test/source/basic/alter/rate_limit_table_kafka.slt b/e2e_test/source/basic/alter/rate_limit_table_kafka.slt
new file mode 100644
index 000000000000..bf1fd6672d6e
--- /dev/null
+++ b/e2e_test/source/basic/alter/rate_limit_table_kafka.slt
@@ -0,0 +1,88 @@
+############## Create kafka seed data
+
+statement ok
+create table kafka_seed_data (v1 int);
+
+statement ok
+insert into kafka_seed_data select * from generate_series(1, 1000);
+
+############## Sink into kafka
+
+statement ok
+create sink kafka_sink
+from
+  kafka_seed_data with (
+    properties.bootstrap.server = 'message_queue:29092',
+    topic = 'kafka_source',
+    type = 'append-only',
+    force_append_only='true',
+    connector = 'kafka'
+);
+
+############## Source from kafka (rate_limit = 0)
+
+statement ok
+create table kafka_source (v1 int) with (
+  connector = 'kafka',
+  topic = 'kafka_source',
+  properties.bootstrap.server = 'message_queue:29092',
+  scan.startup.mode = 'earliest',
+  streaming_rate_limit = 0
+) FORMAT PLAIN ENCODE JSON
+
+statement ok
+flush;
+
+############## Check data
+
+skipif in-memory
+sleep 3s
+
+skipif in-memory
+query I
+select count(*) from kafka_source;
+----
+0
+
+############## Can still insert data when rate limit = 0
+
+statement ok
+insert into kafka_source values(1);
+
+statement ok
+flush;
+
+query I
+select count(*) from kafka_source;
+----
+1
+
+############## Alter table (rate_limit = 0 --> rate_limit = 1000)
+
+skipif in-memory
+query I
+alter table kafka_source set streaming_rate_limit to 1000;
+
+skipif in-memory
+query I
+alter table kafka_source set streaming_rate_limit to default;
+
+skipif in-memory
+sleep 3s
+
+skipif in-memory
+query I
+select count(*) > 1 from kafka_source;
+----
+t
+
+############## Cleanup
+
+statement ok
+drop table kafka_source;
+
+statement ok
+drop sink kafka_sink;
+
+statement ok
+drop table kafka_seed_data;
\ No newline at end of file
diff --git a/e2e_test/streaming/rate_limit/alter_rate_limit.slt.ignore b/e2e_test/streaming/rate_limit/alter_rate_limit.slt.ignore
new file mode 100644
index 000000000000..7afe279a80f1
--- /dev/null
+++ b/e2e_test/streaming/rate_limit/alter_rate_limit.slt.ignore
@@ -0,0 +1,35 @@
+-- This test is ignored until alter mv rate limit is fully supported.
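+-- It creates an MV with streaming_rate_limit = 0 under BACKGROUND_DDL, checks
+-- that the backfill stalls at 0.00% progress, then raises the limit to 1 so
+-- the job can finish.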
+ +statement ok +CREATE TABLE t (v1 int); + +statement ok +INSERT INTO t VALUES (1); + +statement ok +flush; + +statement ok +SET BACKGROUND_DDL=true; + +statement ok +CREATE MATERIALIZED VIEW streaming_rate_limit_0 with ( streaming_rate_limit = 0 ) AS SELECT * FROM t; + +skipif in-memory +sleep 1s + +query I +select progress from rw_ddl_progress; +---- +0.00% + +statement ok +ALTER MATERIALIZED VIEW streaming_rate_limit_0 SET STREAMING_RATE_LIMIT = 1; + +statement ok +wait; + +query I +select * from streaming_rate_limit_0; +---- +1 \ No newline at end of file diff --git a/proto/meta.proto b/proto/meta.proto index dadc5b364c62..149b2940e2a2 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -254,6 +254,7 @@ enum ThrottleTarget { THROTTLE_TARGET_UNSPECIFIED = 0; SOURCE = 1; MV = 2; + TABLE_WITH_SOURCE = 3; } message ApplyThrottleRequest { diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index 88e75f4ad90d..7a99c199446d 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -597,6 +597,7 @@ impl Catalog { )) } + /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes. pub fn get_table_by_name<'a>( &self, db_name: &str, diff --git a/src/frontend/src/handler/alter_streaming_rate_limit.rs b/src/frontend/src/handler/alter_streaming_rate_limit.rs new file mode 100644 index 000000000000..431676362877 --- /dev/null +++ b/src/frontend/src/handler/alter_streaming_rate_limit.rs @@ -0,0 +1,89 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
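+
+//! Handler for `ALTER <SOURCE | TABLE | MATERIALIZED VIEW> ... SET STREAMING_RATE_LIMIT`:
+//! resolves the target object in the catalog, checks drop/alter privileges,
+//! and forwards an `apply_throttle` request to the meta service.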
+
+use pgwire::pg_response::{PgResponse, StatementType};
+use risingwave_common::bail;
+use risingwave_pb::meta::PbThrottleTarget;
+use risingwave_sqlparser::ast::ObjectName;
+
+use super::{HandlerArgs, RwPgResponse};
+use crate::catalog::root_catalog::SchemaPath;
+use crate::catalog::table_catalog::TableType;
+use crate::error::{ErrorCode, Result};
+use crate::Binder;
+
+pub async fn handle_alter_streaming_rate_limit(
+    handler_args: HandlerArgs,
+    kind: PbThrottleTarget,
+    table_name: ObjectName,
+    rate_limit: i32,
+) -> Result<RwPgResponse> {
+    let session = handler_args.session;
+    let db_name = session.database();
+    let (schema_name, real_table_name) =
+        Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
+    let search_path = session.config().search_path();
+    let user_name = &session.auth_context().user_name;
+
+    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
+
+    let (stmt_type, id) = match kind {
+        PbThrottleTarget::Mv => {
+            let reader = session.env().catalog_reader().read_guard();
+            let (table, schema_name) =
+                reader.get_table_by_name(db_name, schema_path, &real_table_name)?;
+            if table.table_type != TableType::MaterializedView {
+                return Err(ErrorCode::InvalidInputSyntax(format!(
+                    "\"{table_name}\" is not a materialized view",
+                ))
+                .into());
+            }
+            session.check_privilege_for_drop_alter(schema_name, &**table)?;
+            (StatementType::ALTER_MATERIALIZED_VIEW, table.id.table_id)
+        }
+        PbThrottleTarget::Source => {
+            let reader = session.env().catalog_reader().read_guard();
+            let (source, schema_name) =
+                reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
+            session.check_privilege_for_drop_alter(schema_name, &**source)?;
+            (StatementType::ALTER_SOURCE, source.id)
+        }
+        PbThrottleTarget::TableWithSource => {
+            let reader = session.env().catalog_reader().read_guard();
+            let (table, schema_name) =
+                reader.get_table_by_name(db_name, schema_path, &real_table_name)?;
+            session.check_privilege_for_drop_alter(schema_name, &**table)?;
+            // Get the corresponding source catalog.
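+            // Throttling a table that has an associated source applies to the
+            // source part of the table, so the request carries the source id
+            // and is routed like an `ALTER SOURCE` on the meta side.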
+            let source_id = if let Some(id) = table.associated_source_id {
+                id.table_id()
+            } else {
+                bail!("ALTER STREAMING_RATE_LIMIT is not for table without source")
+            };
+            (StatementType::ALTER_SOURCE, source_id)
+        }
+        _ => bail!("Unsupported throttle target: {:?}", kind),
+    };
+
+    let meta_client = session.env().meta_client();
+
+    let rate_limit = if rate_limit < 0 {
+        None
+    } else {
+        Some(rate_limit as u32)
+    };
+
+    meta_client.apply_throttle(kind, id, rate_limit).await?;
+
+    Ok(PgResponse::empty_result(stmt_type))
+}
diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index 80c91485c7a7..794a5a712505 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -27,6 +27,7 @@ use pgwire::types::{Format, Row};
 use risingwave_common::bail_not_implemented;
 use risingwave_common::types::Fields;
 use risingwave_common::util::iter_util::ZipEqFast;
+use risingwave_pb::meta::PbThrottleTarget;
 use risingwave_sqlparser::ast::*;
 
 use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
@@ -44,6 +45,7 @@ mod alter_rename;
 mod alter_set_schema;
 mod alter_source_column;
 mod alter_source_with_sr;
+mod alter_streaming_rate_limit;
 mod alter_system;
 mod alter_table_column;
 mod alter_table_with_sr;
@@ -646,6 +648,18 @@ pub async fn handle(
             name,
             operation: AlterTableOperation::RefreshSchema,
         } => alter_table_with_sr::handle_refresh_schema(handler_args, name).await,
+        Statement::AlterTable {
+            name,
+            operation: AlterTableOperation::SetStreamingRateLimit { rate_limit },
+        } => {
+            alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
+                handler_args,
+                PbThrottleTarget::TableWithSource,
+                name,
+                rate_limit,
+            )
+            .await
+        }
         Statement::AlterIndex {
             name,
             operation: AlterIndexOperation::RenameIndex { index_name },
@@ -750,6 +764,19 @@ pub async fn handle(
                 .await
             }
         }
+        Statement::AlterView {
+            materialized,
+            name,
+            operation: AlterViewOperation::SetStreamingRateLimit { rate_limit },
+        } if materialized => {
+            alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
+                handler_args,
+                PbThrottleTarget::Mv,
+                name,
+                rate_limit,
+            )
+            .await
+        }
         Statement::AlterSink {
             name,
             operation: AlterSinkOperation::RenameSink { sink_name },
@@ -887,6 +914,18 @@ pub async fn handle(
             name,
             operation: AlterSourceOperation::RefreshSchema,
         } => alter_source_with_sr::handler_refresh_schema(handler_args, name).await,
+        Statement::AlterSource {
+            name,
+            operation: AlterSourceOperation::SetStreamingRateLimit { rate_limit },
+        } => {
+            alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
+                handler_args,
+                PbThrottleTarget::Source,
+                name,
+                rate_limit,
+            )
+            .await
+        }
         Statement::AlterFunction {
             name,
             args,
diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs
index 3cc7e22cf8b2..70b53cb82f9b 100644
--- a/src/frontend/src/meta_client.rs
+++ b/src/frontend/src/meta_client.rs
@@ -33,7 +33,7 @@ use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistributi
 use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
 use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
 use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
-use risingwave_pb::meta::EventLog;
+use risingwave_pb::meta::{EventLog, PbThrottleTarget};
 use risingwave_rpc_client::error::Result;
 use risingwave_rpc_client::{HummockMetaClient, MetaClient};
 
@@ -117,6 +117,13 @@ pub trait FrontendMetaClient: Send + Sync {
     async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
 
     async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>>;
+
+    async fn apply_throttle(
+        &self,
+        kind: PbThrottleTarget,
+        id: u32,
+        rate_limit: Option<u32>,
+    ) -> Result<()>;
 }
 
 pub struct FrontendMetaClientImpl(pub MetaClient);
@@ -293,4 +300,16 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
     async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
         self.0.list_compact_task_progress().await
     }
+
+    async fn apply_throttle(
+        &self,
+        kind: PbThrottleTarget,
+        id: u32,
+        rate_limit: Option<u32>,
+    ) -> Result<()> {
+        self.0
+            .apply_throttle(kind, id, rate_limit)
+            .await
+            .map(|_| ())
+    }
 }
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index bda668379725..47a9900752ba 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -55,7 +55,7 @@ use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistributi
 use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
 use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
 use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
-use risingwave_pb::meta::{EventLog, PbTableParallelism, SystemParams};
+use risingwave_pb::meta::{EventLog, PbTableParallelism, PbThrottleTarget, SystemParams};
 use risingwave_pb::stream_plan::StreamFragmentGraph;
 use risingwave_pb::user::update_user_request::UpdateField;
 use risingwave_pb::user::{GrantPrivilege, UserInfo};
@@ -1059,6 +1059,15 @@ impl FrontendMetaClient for MockFrontendMetaClient {
     async fn recover(&self) -> RpcResult<()> {
         unimplemented!()
     }
+
+    async fn apply_throttle(
+        &self,
+        _kind: PbThrottleTarget,
+        _id: u32,
+        _rate_limit: Option<u32>,
+    ) -> RpcResult<()> {
+        unimplemented!()
+    }
 }
 
 #[cfg(test)]
diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs
index cf9a8b1a3e48..be520132b167 100644
--- a/src/meta/service/src/stream_service.rs
+++ b/src/meta/service/src/stream_service.rs
@@ -108,7 +108,7 @@ impl StreamManagerService for StreamServiceImpl {
         let request = request.into_inner();
 
         let actor_to_apply = match request.kind() {
-            ThrottleTarget::Source => {
+            ThrottleTarget::Source | ThrottleTarget::TableWithSource => {
                 self.metadata_manager
                     .update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
                     .await?
diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs
index a166182ac78e..3230ab99941b 100644
--- a/src/sqlparser/src/ast/ddl.rs
+++ b/src/sqlparser/src/ast/ddl.rs
@@ -104,6 +104,10 @@ pub enum AlterTableOperation {
         deferred: bool,
     },
     RefreshSchema,
+    /// `SET STREAMING_RATE_LIMIT TO <rate_limit>`
+    SetStreamingRateLimit {
+        rate_limit: i32,
+    },
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -138,6 +142,10 @@ pub enum AlterViewOperation {
         parallelism: SetVariableValue,
         deferred: bool,
     },
+    /// `SET STREAMING_RATE_LIMIT TO <rate_limit>`
+    SetStreamingRateLimit {
+        rate_limit: i32,
+    },
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -190,6 +198,7 @@ pub enum AlterSourceOperation {
     SetSchema { new_schema_name: ObjectName },
     FormatEncode { connector_schema: ConnectorSchema },
     RefreshSchema,
+    SetStreamingRateLimit { rate_limit: i32 },
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -301,6 +310,9 @@ impl fmt::Display for AlterTableOperation {
             AlterTableOperation::RefreshSchema => {
                 write!(f, "REFRESH SCHEMA")
             }
+            AlterTableOperation::SetStreamingRateLimit { rate_limit } => {
+                write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
+            }
         }
     }
 }
@@ -349,6 +361,9 @@ impl fmt::Display for AlterViewOperation {
                     if *deferred { " DEFERRED" } else { "" }
                 )
             }
+            AlterViewOperation::SetStreamingRateLimit { rate_limit } => {
+                write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
+            }
         }
     }
 }
@@ -428,6 +443,9 @@ impl fmt::Display for AlterSourceOperation {
             AlterSourceOperation::RefreshSchema => {
                 write!(f, "REFRESH SCHEMA")
             }
+            AlterSourceOperation::SetStreamingRateLimit { rate_limit } => {
+                write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
+            }
         }
     }
 }
diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs
index 5647fde58e4e..8016eae40655 100644
--- a/src/sqlparser/src/keywords.rs
+++ b/src/sqlparser/src/keywords.rs
@@ -487,6 +487,7 @@ define_keywords!(
     STDDEV_SAMP,
     STDIN,
     STORED,
+    STREAMING_RATE_LIMIT,
     STRING,
     STRUCT,
     SUBMULTISET,
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 2cc74c8d0a39..1d820ce53ff7 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -3260,8 +3260,13 @@ impl Parser {
                 parallelism: value,
                 deferred,
             }
+        } else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? {
+            AlterTableOperation::SetStreamingRateLimit { rate_limit }
         } else {
-            return self.expected("SCHEMA/PARALLELISM after SET", self.peek_token());
+            return self.expected(
+                "SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET",
+                self.peek_token(),
+            );
         }
     } else if self.parse_keyword(Keyword::DROP) {
         let _ = self.parse_keyword(Keyword::COLUMN);
@@ -3318,6 +3323,31 @@
         })
     }
 
+    /// STREAMING_RATE_LIMIT = default | NUMBER
+    /// STREAMING_RATE_LIMIT TO default | NUMBER
+    pub fn parse_alter_streaming_rate_limit(&mut self) -> Result<Option<i32>, ParserError> {
+        if !self.parse_keyword(Keyword::STREAMING_RATE_LIMIT) {
+            return Ok(None);
+        }
+        if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() {
+            return self.expected(
+                "TO or = after ALTER TABLE SET STREAMING_RATE_LIMIT",
+                self.peek_token(),
+            );
+        }
+        let rate_limit = if self.parse_keyword(Keyword::DEFAULT) {
+            -1
+        } else {
+            let s = self.parse_number_value()?;
+            if let Ok(n) = s.parse::<i32>() {
+                n
+            } else {
+                return self.expected("number or DEFAULT", self.peek_token());
+            }
+        };
+        Ok(Some(rate_limit))
+    }
+
     pub fn parse_alter_index(&mut self) -> Result<Statement, ParserError> {
         let index_name = self.parse_object_name()?;
         let operation = if self.parse_keyword(Keyword::RENAME) {
@@ -3397,8 +3427,15 @@
                 parallelism: value,
                 deferred,
             }
+        } else if materialized
+            && let Some(rate_limit) = self.parse_alter_streaming_rate_limit()?
+        {
+            AlterViewOperation::SetStreamingRateLimit { rate_limit }
         } else {
-            return self.expected("SCHEMA/PARALLELISM after SET", self.peek_token());
+            return self.expected(
+                "SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET",
+                self.peek_token(),
+            );
         }
     } else {
         return self.expected(
@@ -3547,6 +3584,8 @@
             AlterSourceOperation::SetSchema {
                 new_schema_name: schema_name,
             }
+        } else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? {
+            AlterSourceOperation::SetStreamingRateLimit { rate_limit }
         } else {
             return self.expected("SCHEMA after SET", self.peek_token());
         }
@@ -3557,7 +3596,7 @@
             AlterSourceOperation::RefreshSchema
         } else {
             return self.expected(
-                "RENAME, ADD COLUMN or OWNER TO or SET after ALTER SOURCE",
+                "RENAME, ADD COLUMN, OWNER TO, SET or STREAMING_RATE_LIMIT after ALTER SOURCE",
                 self.peek_token(),
            );
         };
diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs
index ec1aca82d36c..89e05b974e27 100644
--- a/src/tests/simulation/src/slt.rs
+++ b/src/tests/simulation/src/slt.rs
@@ -488,6 +488,7 @@ fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
         .expect("failed to get schema path");
     let content = content
         .replace("127.0.0.1:29092", "192.168.11.1:29092")
+        .replace("localhost:29092", "192.168.11.1:29092")
         .replace(
             "/risingwave/avro-simple-schema.avsc",
             simple_avsc_full_path.to_str().unwrap(),