diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index d02645385b81f..cef342b201788 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -310,6 +310,11 @@ impl ConnectorProperties { ) } + pub fn enable_split_scale_in(&self) -> bool { + // enable split scale in just for Kinesis + matches!(self, ConnectorProperties::Kinesis(_)) + } + pub fn init_from_pb_source(&mut self, source: &PbSource) { dispatch_source_prop!(self, prop, prop.init_from_pb_source(source)) } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 4608027232a24..e208222de4908 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -194,6 +194,7 @@ struct ConnectorSourceWorkerHandle { handle: JoinHandle<()>, sync_call_tx: UnboundedSender>>, splits: SharedSplitMapRef, + enable_scale_in: bool, } impl ConnectorSourceWorkerHandle { @@ -285,7 +286,9 @@ impl SourceManagerCore { *fragment_id, prev_actor_splits, &discovered_splits, - SplitDiffOptions::default(), + SplitDiffOptions { + enable_scale_in: handle.enable_scale_in, + }, ) { split_assignment.insert(*fragment_id, change); } @@ -615,6 +618,7 @@ impl SourceManager { fragment_id, empty_actor_splits, &prev_splits, + // pre-allocate splits is the first time getting splits and it does not have scale in scene SplitDiffOptions::default(), ) .unwrap_or_default(); @@ -715,6 +719,7 @@ impl SourceManager { let source_id = source.id; let connector_properties = extract_prop_from_source(&source)?; + let enable_scale_in = connector_properties.enable_split_scale_in(); let handle = runtime.spawn(async move { let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL); ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -752,6 +757,7 @@ impl SourceManager { handle, sync_call_tx, splits: current_splits_ref, + enable_scale_in, }, ); Ok(()) @@ -767,6 +773,7 @@ impl SourceManager { ) -> MetaResult<()> { let current_splits_ref = Arc::new(Mutex::new(SharedSplitMap { splits: None })); let connector_properties = extract_prop_from_source(source)?; + let enable_scale_in = connector_properties.enable_split_scale_in(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = dispatch_source_prop!(connector_properties, prop, { let mut worker = ConnectorSourceWorker::create( @@ -808,6 +815,7 @@ impl SourceManager { handle, sync_call_tx, splits: current_splits_ref, + enable_scale_in, }, );