From 4a8b0140313dabc2622fe649f2cd7f1a764413a5 Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 21 Nov 2024 22:04:29 +0800 Subject: [PATCH 01/23] update --- src/connector/src/source/base.rs | 5 +++ src/connector/src/source/mod.rs | 2 + src/connector/src/source/util.rs | 54 +++++++++++++++++++++++++++ src/meta/src/stream/scale.rs | 1 + src/meta/src/stream/source_manager.rs | 33 +++++++++++++--- 5 files changed, 90 insertions(+), 5 deletions(-) create mode 100644 src/connector/src/source/util.rs diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index e031a85a34d62..83ef077e0813f 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -457,6 +457,11 @@ impl ConnectorProperties { matches!(self, ConnectorProperties::Kinesis(_)) } + /// For most connectors, this should be false. When enabled, RisingWave should not track any progress. + pub fn enable_adaptive_splits(&self) -> bool { + matches!(self, ConnectorProperties::Nats(_)) + } + /// Load additional info from `PbSource`. Currently only used by CDC. pub fn init_from_pb_source(&mut self, source: &PbSource) { dispatch_source_prop!(self, prop, prop.init_from_pb_source(source)) diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 899fc2a2379f5..2914d06b16429 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -25,6 +25,7 @@ pub mod mqtt; pub mod nats; pub mod nexmark; pub mod pulsar; +mod util; use std::future::IntoFuture; @@ -47,6 +48,7 @@ use async_nats::jetstream::context::Context as JetStreamContext; pub use manager::{SourceColumnDesc, SourceColumnType}; use risingwave_common::array::{Array, ArrayRef}; use thiserror_ext::AsReport; +pub use util::fill_adaptive_split; pub use crate::source::filesystem::opendal_source::{ AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, diff --git a/src/connector/src/source/util.rs b/src/connector/src/source/util.rs new file mode 100644 index 0000000000000..6265e12a14e70 --- /dev/null +++ b/src/connector/src/source/util.rs @@ -0,0 +1,54 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, HashSet}; +use std::sync::Arc; + +use crate::error::{ConnectorError, ConnectorResult}; +use crate::source::nats::split::NatsSplit; +use crate::source::SplitImpl; + +pub fn fill_adaptive_split( + discovered_splits: BTreeMap, SplitImpl>, + actor_in_use: &HashSet, +) -> ConnectorResult, SplitImpl>> { + debug_assert!(discovered_splits.len() == 1); + let split = discovered_splits.values().next().unwrap(); + + // Just Nats is adaptive for now + if let SplitImpl::Nats(split) = split { + let mut new_splits = BTreeMap::new(); + for actor_id in actor_in_use { + let actor_id: Arc = actor_id.to_string().into(); + new_splits.insert( + actor_id.clone(), + SplitImpl::Nats(NatsSplit::new( + split.subject.clone(), + actor_id, + split.start_sequence.clone(), + )), + ); + } + tracing::info!( + "Filled adaptive splits for Nats source, {} splits in total", + new_splits.len() + ); + Ok(new_splits) + } else { + Err(ConnectorError::from(anyhow::anyhow!( + "Unsupported split type, expect Nats SplitImpl but get {:?}", + split + ))) + } +} diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index d6f1d54b73849..26d357a18f573 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -1758,6 +1758,7 @@ impl ScaleController { let mut stream_source_actor_splits = HashMap::new(); let mut stream_source_dropped_actors = HashSet::new(); + // todo: handle adaptive splits for (fragment_id, reschedule) in reschedules { if !reschedule.actor_splits.is_empty() { stream_source_actor_splits diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index be962a63ff80f..53609c75a8d60 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -25,8 +25,8 @@ use risingwave_common::catalog::{DatabaseId, TableId}; use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_connector::error::ConnectorResult; use risingwave_connector::source::{ - ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SourceProperties, - SplitEnumerator, SplitId, SplitImpl, SplitMetaData, + fill_adaptive_split, ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, + SourceProperties, SplitEnumerator, SplitId, SplitImpl, SplitMetaData, }; use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved}; use risingwave_meta_model::SourceId; @@ -111,6 +111,7 @@ pub async fn create_source_worker_handle( let connector_properties = extract_prop_from_new_source(source)?; let enable_scale_in = connector_properties.enable_split_scale_in(); + let enable_adaptive_splits = connector_properties.enable_adaptive_splits(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = dispatch_source_prop!(connector_properties, prop, { let mut worker = ConnectorSourceWorker::create( @@ -143,6 +144,7 @@ pub async fn create_source_worker_handle( sync_call_tx, splits, enable_scale_in, + enable_adaptive_splits, }) } @@ -263,12 +265,17 @@ pub struct ConnectorSourceWorkerHandle { sync_call_tx: UnboundedSender>>, splits: SharedSplitMapRef, enable_scale_in: bool, + enable_adaptive_splits: bool, } impl ConnectorSourceWorkerHandle { async fn discovered_splits(&self) -> Option> { self.splits.lock().await.splits.clone() } + + pub fn get_enable_adaptive_splits(&self) -> bool { + self.enable_adaptive_splits + } } pub struct SourceManagerCore { @@ -326,7 +333,7 @@ impl SourceManagerCore { }; let backfill_fragment_ids = self.backfill_fragments.get(source_id); - let Some(discovered_splits) = handle.discovered_splits().await else { + let Some(mut discovered_splits) = handle.discovered_splits().await else { tracing::info!( "The discover loop for source {} is not ready yet; we'll wait for the next run", source_id @@ -357,6 +364,13 @@ impl SourceManagerCore { } }; + if handle.enable_adaptive_splits { + // Connector supporting adaptive splits returns just one split, and we need to make the number of splits equal to the number of actors in this fragment. + // Because we Risingwave consume the splits statelessly and we do not need to keep the id internally, we always use actor_id as split_id. + // And prev splits record should be dropped via CN. + discovered_splits = fill_adaptive_split(discovered_splits, &actors)?; + } + let prev_actor_splits: HashMap<_, _> = actors .into_iter() .map(|actor_id| { @@ -376,6 +390,7 @@ impl SourceManagerCore { &discovered_splits, SplitDiffOptions { enable_scale_in: handle.enable_scale_in, + enable_adaptive: handle.enable_adaptive_splits, }, ) { split_assignment.insert(fragment_id, new_assignment); @@ -520,6 +535,9 @@ impl Ord for ActorSplitsAssignment { #[derive(Debug)] struct SplitDiffOptions { enable_scale_in: bool, + + /// For most connectors, this should be false. When enabled, RisingWave will not track any progress. + enable_adaptive: bool, } #[allow(clippy::derivable_impls)] @@ -527,6 +545,7 @@ impl Default for SplitDiffOptions { fn default() -> Self { SplitDiffOptions { enable_scale_in: false, + enable_adaptive: false, } } } @@ -601,7 +620,7 @@ where .filter(|split_id| !prev_split_ids.contains(split_id)) .collect(); - if opts.enable_scale_in { + if opts.enable_scale_in || opts.enable_adaptive { // if we support scale in, no more splits are discovered, and no splits are dropped, return // we need to check if discovered_split_ids is empty, because if it is empty, we need to // handle the case of scale in to zero (like deleting all objects from s3) @@ -623,7 +642,7 @@ where let mut heap = BinaryHeap::with_capacity(actor_splits.len()); for (actor_id, mut splits) in actor_splits { - if opts.enable_scale_in { + if opts.enable_scale_in || opts.enable_adaptive { splits.retain(|split| !dropped_splits.contains(&split.id())); } @@ -1054,6 +1073,7 @@ impl SourceManager { let connector_properties = extract_prop_from_existing_source(&source)?; let enable_scale_in = connector_properties.enable_split_scale_in(); + let enable_adaptive_splits = connector_properties.enable_adaptive_splits(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = tokio::spawn(async move { let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL); @@ -1092,6 +1112,7 @@ impl SourceManager { sync_call_tx, splits, enable_scale_in, + enable_adaptive_splits, }, ); Ok(()) @@ -1260,6 +1281,7 @@ mod tests { let opts = SplitDiffOptions { enable_scale_in: true, + enable_adaptive: false, }; let prev_split_ids: HashSet<_> = actor_splits @@ -1305,6 +1327,7 @@ mod tests { let opts = SplitDiffOptions { enable_scale_in: true, + enable_adaptive: false, }; let diff = reassign_splits( From fd56f42bb0456feaba6b1ab977131e500da0f7ed Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 28 Nov 2024 12:14:15 +0800 Subject: [PATCH 02/23] rename --- src/connector/src/source/base.rs | 7 +++++-- src/meta/src/stream/source_manager.rs | 12 ++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 83ef077e0813f..bbad1838b18a2 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -452,9 +452,12 @@ impl ConnectorProperties { ) } - pub fn enable_split_scale_in(&self) -> bool { + pub fn enable_drop_split(&self) -> bool { // enable split scale in just for Kinesis - matches!(self, ConnectorProperties::Kinesis(_)) + matches!( + self, + ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_) + ) } /// For most connectors, this should be false. When enabled, RisingWave should not track any progress. diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 35f716b8df27d..52967e03d8b5a 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -110,7 +110,7 @@ pub async fn create_source_worker_handle( let current_splits_ref = splits.clone(); let connector_properties = extract_prop_from_new_source(source)?; - let enable_scale_in = connector_properties.enable_split_scale_in(); + let enable_scale_in = connector_properties.enable_drop_split(); let enable_adaptive_splits = connector_properties.enable_adaptive_splits(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = dispatch_source_prop!(connector_properties, prop, { @@ -143,7 +143,7 @@ pub async fn create_source_worker_handle( handle, sync_call_tx, splits, - enable_scale_in, + enable_drop_split: enable_scale_in, enable_adaptive_splits, }) } @@ -264,7 +264,7 @@ pub struct ConnectorSourceWorkerHandle { handle: JoinHandle<()>, sync_call_tx: UnboundedSender>>, splits: SharedSplitMapRef, - enable_scale_in: bool, + enable_drop_split: bool, enable_adaptive_splits: bool, } @@ -389,7 +389,7 @@ impl SourceManagerCore { prev_actor_splits, &discovered_splits, SplitDiffOptions { - enable_scale_in: handle.enable_scale_in, + enable_scale_in: handle.enable_drop_split, enable_adaptive: handle.enable_adaptive_splits, }, ) { @@ -1072,7 +1072,7 @@ impl SourceManager { let connector_properties = extract_prop_from_existing_source(&source)?; - let enable_scale_in = connector_properties.enable_split_scale_in(); + let enable_drop_split = connector_properties.enable_drop_split(); let enable_adaptive_splits = connector_properties.enable_adaptive_splits(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = tokio::spawn(async move { @@ -1111,7 +1111,7 @@ impl SourceManager { handle, sync_call_tx, splits, - enable_scale_in, + enable_drop_split, enable_adaptive_splits, }, ); From 2ece36f6a43c57837c74a73c4235e1a35161cf5c Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 28 Nov 2024 14:24:21 +0800 Subject: [PATCH 03/23] handle init run --- src/connector/src/source/util.rs | 11 ++-- src/meta/src/stream/source_manager.rs | 76 ++++++++++++++++----------- 2 files changed, 50 insertions(+), 37 deletions(-) diff --git a/src/connector/src/source/util.rs b/src/connector/src/source/util.rs index 6265e12a14e70..ad1107bd88d6d 100644 --- a/src/connector/src/source/util.rs +++ b/src/connector/src/source/util.rs @@ -20,14 +20,11 @@ use crate::source::nats::split::NatsSplit; use crate::source::SplitImpl; pub fn fill_adaptive_split( - discovered_splits: BTreeMap, SplitImpl>, + split_template: &SplitImpl, actor_in_use: &HashSet, ) -> ConnectorResult, SplitImpl>> { - debug_assert!(discovered_splits.len() == 1); - let split = discovered_splits.values().next().unwrap(); - // Just Nats is adaptive for now - if let SplitImpl::Nats(split) = split { + if let SplitImpl::Nats(split) = split_template { let mut new_splits = BTreeMap::new(); for actor_id in actor_in_use { let actor_id: Arc = actor_id.to_string().into(); @@ -40,7 +37,7 @@ pub fn fill_adaptive_split( )), ); } - tracing::info!( + tracing::debug!( "Filled adaptive splits for Nats source, {} splits in total", new_splits.len() ); @@ -48,7 +45,7 @@ pub fn fill_adaptive_split( } else { Err(ConnectorError::from(anyhow::anyhow!( "Unsupported split type, expect Nats SplitImpl but get {:?}", - split + split_template ))) } } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 52967e03d8b5a..021b9f0699154 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -299,6 +299,36 @@ pub struct SourceManagerRunningInfo { pub actor_splits: HashMap>, } +async fn handle_discover_splits( + handle: &ConnectorSourceWorkerHandle, + source_id: SourceId, + actors: &HashSet, +) -> MetaResult, SplitImpl>> { + let Some(mut discovered_splits) = handle.discovered_splits().await else { + tracing::info!( + "The discover loop for source {} is not ready yet; we'll wait for the next run", + source_id + ); + return Ok(BTreeMap::new()); + }; + if discovered_splits.is_empty() { + tracing::warn!("No splits discovered for source {}", source_id); + } + + if handle.enable_adaptive_splits { + // Connector supporting adaptive splits returns just one split, and we need to make the number of splits equal to the number of actors in this fragment. + // Because we Risingwave consume the splits statelessly and we do not need to keep the id internally, we always use actor_id as split_id. + // And prev splits record should be dropped via CN. + + debug_assert!(handle.enable_drop_split); + debug_assert!(discovered_splits.len() == 1); + discovered_splits = + fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?; + } + + Ok(discovered_splits) +} + impl SourceManagerCore { fn new( metadata_manager: MetadataManager, @@ -324,7 +354,7 @@ impl SourceManagerCore { async fn reassign_splits(&self) -> MetaResult> { let mut split_assignment: SplitAssignment = HashMap::new(); - for (source_id, handle) in &self.managed_sources { + 'loop_source: for (source_id, handle) in &self.managed_sources { let source_fragment_ids = match self.source_fragments.get(source_id) { Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids, _ => { @@ -333,19 +363,7 @@ impl SourceManagerCore { }; let backfill_fragment_ids = self.backfill_fragments.get(source_id); - let Some(mut discovered_splits) = handle.discovered_splits().await else { - tracing::info!( - "The discover loop for source {} is not ready yet; we'll wait for the next run", - source_id - ); - continue; - }; - - if discovered_splits.is_empty() { - tracing::warn!("No splits discovered for source {}", source_id); - } - - for &fragment_id in source_fragment_ids { + 'loop_fragment: for &fragment_id in source_fragment_ids { let actors = match self .metadata_manager .get_running_actors_of_fragment(fragment_id) @@ -354,21 +372,20 @@ impl SourceManagerCore { Ok(actors) => { if actors.is_empty() { tracing::warn!("No actors found for fragment {}", fragment_id); - continue; + continue 'loop_fragment; } actors } Err(err) => { tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore"); - continue; + continue 'loop_fragment; } }; - if handle.enable_adaptive_splits { - // Connector supporting adaptive splits returns just one split, and we need to make the number of splits equal to the number of actors in this fragment. - // Because we Risingwave consume the splits statelessly and we do not need to keep the id internally, we always use actor_id as split_id. - // And prev splits record should be dropped via CN. - discovered_splits = fill_adaptive_split(discovered_splits, &actors)?; + let discovered_splits = handle_discover_splits(handle, *source_id, &actors).await?; + if discovered_splits.is_empty() { + // The discover loop for this source is not ready yet; we'll wait for the next run + continue 'loop_source; } let prev_actor_splits: HashMap<_, _> = actors @@ -908,7 +925,7 @@ impl SourceManager { let mut assigned = HashMap::new(); - for (source_id, fragments) in source_fragments { + 'loop_source: for (source_id, fragments) in source_fragments { let handle = core .managed_sources .get(&source_id) @@ -927,15 +944,8 @@ impl SourceManager { .context("failed to receive sync call response")??; } - let splits = handle.discovered_splits().await.unwrap(); - - if splits.is_empty() { - tracing::warn!("no splits detected for source {}", source_id); - continue; - } - for fragment_id in fragments { - let empty_actor_splits = table_fragments + let empty_actor_splits: HashMap> = table_fragments .fragments .get(&fragment_id) .unwrap() @@ -943,6 +953,12 @@ impl SourceManager { .iter() .map(|actor| (actor.actor_id, vec![])) .collect(); + let actor_hashset: HashSet = empty_actor_splits.keys().cloned().collect(); + let splits = handle_discover_splits(handle, source_id, &actor_hashset).await?; + if splits.is_empty() { + tracing::warn!("no splits detected for source {}", source_id); + continue 'loop_source; + } if let Some(diff) = reassign_splits( fragment_id, From 12b45877d3c2eb6f7be15a28185499743cd6dcb0 Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 28 Nov 2024 22:43:00 +0800 Subject: [PATCH 04/23] test case Signed-off-by: tabversion --- ci/docker-compose.yml | 72 +++++++-------- ci/scripts/e2e-source-test.sh | 2 +- e2e_test/source_inline/nats/operation.py | 107 +++++++++++++++++++++++ e2e_test/source_inline/nats/test.slt | 74 ++++++++++++++++ 4 files changed, 218 insertions(+), 37 deletions(-) create mode 100644 e2e_test/source_inline/nats/operation.py create mode 100644 e2e_test/source_inline/nats/test.slt diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 63bc3bee1f210..241ee58c003d6 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -25,11 +25,7 @@ services: - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw healthcheck: - test: - [ - "CMD-SHELL", - "mysqladmin ping -h 127.0.0.1 -u root -p123456" - ] + test: [ "CMD-SHELL", "mysqladmin ping -h 127.0.0.1 -u root -p123456" ] interval: 5s timeout: 5s retries: 5 @@ -107,7 +103,6 @@ services: volumes: - ..:/risingwave - rw-build-env: image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241030 volumes: @@ -191,7 +186,7 @@ services: ports: - 6378:6379 healthcheck: - test: ["CMD", "redis-cli", "ping"] + test: [ "CMD", "redis-cli", "ping" ] interval: 5s timeout: 30s retries: 50 @@ -211,7 +206,7 @@ services: - 8030:8030 - 8040:8040 healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:9030"] + test: [ "CMD", "curl", "-f", "http://localhost:9030" ] interval: 5s timeout: 5s retries: 30 @@ -231,14 +226,13 @@ services: container_name: starrocks-fe-server image: starrocks/fe-ubuntu:3.1.7 hostname: starrocks-fe-server - command: - /opt/starrocks/fe/bin/start_fe.sh + command: /opt/starrocks/fe/bin/start_fe.sh ports: - 28030:8030 - 29020:9020 - 29030:9030 healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:9030"] + test: [ "CMD", "curl", "-f", "http://localhost:9030" ] interval: 5s timeout: 5s retries: 30 @@ -259,10 +253,10 @@ services: depends_on: - starrocks-fe-server -# # Temporary workaround for json schema registry test since redpanda only supports -# # protobuf/avro schema registry. Should be removed after the support. -# # Related tracking issue: -# # https://github.com/redpanda-data/redpanda/issues/1878 + # # Temporary workaround for json schema registry test since redpanda only supports + # # protobuf/avro schema registry. Should be removed after the support. + # # Related tracking issue: + # # https://github.com/redpanda-data/redpanda/issues/1878 schemaregistry: container_name: schemaregistry hostname: schemaregistry @@ -287,7 +281,7 @@ services: - "8080" - "6650" healthcheck: - test: [ "CMD-SHELL", "bin/pulsar-admin brokers healthcheck"] + test: [ "CMD-SHELL", "bin/pulsar-admin brokers healthcheck" ] interval: 5s timeout: 5s retries: 5 @@ -309,37 +303,43 @@ services: container_name: mongodb-setup depends_on: - mongodb - entrypoint: - [ - "bash", - "-c", - "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10" - ] + entrypoint: [ "bash", "-c", "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10" ] restart: "no" volumes: - ./mongodb/config-replica.js:/config-replica.js mongo_data_generator: - build: - context: . - dockerfile: ./mongodb/Dockerfile.generator - container_name: mongo_data_generator - depends_on: - - mongodb - environment: - MONGO_HOST: mongodb - MONGO_PORT: 27017 - MONGO_DB_NAME: random_data + build: + context: . + dockerfile: ./mongodb/Dockerfile.generator + container_name: mongo_data_generator + depends_on: + - mongodb + environment: + MONGO_HOST: mongodb + MONGO_PORT: 27017 + MONGO_DB_NAME: random_data mqtt-server: image: eclipse-mosquitto command: - - sh - - -c - - echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf + - sh + - -c + - echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf ports: - 1883:1883 healthcheck: - test: ["CMD-SHELL", "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0"] + test: [ "CMD-SHELL", "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0" ] interval: 10s timeout: 10s retries: 6 + nats: + image: nats:latest + command: [ "-js" ] + ports: + - "4222:4222" + - "8222:8222" + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:8222/healthz" ] + interval: 10s + timeout: 5s + retries: 3 diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 4491db5633ea8..c5d723b6533ca 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -32,7 +32,7 @@ mkdir ./connector-node tar xf ./risingwave-connector.tar.gz -C ./connector-node echo "--- Install dependencies" -python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema +python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema nats-py apt-get -y install jq echo "--- e2e, inline test" diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py new file mode 100644 index 0000000000000..49d11ea4b6fb3 --- /dev/null +++ b/e2e_test/source_inline/nats/operation.py @@ -0,0 +1,107 @@ +import asyncio +import json +from nats.aio.client import Client as NATS +from nats.js.api import StreamConfig +import psycopg2 + +NATS_SERVER = "nats://localhost:4222" + +async def create_stream(stream_name: str, subject: str): + # Create a NATS client + nc = NATS() + + try: + # Connect to the NATS server + await nc.connect(servers=[NATS_SERVER]) + + # Enable JetStream + js = nc.jetstream() + stream_config = StreamConfig( + name=stream_name, + subjects=[subject], + retention="limits", # Retention policy (limits, interest, or workqueue) + max_msgs=1000, # Maximum number of messages to retain + max_bytes=10 * 1024 * 1024, # Maximum size of messages in bytes + max_age=0, # Maximum age of messages (0 means unlimited) + storage="file", # Storage type (file or memory) + ) + + # Add the stream + await js.add_stream(stream_config) + print(f"Stream '{stream_name}' added successfully with subject '{subject}'.") + + except Exception as e: + print(f"Error: {e}") + + finally: + # Close the connection + await nc.close() + print("Disconnected from NATS server.") + +async def produce_message(stream_name: str, subject: str): + nc = NATS() + await nc.connect(servers=[NATS_SERVER]) + js = nc.jetstream() + for i in range(100): + payload = {"i": i} + await js.publish(subject, str.encode(json.dumps(payload))) + + await nc.close() + +async def consume_message(stream_name: str, subject: str): + nc = NATS() + await nc.connect(servers=[NATS_SERVER]) + js = nc.jetstream() + consumer = await js.pull_subscribe(subject) + for i in range(100): + msgs = await consumer.fetch(1) + for msg in msgs: + print(msg.data) + await msg.ack() + +def validate_state_table_item(table_name: str, expect_count: int): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + # query for the internal table name and make sure it exists + with conn.cursor() as cursor: + cursor.execute(f"select name from rw_catalog.rw_internal_tables where name ILIKE '%{table_name}%'") + results = cursor.fetchall() + assert len(results) == 1, f"Expected exactly one internal table matching {table_name}, found {len(results)}" + internal_table_name = results[0][0] + print(f"Found internal table: {internal_table_name}") + cursor.execute(f"SELECT * FROM {internal_table_name}") + assert cursor.rowcount == expect_count + +if __name__ == "__main__": + import sys + if len(sys.argv) < 2: + print("Usage: python operation.py [table_name] [expect_count]") + sys.exit(1) + + command = sys.argv[1] + if command in ["create_stream", "produce_stream"]: + if len(sys.argv) != 4: + print("Error: Both stream name and subject are required") + sys.exit(1) + stream_name = sys.argv[2] + subject = sys.argv[3] + + if command == "create_stream": + asyncio.run(create_stream(stream_name, subject)) + elif command == "produce_stream": + asyncio.run(produce_message(stream_name, subject)) + elif command == "validate_state": + if len(sys.argv) != 5: + print("Error: Both table name and expected count are required") + sys.exit(1) + table_name = sys.argv[2] + expect_count = int(sys.argv[3]) + validate_state_table_item(table_name, expect_count) + else: + print(f"Unknown command: {command}") + print("Supported commands: create_stream, produce_stream, validate_state") + sys.exit(1) diff --git a/e2e_test/source_inline/nats/test.slt b/e2e_test/source_inline/nats/test.slt new file mode 100644 index 0000000000000..4605cbbd4dfcd --- /dev/null +++ b/e2e_test/source_inline/nats/test.slt @@ -0,0 +1,74 @@ +control substitution on + +statement ok +set streaming_use_shared_source to false; + +system ok +python e2e_test/source_inline/nats/operation.py create_stream "teststream" "testsubject" + +# produce 100 message of format `{"i": $i}` to the stream +system ok +python e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" + +statement ok +set streaming_parallelism to 4; + +statement ok +create table t_nats (i int ) with ( + connector = 'nats', + server_url='nats:4222', + subject='testsubject', + connect_mode='plain', + consumer.durable_name = 'demo1', + consumer.ack_policy = 'all', + stream='teststream1', + consumer.max_ack_pending = '100000') +format plain encode json; + +sleep 1s + +# at least once +query T +select count(*) >= 100 from t_nats; +--- +t + +system ok +python e2e_test/source_inline/nats/operation.py validate_state "t_nats" 4 + +statement ok +alter table t_nats set streaming_parallelism to 6; + +system ok +python e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" + +sleep 3s + +query T +select count(*) >= 200 from t_nats; +--- +t + +system ok +python e2e_test/source_inline/nats/operation.py validate_state "t_nats" 6 + +statement ok +alter table t_nats set streaming_parallelism to 2; + +system ok +python e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" + +query T +select count(*) >= 300 from t_nats; +--- +t + +system ok +python e2e_test/source_inline/nats/operation.py validate_state "t_nats" 2 + + +statement ok +drop table t_nats; + +statement ok +set streaming_use_shared_source to true; \ No newline at end of file From 9a6e6398042db3be2da0e5630fe21d6ed6096b17 Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 29 Nov 2024 11:27:40 +0800 Subject: [PATCH 05/23] fix --- e2e_test/source_inline/nats/operation.py | 8 +++++--- e2e_test/source_inline/nats/test.slt | 14 +++++++------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py index 49d11ea4b6fb3..fbffa2035d895 100644 --- a/e2e_test/source_inline/nats/operation.py +++ b/e2e_test/source_inline/nats/operation.py @@ -47,7 +47,8 @@ async def produce_message(stream_name: str, subject: str): await js.publish(subject, str.encode(json.dumps(payload))) await nc.close() - + + async def consume_message(stream_name: str, subject: str): nc = NATS() await nc.connect(servers=[NATS_SERVER]) @@ -58,7 +59,8 @@ async def consume_message(stream_name: str, subject: str): for msg in msgs: print(msg.data) await msg.ack() - + + def validate_state_table_item(table_name: str, expect_count: int): conn = psycopg2.connect( host="localhost", @@ -89,7 +91,7 @@ def validate_state_table_item(table_name: str, expect_count: int): sys.exit(1) stream_name = sys.argv[2] subject = sys.argv[3] - + if command == "create_stream": asyncio.run(create_stream(stream_name, subject)) elif command == "produce_stream": diff --git a/e2e_test/source_inline/nats/test.slt b/e2e_test/source_inline/nats/test.slt index 4605cbbd4dfcd..e113a470f6409 100644 --- a/e2e_test/source_inline/nats/test.slt +++ b/e2e_test/source_inline/nats/test.slt @@ -4,11 +4,11 @@ statement ok set streaming_use_shared_source to false; system ok -python e2e_test/source_inline/nats/operation.py create_stream "teststream" "testsubject" +python3 e2e_test/source_inline/nats/operation.py create_stream "teststream" "testsubject" # produce 100 message of format `{"i": $i}` to the stream system ok -python e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" +python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" statement ok set streaming_parallelism to 4; @@ -34,13 +34,13 @@ select count(*) >= 100 from t_nats; t system ok -python e2e_test/source_inline/nats/operation.py validate_state "t_nats" 4 +python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 4 statement ok alter table t_nats set streaming_parallelism to 6; system ok -python e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" +python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" sleep 3s @@ -50,13 +50,13 @@ select count(*) >= 200 from t_nats; t system ok -python e2e_test/source_inline/nats/operation.py validate_state "t_nats" 6 +python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 6 statement ok alter table t_nats set streaming_parallelism to 2; system ok -python e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" +python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" query T select count(*) >= 300 from t_nats; @@ -64,7 +64,7 @@ select count(*) >= 300 from t_nats; t system ok -python e2e_test/source_inline/nats/operation.py validate_state "t_nats" 2 +python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 2 statement ok From 765664a8b5105441be04dc2e9c02a9464810054c Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 29 Nov 2024 12:03:16 +0800 Subject: [PATCH 06/23] dep --- ci/scripts/e2e-source-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 8bad5b30f577b..1899c29c13dfe 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -32,7 +32,7 @@ mkdir ./connector-node tar xf ./risingwave-connector.tar.gz -C ./connector-node echo "--- Install dependencies" -python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema nats-py requests +python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema nats-py requests psycopg2 apt-get -y install jq echo "--- e2e, inline test" From 7dba3a8e0f947f5000c5c922d8afafbf1b4fc33c Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 29 Nov 2024 12:55:13 +0800 Subject: [PATCH 07/23] fix --- ci/scripts/e2e-source-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 1899c29c13dfe..88090c50d9aef 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -32,7 +32,7 @@ mkdir ./connector-node tar xf ./risingwave-connector.tar.gz -C ./connector-node echo "--- Install dependencies" -python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema nats-py requests psycopg2 +python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema nats-py requests psycopg2-binary apt-get -y install jq echo "--- e2e, inline test" From 73ef2faf85b804fb828ef8e1b122a9cebd9e7538 Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 29 Nov 2024 13:29:48 +0800 Subject: [PATCH 08/23] fix --- e2e_test/source_inline/nats/operation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py index fbffa2035d895..653107650855c 100644 --- a/e2e_test/source_inline/nats/operation.py +++ b/e2e_test/source_inline/nats/operation.py @@ -4,7 +4,7 @@ from nats.js.api import StreamConfig import psycopg2 -NATS_SERVER = "nats://localhost:4222" +NATS_SERVER = "nats://nats:4222" async def create_stream(stream_name: str, subject: str): # Create a NATS client From 06e72aed5d8b61a281afe8d5ad83d685bdc5426b Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 29 Nov 2024 13:59:11 +0800 Subject: [PATCH 09/23] fix url --- ci/docker-compose.yml | 1 + e2e_test/source_inline/nats/operation.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 241ee58c003d6..ec92932c8898d 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -335,6 +335,7 @@ services: nats: image: nats:latest command: [ "-js" ] + container_name: nats-server ports: - "4222:4222" - "8222:8222" diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py index 653107650855c..4ed2a9dca7a60 100644 --- a/e2e_test/source_inline/nats/operation.py +++ b/e2e_test/source_inline/nats/operation.py @@ -4,7 +4,7 @@ from nats.js.api import StreamConfig import psycopg2 -NATS_SERVER = "nats://nats:4222" +NATS_SERVER = "nats://nats-server:4222" async def create_stream(stream_name: str, subject: str): # Create a NATS client From f211008ca2c1e1542efcd043ac7f905b85b3bc13 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 2 Dec 2024 11:28:37 +0800 Subject: [PATCH 10/23] fix --- e2e_test/source_inline/nats/test.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/nats/test.slt b/e2e_test/source_inline/nats/test.slt index e113a470f6409..fe589a7b0bb23 100644 --- a/e2e_test/source_inline/nats/test.slt +++ b/e2e_test/source_inline/nats/test.slt @@ -16,7 +16,7 @@ set streaming_parallelism to 4; statement ok create table t_nats (i int ) with ( connector = 'nats', - server_url='nats:4222', + server_url='nats-server:4222', subject='testsubject', connect_mode='plain', consumer.durable_name = 'demo1', From 9a61c3f105d45b31cf23d0e7635de2d61e43bf3d Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 2 Dec 2024 11:29:32 +0800 Subject: [PATCH 11/23] fix --- e2e_test/source_inline/nats/operation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py index 4ed2a9dca7a60..4aa46c0fc036d 100644 --- a/e2e_test/source_inline/nats/operation.py +++ b/e2e_test/source_inline/nats/operation.py @@ -4,7 +4,7 @@ from nats.js.api import StreamConfig import psycopg2 -NATS_SERVER = "nats://nats-server:4222" +NATS_SERVER = "nats-server:4222" async def create_stream(stream_name: str, subject: str): # Create a NATS client From 9fcb01140bc43e3e18edcd9ea99cc300f554c34e Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 2 Dec 2024 11:48:58 +0800 Subject: [PATCH 12/23] fix --- e2e_test/source_inline/nats/operation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py index 4aa46c0fc036d..4ed2a9dca7a60 100644 --- a/e2e_test/source_inline/nats/operation.py +++ b/e2e_test/source_inline/nats/operation.py @@ -4,7 +4,7 @@ from nats.js.api import StreamConfig import psycopg2 -NATS_SERVER = "nats-server:4222" +NATS_SERVER = "nats://nats-server:4222" async def create_stream(stream_name: str, subject: str): # Create a NATS client From 1a40654399b739f8f07c64e4ae18c959f4943036 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 2 Dec 2024 14:41:41 +0800 Subject: [PATCH 13/23] fix --- ci/docker-compose.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index ec92932c8898d..bee16684585ef 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -332,10 +332,9 @@ services: interval: 10s timeout: 10s retries: 6 - nats: + nats-server: image: nats:latest command: [ "-js" ] - container_name: nats-server ports: - "4222:4222" - "8222:8222" From e7f335b6f78fa58f87ddfa20aee90b57bbfdba42 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 2 Dec 2024 16:50:09 +0800 Subject: [PATCH 14/23] fix --- ci/docker-compose.yml | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index bee16684585ef..32f37d6c319be 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -9,11 +9,12 @@ services: ports: - 5432 healthcheck: - test: [ "CMD-SHELL", "pg_isready -U postgres" ] + test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 5s timeout: 5s retries: 5 - command: [ "postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=30" ] + command: + ["postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=30"] mysql: image: mysql:8.0 @@ -25,7 +26,7 @@ services: - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw healthcheck: - test: [ "CMD-SHELL", "mysqladmin ping -h 127.0.0.1 -u root -p123456" ] + test: ["CMD-SHELL", "mysqladmin ping -h 127.0.0.1 -u root -p123456"] interval: 5s timeout: 5s retries: 5 @@ -78,6 +79,7 @@ services: - mongodb - mongodb-setup - mongo_data_generator + - nats-server volumes: - ..:/risingwave @@ -180,13 +182,13 @@ services: redis-server: container_name: redis-server - image: 'redis:latest' + image: "redis:latest" expose: - 6379 ports: - 6378:6379 healthcheck: - test: [ "CMD", "redis-cli", "ping" ] + test: ["CMD", "redis-cli", "ping"] interval: 5s timeout: 30s retries: 50 @@ -206,7 +208,7 @@ services: - 8030:8030 - 8040:8040 healthcheck: - test: [ "CMD", "curl", "-f", "http://localhost:9030" ] + test: ["CMD", "curl", "-f", "http://localhost:9030"] interval: 5s timeout: 5s retries: 30 @@ -218,8 +220,8 @@ services: ports: - 1433:1433 environment: - ACCEPT_EULA: 'Y' - SA_PASSWORD: 'SomeTestOnly@SA' + ACCEPT_EULA: "Y" + SA_PASSWORD: "SomeTestOnly@SA" MSSQL_AGENT_ENABLED: "true" starrocks-fe-server: @@ -232,7 +234,7 @@ services: - 29020:9020 - 29030:9030 healthcheck: - test: [ "CMD", "curl", "-f", "http://localhost:9030" ] + test: ["CMD", "curl", "-f", "http://localhost:9030"] interval: 5s timeout: 5s retries: 30 @@ -281,7 +283,7 @@ services: - "8080" - "6650" healthcheck: - test: [ "CMD-SHELL", "bin/pulsar-admin brokers healthcheck" ] + test: ["CMD-SHELL", "bin/pulsar-admin brokers healthcheck"] interval: 5s timeout: 5s retries: 5 @@ -303,7 +305,12 @@ services: container_name: mongodb-setup depends_on: - mongodb - entrypoint: [ "bash", "-c", "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10" ] + entrypoint: + [ + "bash", + "-c", + "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10", + ] restart: "no" volumes: - ./mongodb/config-replica.js:/config-replica.js @@ -328,18 +335,22 @@ services: ports: - 1883:1883 healthcheck: - test: [ "CMD-SHELL", "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0" ] + test: + [ + "CMD-SHELL", + "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0", + ] interval: 10s timeout: 10s retries: 6 nats-server: image: nats:latest - command: [ "-js" ] + command: ["-js"] ports: - "4222:4222" - "8222:8222" healthcheck: - test: [ "CMD", "curl", "-f", "http://localhost:8222/healthz" ] + test: ["CMD", "curl", "-f", "http://localhost:8222/healthz"] interval: 10s timeout: 5s retries: 3 From c4e13b016ad51bff6112348eb6ef7acbf9477b84 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 2 Dec 2024 17:16:42 +0800 Subject: [PATCH 15/23] fix slt --- e2e_test/source_inline/nats/test.slt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_inline/nats/test.slt b/e2e_test/source_inline/nats/test.slt index fe589a7b0bb23..36d5671a79f4d 100644 --- a/e2e_test/source_inline/nats/test.slt +++ b/e2e_test/source_inline/nats/test.slt @@ -30,7 +30,7 @@ sleep 1s # at least once query T select count(*) >= 100 from t_nats; ---- +---- t system ok @@ -46,7 +46,7 @@ sleep 3s query T select count(*) >= 200 from t_nats; ---- +---- t system ok @@ -60,7 +60,7 @@ python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "te query T select count(*) >= 300 from t_nats; ---- +---- t system ok From a611c72dab4e4cab19f9b4446e13434f7d7ac32e Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 2 Dec 2024 18:14:34 +0800 Subject: [PATCH 16/23] fix --- e2e_test/source_inline/nats/operation.py | 4 ++-- e2e_test/source_inline/nats/test.slt | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py index 4ed2a9dca7a60..640fc50acd4ed 100644 --- a/e2e_test/source_inline/nats/operation.py +++ b/e2e_test/source_inline/nats/operation.py @@ -49,7 +49,7 @@ async def produce_message(stream_name: str, subject: str): await nc.close() -async def consume_message(stream_name: str, subject: str): +async def consume_message(_stream_name: str, subject: str): nc = NATS() await nc.connect(servers=[NATS_SERVER]) js = nc.jetstream() @@ -97,7 +97,7 @@ def validate_state_table_item(table_name: str, expect_count: int): elif command == "produce_stream": asyncio.run(produce_message(stream_name, subject)) elif command == "validate_state": - if len(sys.argv) != 5: + if len(sys.argv) != 4: print("Error: Both table name and expected count are required") sys.exit(1) table_name = sys.argv[2] diff --git a/e2e_test/source_inline/nats/test.slt b/e2e_test/source_inline/nats/test.slt index 36d5671a79f4d..b7566b253ad05 100644 --- a/e2e_test/source_inline/nats/test.slt +++ b/e2e_test/source_inline/nats/test.slt @@ -14,17 +14,20 @@ statement ok set streaming_parallelism to 4; statement ok -create table t_nats (i int ) with ( +create table t_nats ( i int ) with ( connector = 'nats', server_url='nats-server:4222', subject='testsubject', connect_mode='plain', consumer.durable_name = 'demo1', consumer.ack_policy = 'all', - stream='teststream1', + stream='teststream', consumer.max_ack_pending = '100000') format plain encode json; +statement ok +select * from t_nats; + sleep 1s # at least once From 18a1b3365cc6ccc872ef87184d0a43be479f127f Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 2 Dec 2024 18:18:16 +0800 Subject: [PATCH 17/23] fix sql --- e2e_test/source_inline/nats/test.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e_test/source_inline/nats/test.slt b/e2e_test/source_inline/nats/test.slt index b7566b253ad05..c3c58e377cb60 100644 --- a/e2e_test/source_inline/nats/test.slt +++ b/e2e_test/source_inline/nats/test.slt @@ -40,7 +40,7 @@ system ok python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 4 statement ok -alter table t_nats set streaming_parallelism to 6; +alter table t_nats set PARALLELISM to 6; system ok python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" @@ -56,7 +56,7 @@ system ok python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 6 statement ok -alter table t_nats set streaming_parallelism to 2; +alter table t_nats set PARALLELISM to 2; system ok python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" From 6b911b489a81d9f6b2238d73544c391ed9a33f73 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 2 Dec 2024 23:54:56 +0800 Subject: [PATCH 18/23] fix Signed-off-by: tabversion --- e2e_test/source_inline/nats/operation.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py index 640fc50acd4ed..e79bb11b7d96b 100644 --- a/e2e_test/source_inline/nats/operation.py +++ b/e2e_test/source_inline/nats/operation.py @@ -70,13 +70,15 @@ def validate_state_table_item(table_name: str, expect_count: int): ) # query for the internal table name and make sure it exists with conn.cursor() as cursor: - cursor.execute(f"select name from rw_catalog.rw_internal_tables where name ILIKE '%{table_name}%'") + cursor.execute(f"select name from rw_internal_table_info where job_name = '{table_name}'") results = cursor.fetchall() assert len(results) == 1, f"Expected exactly one internal table matching {table_name}, found {len(results)}" internal_table_name = results[0][0] print(f"Found internal table: {internal_table_name}") cursor.execute(f"SELECT * FROM {internal_table_name}") - assert cursor.rowcount == expect_count + results = cursor.fetchall() + print(f"Get items from state table: {results}") + assert len(results) == expect_count if __name__ == "__main__": import sys From 9ceb23712aa15024a5337e778a2a206cf56b359a Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 3 Dec 2024 11:09:16 +0800 Subject: [PATCH 19/23] fix --- e2e_test/source_inline/nats/{test.slt => test.slt.serial} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename e2e_test/source_inline/nats/{test.slt => test.slt.serial} (100%) diff --git a/e2e_test/source_inline/nats/test.slt b/e2e_test/source_inline/nats/test.slt.serial similarity index 100% rename from e2e_test/source_inline/nats/test.slt rename to e2e_test/source_inline/nats/test.slt.serial From da01ec25313e1ae5ef5fadac2020af1f8bf77e73 Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 3 Dec 2024 11:40:39 +0800 Subject: [PATCH 20/23] more sleep --- e2e_test/source_inline/nats/test.slt.serial | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/e2e_test/source_inline/nats/test.slt.serial b/e2e_test/source_inline/nats/test.slt.serial index c3c58e377cb60..628041f21d61c 100644 --- a/e2e_test/source_inline/nats/test.slt.serial +++ b/e2e_test/source_inline/nats/test.slt.serial @@ -28,7 +28,7 @@ format plain encode json; statement ok select * from t_nats; -sleep 1s +sleep 3s # at least once query T @@ -61,6 +61,8 @@ alter table t_nats set PARALLELISM to 2; system ok python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject" +sleep 3s + query T select count(*) >= 300 from t_nats; ---- From 57381099544b623e31a4cb8b7e7cbe94050fc008 Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 3 Dec 2024 11:41:44 +0800 Subject: [PATCH 21/23] add flush --- e2e_test/source_inline/nats/test.slt.serial | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/e2e_test/source_inline/nats/test.slt.serial b/e2e_test/source_inline/nats/test.slt.serial index 628041f21d61c..3f4227a0d05e5 100644 --- a/e2e_test/source_inline/nats/test.slt.serial +++ b/e2e_test/source_inline/nats/test.slt.serial @@ -30,6 +30,9 @@ select * from t_nats; sleep 3s +statement ok +flush; + # at least once query T select count(*) >= 100 from t_nats; @@ -47,6 +50,9 @@ python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "te sleep 3s +statement ok +flush; + query T select count(*) >= 200 from t_nats; ---- @@ -63,6 +69,9 @@ python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "te sleep 3s +statement ok +flush; + query T select count(*) >= 300 from t_nats; ---- From 85b9801173a11a4b48b73fac99514e6a90d96e54 Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 3 Dec 2024 12:17:22 +0800 Subject: [PATCH 22/23] retry --- e2e_test/source_inline/nats/operation.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py index e79bb11b7d96b..27c6f1778463f 100644 --- a/e2e_test/source_inline/nats/operation.py +++ b/e2e_test/source_inline/nats/operation.py @@ -75,10 +75,15 @@ def validate_state_table_item(table_name: str, expect_count: int): assert len(results) == 1, f"Expected exactly one internal table matching {table_name}, found {len(results)}" internal_table_name = results[0][0] print(f"Found internal table: {internal_table_name}") - cursor.execute(f"SELECT * FROM {internal_table_name}") - results = cursor.fetchall() - print(f"Get items from state table: {results}") - assert len(results) == expect_count + for _ in range(10): + cursor.execute(f"SELECT * FROM {internal_table_name}") + results = cursor.fetchall() + print(f"Get items from state table: {results}") + if len(results) == expect_count: + print(f"Found {expect_count} items in {internal_table_name}") + break + print(f"Waiting for {internal_table_name} to have {expect_count} items, got {len(results)}. Retry...") + time.sleep(0.5) if __name__ == "__main__": import sys From 3104690f034110413d99357b09444f0918c865cf Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 3 Dec 2024 12:38:12 +0800 Subject: [PATCH 23/23] fix --- e2e_test/source_inline/nats/operation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/e2e_test/source_inline/nats/operation.py b/e2e_test/source_inline/nats/operation.py index 27c6f1778463f..5f0250c3404aa 100644 --- a/e2e_test/source_inline/nats/operation.py +++ b/e2e_test/source_inline/nats/operation.py @@ -1,4 +1,5 @@ import asyncio +import time import json from nats.aio.client import Client as NATS from nats.js.api import StreamConfig