diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 63bc3bee1f210..32f37d6c319be 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -9,11 +9,12 @@ services: ports: - 5432 healthcheck: - test: [ "CMD-SHELL", "pg_isready -U postgres" ] + test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 5s timeout: 5s retries: 5 - command: [ "postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=30" ] + command: + ["postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=30"] mysql: image: mysql:8.0 @@ -25,11 +26,7 @@ services: - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw healthcheck: - test: - [ - "CMD-SHELL", - "mysqladmin ping -h 127.0.0.1 -u root -p123456" - ] + test: ["CMD-SHELL", "mysqladmin ping -h 127.0.0.1 -u root -p123456"] interval: 5s timeout: 5s retries: 5 @@ -82,6 +79,7 @@ services: - mongodb - mongodb-setup - mongo_data_generator + - nats-server volumes: - ..:/risingwave @@ -107,7 +105,6 @@ services: volumes: - ..:/risingwave - rw-build-env: image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241030 volumes: @@ -185,7 +182,7 @@ services: redis-server: container_name: redis-server - image: 'redis:latest' + image: "redis:latest" expose: - 6379 ports: @@ -223,16 +220,15 @@ services: ports: - 1433:1433 environment: - ACCEPT_EULA: 'Y' - SA_PASSWORD: 'SomeTestOnly@SA' + ACCEPT_EULA: "Y" + SA_PASSWORD: "SomeTestOnly@SA" MSSQL_AGENT_ENABLED: "true" starrocks-fe-server: container_name: starrocks-fe-server image: starrocks/fe-ubuntu:3.1.7 hostname: starrocks-fe-server - command: - /opt/starrocks/fe/bin/start_fe.sh + command: /opt/starrocks/fe/bin/start_fe.sh ports: - 28030:8030 - 29020:9020 @@ -259,10 +255,10 @@ services: depends_on: - starrocks-fe-server -# # Temporary workaround for json schema registry test since redpanda only supports -# # protobuf/avro schema registry. Should be removed after the support. -# # Related tracking issue: -# # https://github.com/redpanda-data/redpanda/issues/1878 + # # Temporary workaround for json schema registry test since redpanda only supports + # # protobuf/avro schema registry. Should be removed after the support. + # # Related tracking issue: + # # https://github.com/redpanda-data/redpanda/issues/1878 schemaregistry: container_name: schemaregistry hostname: schemaregistry @@ -287,7 +283,7 @@ services: - "8080" - "6650" healthcheck: - test: [ "CMD-SHELL", "bin/pulsar-admin brokers healthcheck"] + test: ["CMD-SHELL", "bin/pulsar-admin brokers healthcheck"] interval: 5s timeout: 5s retries: 5 @@ -313,33 +309,48 @@ services: [ "bash", "-c", - "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10" + "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10", ] restart: "no" volumes: - ./mongodb/config-replica.js:/config-replica.js mongo_data_generator: - build: - context: . - dockerfile: ./mongodb/Dockerfile.generator - container_name: mongo_data_generator - depends_on: - - mongodb - environment: - MONGO_HOST: mongodb - MONGO_PORT: 27017 - MONGO_DB_NAME: random_data + build: + context: . 
+      dockerfile: ./mongodb/Dockerfile.generator
+    container_name: mongo_data_generator
+    depends_on:
+      - mongodb
+    environment:
+      MONGO_HOST: mongodb
+      MONGO_PORT: 27017
+      MONGO_DB_NAME: random_data
 
   mqtt-server:
     image: eclipse-mosquitto
     command:
-      - sh
-      - -c
-      - echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf
+      - sh
+      - -c
+      - echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf
     ports:
       - 1883:1883
     healthcheck:
-      test: ["CMD-SHELL", "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0"]
+      test:
+        [
+          "CMD-SHELL",
+          "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0",
+        ]
       interval: 10s
       timeout: 10s
       retries: 6
+  nats-server:
+    image: nats:latest
+    command: ["-js"]
+    ports:
+      - "4222:4222"
+      - "8222:8222"
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:8222/healthz"]
+      interval: 10s
+      timeout: 5s
+      retries: 3
diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh
index 1a9d6ef90ed66..88090c50d9aef 100755
--- a/ci/scripts/e2e-source-test.sh
+++ b/ci/scripts/e2e-source-test.sh
@@ -32,7 +32,7 @@ mkdir ./connector-node
 tar xf ./risingwave-connector.tar.gz -C ./connector-node
 
 echo "--- Install dependencies"
-python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema requests
+python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema nats-py psycopg2-binary
 apt-get -y install jq
 
 echo "--- e2e, inline test"
diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py
new file mode 100644
index 0000000000000..5f0250c3404aa
--- /dev/null
+++ b/e2e_test/source_inline/nats/operation.py
@@ -0,0 +1,117 @@
+import asyncio
+import time
+import json
+from nats.aio.client import Client as NATS
+from nats.js.api import StreamConfig
+import psycopg2
+
+NATS_SERVER = "nats://nats-server:4222"
+
+async def create_stream(stream_name: str, subject: str):
+    # Create a NATS client
+    nc = NATS()
+
+    try:
+        # Connect to the NATS server
+        await nc.connect(servers=[NATS_SERVER])
+
+        # Enable JetStream
+        js = nc.jetstream()
+        stream_config = StreamConfig(
+            name=stream_name,
+            subjects=[subject],
+            retention="limits",  # Retention policy (limits, interest, or workqueue)
+            max_msgs=1000,  # Maximum number of messages to retain
+            max_bytes=10 * 1024 * 1024,  # Maximum size of messages in bytes
+            max_age=0,  # Maximum age of messages (0 means unlimited)
+            storage="file",  # Storage type (file or memory)
+        )
+
+        # Add the stream
+        await js.add_stream(stream_config)
+        print(f"Stream '{stream_name}' added successfully with subject '{subject}'.")
+
+    except Exception as e:
+        print(f"Error: {e}")
+
+    finally:
+        # Close the connection
+        await nc.close()
+        print("Disconnected from NATS server.")
+
+async def produce_message(stream_name: str, subject: str):
+    nc = NATS()
+    await nc.connect(servers=[NATS_SERVER])
+    js = nc.jetstream()
+    for i in range(100):
+        payload = {"i": i}
+        await js.publish(subject, str.encode(json.dumps(payload)))
+
+    await nc.close()
+
+
+async def consume_message(_stream_name: str, subject: str):
+    nc = NATS()
+    await nc.connect(servers=[NATS_SERVER])
+    js = nc.jetstream()
+    consumer = await js.pull_subscribe(subject)
+    for _ in range(100):
+        msgs = await consumer.fetch(1)
+        for msg in msgs:
+            print(msg.data)
+            await msg.ack()
+
+
+def validate_state_table_item(table_name: str, expect_count: int):
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+    # query for the internal table name and make sure it exists
+    with conn.cursor() as cursor:
+        cursor.execute(f"select name from rw_internal_table_info where job_name = '{table_name}'")
+        results = cursor.fetchall()
+        assert len(results) == 1, f"Expected exactly one internal table matching {table_name}, found {len(results)}"
+        internal_table_name = results[0][0]
+        print(f"Found internal table: {internal_table_name}")
+        for _ in range(10):
+            cursor.execute(f"SELECT * FROM {internal_table_name}")
+            results = cursor.fetchall()
+            print(f"Get items from state table: {results}")
+            if len(results) == expect_count:
+                print(f"Found {expect_count} items in {internal_table_name}")
+                break
+            print(f"Waiting for {internal_table_name} to have {expect_count} items, got {len(results)}. Retry...")
+            time.sleep(0.5)
+
+if __name__ == "__main__":
+    import sys
+    if len(sys.argv) < 2:
+        print("Usage: python operation.py <command> [<stream_name> <subject> | <table_name> <expect_count>]")
+        sys.exit(1)
+
+    command = sys.argv[1]
+    if command in ["create_stream", "produce_stream"]:
+        if len(sys.argv) != 4:
+            print("Error: Both stream name and subject are required")
+            sys.exit(1)
+        stream_name = sys.argv[2]
+        subject = sys.argv[3]
+
+        if command == "create_stream":
+            asyncio.run(create_stream(stream_name, subject))
+        elif command == "produce_stream":
+            asyncio.run(produce_message(stream_name, subject))
+    elif command == "validate_state":
+        if len(sys.argv) != 4:
+            print("Error: Both table name and expected count are required")
+            sys.exit(1)
+        table_name = sys.argv[2]
+        expect_count = int(sys.argv[3])
+        validate_state_table_item(table_name, expect_count)
+    else:
+        print(f"Unknown command: {command}")
+        print("Supported commands: create_stream, produce_stream, validate_state")
+        sys.exit(1)
diff --git a/e2e_test/source_inline/nats/test.slt.serial b/e2e_test/source_inline/nats/test.slt.serial
new file mode 100644
index 0000000000000..3f4227a0d05e5
--- /dev/null
+++ b/e2e_test/source_inline/nats/test.slt.serial
@@ -0,0 +1,88 @@
+control substitution on
+
+statement ok
+set streaming_use_shared_source to false;
+
+system ok
+python3 e2e_test/source_inline/nats/operation.py create_stream "teststream" "testsubject"
+
+# produce 100 messages of format `{"i": $i}` to the stream
+system ok
+python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject"
+
+statement ok
+set streaming_parallelism to 4;
+
+statement ok
+create table t_nats ( i int ) with (
+  connector = 'nats',
+  server_url = 'nats-server:4222',
+  subject = 'testsubject',
+  connect_mode = 'plain',
+  consumer.durable_name = 'demo1',
+  consumer.ack_policy = 'all',
+  stream = 'teststream',
+  consumer.max_ack_pending = '100000')
+format plain encode json;
+
+statement ok
+select * from t_nats;
+
+sleep 3s
+
+statement ok
+flush;
+
+# at least once
+query T
+select count(*) >= 100 from t_nats;
+----
+t
+
+system ok
+python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 4
+
+statement ok
+alter table t_nats set PARALLELISM to 6;
+
+system ok
+python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject"
+
+sleep 3s
+
+statement ok
+flush;
+
+query T
+select count(*) >= 200 from t_nats;
+----
+t
+
+system ok
+python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 6
+
+statement ok
+alter table t_nats set PARALLELISM to 2;
+
+system ok
+python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject"
+
+sleep 3s
+
+statement ok
+flush;
+
+query T
+select count(*) >= 300 from t_nats;
+----
+t
+
+system ok
+python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 2
+
+
+statement ok
+drop table t_nats;
+
+statement ok
+set streaming_use_shared_source to true;
\ No newline at end of file
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 06cd97580f6cd..ec01730ee463a 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -456,9 +456,17 @@ impl ConnectorProperties {
         )
     }
 
-    pub fn enable_split_scale_in(&self) -> bool {
-        // enable split scale in just for Kinesis
-        matches!(self, ConnectorProperties::Kinesis(_))
+    pub fn enable_drop_split(&self) -> bool {
+        // enable dropping splits, currently only for Kinesis and Nats
+        matches!(
+            self,
+            ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
+        )
+    }
+
+    /// For most connectors, this should be false. When enabled, RisingWave should not track any progress.
+    pub fn enable_adaptive_splits(&self) -> bool {
+        matches!(self, ConnectorProperties::Nats(_))
     }
 
     /// Load additional info from `PbSource`. Currently only used by CDC.
diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs
index 21d0c7c3937d6..0b91d698ea97d 100644
--- a/src/connector/src/source/mod.rs
+++ b/src/connector/src/source/mod.rs
@@ -25,6 +25,7 @@ pub mod mqtt;
 pub mod nats;
 pub mod nexmark;
 pub mod pulsar;
+mod util;
 
 use std::future::IntoFuture;
 
@@ -47,6 +48,7 @@ use async_nats::jetstream::context::Context as JetStreamContext;
 pub use manager::{SourceColumnDesc, SourceColumnType};
 use risingwave_common::array::{Array, ArrayRef};
 use thiserror_ext::AsReport;
+pub use util::fill_adaptive_split;
 
 pub use crate::source::filesystem::opendal_source::{
     AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
diff --git a/src/connector/src/source/util.rs b/src/connector/src/source/util.rs
new file mode 100644
index 0000000000000..ad1107bd88d6d
--- /dev/null
+++ b/src/connector/src/source/util.rs
@@ -0,0 +1,51 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{BTreeMap, HashSet};
+use std::sync::Arc;
+
+use crate::error::{ConnectorError, ConnectorResult};
+use crate::source::nats::split::NatsSplit;
+use crate::source::SplitImpl;
+
+pub fn fill_adaptive_split(
+    split_template: &SplitImpl,
+    actor_in_use: &HashSet<u32>,
+) -> ConnectorResult<BTreeMap<Arc<str>, SplitImpl>> {
+    // Only Nats is adaptive for now
+    if let SplitImpl::Nats(split) = split_template {
+        let mut new_splits = BTreeMap::new();
+        for actor_id in actor_in_use {
+            let actor_id: Arc<str> = actor_id.to_string().into();
+            new_splits.insert(
+                actor_id.clone(),
+                SplitImpl::Nats(NatsSplit::new(
+                    split.subject.clone(),
+                    actor_id,
+                    split.start_sequence.clone(),
+                )),
+            );
+        }
+        tracing::debug!(
+            "Filled adaptive splits for Nats source, {} splits in total",
+            new_splits.len()
+        );
+        Ok(new_splits)
+    } else {
+        Err(ConnectorError::from(anyhow::anyhow!(
+            "Unsupported split type, expected Nats SplitImpl but got {:?}",
+            split_template
+        )))
+    }
+}
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index db4e54168766c..00ea06341b42f 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -1758,6 +1758,7 @@ impl ScaleController {
         let mut stream_source_actor_splits = HashMap::new();
         let mut stream_source_dropped_actors = HashSet::new();
 
+        // todo: handle adaptive splits
         for (fragment_id, reschedule) in reschedules {
             if !reschedule.actor_splits.is_empty() {
                 stream_source_actor_splits
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index ddd65bf9352fc..de357069da30c 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -25,8 +25,8 @@ use risingwave_common::catalog::{DatabaseId, TableId};
 use risingwave_common::metrics::LabelGuardedIntGauge;
 use risingwave_connector::error::ConnectorResult;
 use risingwave_connector::source::{
-    ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SourceProperties,
-    SplitEnumerator, SplitId, SplitImpl, SplitMetaData,
+    fill_adaptive_split, ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo,
+    SourceProperties, SplitEnumerator, SplitId, SplitImpl, SplitMetaData,
 };
 use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved};
 use risingwave_meta_model::SourceId;
@@ -110,7 +110,8 @@ pub async fn create_source_worker_handle(
     let current_splits_ref = splits.clone();
 
     let connector_properties = extract_prop_from_new_source(source)?;
-    let enable_scale_in = connector_properties.enable_split_scale_in();
+    let enable_scale_in = connector_properties.enable_drop_split();
+    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
     let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
     let handle = dispatch_source_prop!(connector_properties, prop, {
         let mut worker = ConnectorSourceWorker::create(
@@ -142,7 +143,8 @@ pub async fn create_source_worker_handle(
         handle,
         sync_call_tx,
         splits,
-        enable_scale_in,
+        enable_drop_split: enable_scale_in,
+        enable_adaptive_splits,
     })
 }
 
@@ -262,13 +264,18 @@ pub struct ConnectorSourceWorkerHandle {
     handle: JoinHandle<()>,
     sync_call_tx: UnboundedSender<oneshot::Sender<MetaResult<()>>>,
     splits: SharedSplitMapRef,
-    enable_scale_in: bool,
+    enable_drop_split: bool,
+    enable_adaptive_splits: bool,
 }
 
 impl ConnectorSourceWorkerHandle {
     async fn discovered_splits(&self) -> Option<BTreeMap<SplitId, SplitImpl>> {
         self.splits.lock().await.splits.clone()
     }
+
+    pub fn get_enable_adaptive_splits(&self) -> bool {
+        self.enable_adaptive_splits
+    }
 }
 
 pub struct SourceManagerCore {
@@ -292,6 +299,36 @@ pub struct SourceManagerRunningInfo {
     pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
 }
 
+async fn handle_discover_splits(
+    handle: &ConnectorSourceWorkerHandle,
+    source_id: SourceId,
+    actors: &HashSet<ActorId>,
+) -> MetaResult<BTreeMap<Arc<str>, SplitImpl>> {
+    let Some(mut discovered_splits) = handle.discovered_splits().await else {
+        tracing::info!(
+            "The discover loop for source {} is not ready yet; we'll wait for the next run",
+            source_id
+        );
+        return Ok(BTreeMap::new());
+    };
+    if discovered_splits.is_empty() {
+        tracing::warn!("No splits discovered for source {}", source_id);
+    }
+
+    if handle.enable_adaptive_splits {
+        // A connector that supports adaptive splits returns just one split, and we need to make the number of splits equal to the number of actors in this fragment.
+        // Because RisingWave consumes the splits statelessly and does not need to keep the split id internally, we always use the actor_id as the split_id.
+        // The previous split records should be dropped via the CN.
+
+        debug_assert!(handle.enable_drop_split);
+        debug_assert!(discovered_splits.len() == 1);
+        discovered_splits =
+            fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?;
+    }
+
+    Ok(discovered_splits)
+}
+
 impl SourceManagerCore {
     fn new(
         metadata_manager: MetadataManager,
@@ -317,7 +354,7 @@ impl SourceManagerCore {
     async fn reassign_splits(&self) -> MetaResult<HashMap<DatabaseId, SplitAssignment>> {
         let mut split_assignment: SplitAssignment = HashMap::new();
 
-        for (source_id, handle) in &self.managed_sources {
+        'loop_source: for (source_id, handle) in &self.managed_sources {
             let source_fragment_ids = match self.source_fragments.get(source_id) {
                 Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids,
                 _ => {
@@ -326,19 +363,7 @@ impl SourceManagerCore {
             };
 
             let backfill_fragment_ids = self.backfill_fragments.get(source_id);
-            let Some(discovered_splits) = handle.discovered_splits().await else {
-                tracing::info!(
-                    "The discover loop for source {} is not ready yet; we'll wait for the next run",
-                    source_id
-                );
-                continue;
-            };
-
-            if discovered_splits.is_empty() {
-                tracing::warn!("No splits discovered for source {}", source_id);
-            }
-
-            for &fragment_id in source_fragment_ids {
+            'loop_fragment: for &fragment_id in source_fragment_ids {
                 let actors = match self
                     .metadata_manager
                     .get_running_actors_of_fragment(fragment_id)
@@ -347,16 +372,22 @@ impl SourceManagerCore {
                     Ok(actors) => {
                         if actors.is_empty() {
                             tracing::warn!("No actors found for fragment {}", fragment_id);
-                            continue;
+                            continue 'loop_fragment;
                         }
                         actors
                     }
                     Err(err) => {
                         tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
-                        continue;
+                        continue 'loop_fragment;
                    }
                 };
 
+                let discovered_splits = handle_discover_splits(handle, *source_id, &actors).await?;
+                if discovered_splits.is_empty() {
+                    // The discover loop for this source is not ready yet; we'll wait for the next run
+                    continue 'loop_source;
+                }
+
                 let prev_actor_splits: HashMap<_, _> = actors
                     .into_iter()
                     .map(|actor_id| {
@@ -375,7 +406,8 @@ impl SourceManagerCore {
                     prev_actor_splits,
                     &discovered_splits,
                     SplitDiffOptions {
-                        enable_scale_in: handle.enable_scale_in,
+                        enable_scale_in: handle.enable_drop_split,
+                        enable_adaptive: handle.enable_adaptive_splits,
                     },
                 ) {
                     split_assignment.insert(fragment_id, new_assignment);
@@ -520,6 +552,9 @@ impl Ord for ActorSplitsAssignment {
 #[derive(Debug)]
 struct SplitDiffOptions {
     enable_scale_in: bool,
+
+    /// For most connectors, this should be false. When enabled, RisingWave will not track any progress.
+    enable_adaptive: bool,
 }
 
 #[allow(clippy::derivable_impls)]
@@ -527,6 +562,7 @@ impl Default for SplitDiffOptions {
     fn default() -> Self {
         SplitDiffOptions {
             enable_scale_in: false,
+            enable_adaptive: false,
         }
     }
 }
@@ -601,7 +637,7 @@ where
         .filter(|split_id| !prev_split_ids.contains(split_id))
         .collect();
 
-    if opts.enable_scale_in {
+    if opts.enable_scale_in || opts.enable_adaptive {
         // if we support scale in, no more splits are discovered, and no splits are dropped, return
         // we need to check if discovered_split_ids is empty, because if it is empty, we need to
         // handle the case of scale in to zero (like deleting all objects from s3)
@@ -623,7 +659,7 @@ where
     let mut heap = BinaryHeap::with_capacity(actor_splits.len());
 
     for (actor_id, mut splits) in actor_splits {
-        if opts.enable_scale_in {
+        if opts.enable_scale_in || opts.enable_adaptive {
             splits.retain(|split| !dropped_splits.contains(&split.id()));
         }
 
@@ -887,7 +923,7 @@ impl SourceManager {
 
         let mut assigned = HashMap::new();
 
-        for (source_id, fragments) in source_fragments {
+        'loop_source: for (source_id, fragments) in source_fragments {
             let handle = core
                 .managed_sources
                 .get(&source_id)
@@ -906,15 +942,8 @@ impl SourceManager {
                     .context("failed to receive sync call response")??;
             }
 
-            let splits = handle.discovered_splits().await.unwrap();
-
-            if splits.is_empty() {
-                tracing::warn!("no splits detected for source {}", source_id);
-                continue;
-            }
-
             for fragment_id in fragments {
-                let empty_actor_splits = table_fragments
+                let empty_actor_splits: HashMap<u32, Vec<SplitImpl>> = table_fragments
                     .fragments
                     .get(&fragment_id)
                     .unwrap()
                     .actors
                     .iter()
                     .map(|actor| (actor.actor_id, vec![]))
                     .collect();
+                let actor_hashset: HashSet<u32> = empty_actor_splits.keys().cloned().collect();
+                let splits = handle_discover_splits(handle, source_id, &actor_hashset).await?;
+                if splits.is_empty() {
+                    tracing::warn!("no splits detected for source {}", source_id);
+                    continue 'loop_source;
+                }
 
                 if let Some(diff) = reassign_splits(
                     fragment_id,
@@ -1051,7 +1086,8 @@ impl SourceManager {
 
         let connector_properties = extract_prop_from_existing_source(&source)?;
 
-        let enable_scale_in = connector_properties.enable_split_scale_in();
+        let enable_drop_split = connector_properties.enable_drop_split();
+        let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
         let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
         let handle = tokio::spawn(async move {
             let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
@@ -1089,7 +1125,8 @@ impl SourceManager {
                 handle,
                 sync_call_tx,
                 splits,
-                enable_scale_in,
+                enable_drop_split,
+                enable_adaptive_splits,
             },
         );
         Ok(())
@@ -1258,6 +1295,7 @@ mod tests {
 
         let opts = SplitDiffOptions {
             enable_scale_in: true,
+            enable_adaptive: false,
        };
 
         let prev_split_ids: HashSet<_> = actor_splits
@@ -1303,6 +1341,7 @@ mod tests {
 
         let opts = SplitDiffOptions {
             enable_scale_in: true,
+            enable_adaptive: false,
         };
 
         let diff = reassign_splits(
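A closing note on the mechanism this diff introduces: `fill_adaptive_split` fans a single discovered template split out into one split per running actor, reusing each `actor_id` as the `split_id`, which is why `validate_state` in `test.slt.serial` expects the state-table row count to track the parallelism (4, then 6, then 2). Below is a minimal Python model of that fan-out; the dict-based split shape is illustrative only, the real code operates on `SplitImpl`/`NatsSplit`:

```python
# Model of fill_adaptive_split (src/connector/src/source/util.rs):
# clone one template split per in-use actor, keyed by actor_id.
# The dict "split" here is a stand-in for the real SplitImpl.
def fill_adaptive_split(split_template: dict, actors_in_use: set) -> dict:
    new_splits = {}
    for actor_id in actors_in_use:
        split_id = str(actor_id)  # the actor_id doubles as the split_id
        new_splits[split_id] = {
            "subject": split_template["subject"],
            "split_id": split_id,
            "start_sequence": split_template["start_sequence"],
        }
    return new_splits

# With parallelism 4, one template yields four per-actor splits.
template = {"subject": "testsubject", "split_id": "0", "start_sequence": "latest"}
assert sorted(fill_adaptive_split(template, {1, 2, 3, 4})) == ["1", "2", "3", "4"]
```

Because no per-split progress is tracked (`enable_adaptive_splits` implies `enable_drop_split`), rescaling the fragment simply drops the old per-actor splits and regenerates a fresh set, which is what the `alter table t_nats set PARALLELISM` steps in the test exercise.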