Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make Nats jetstream Source work in parallel #19529

Merged
merged 30 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 36 additions & 36 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ services:
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
healthcheck:
test:
[
"CMD-SHELL",
"mysqladmin ping -h 127.0.0.1 -u root -p123456"
]
test: [ "CMD-SHELL", "mysqladmin ping -h 127.0.0.1 -u root -p123456" ]
interval: 5s
timeout: 5s
retries: 5
Expand Down Expand Up @@ -107,7 +103,6 @@ services:
volumes:
- ..:/risingwave


rw-build-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241030
volumes:
Expand Down Expand Up @@ -191,7 +186,7 @@ services:
ports:
- 6378:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
test: [ "CMD", "redis-cli", "ping" ]
interval: 5s
timeout: 30s
retries: 50
Expand All @@ -211,7 +206,7 @@ services:
- 8030:8030
- 8040:8040
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9030"]
test: [ "CMD", "curl", "-f", "http://localhost:9030" ]
interval: 5s
timeout: 5s
retries: 30
Expand All @@ -231,14 +226,13 @@ services:
container_name: starrocks-fe-server
image: starrocks/fe-ubuntu:3.1.7
hostname: starrocks-fe-server
command:
/opt/starrocks/fe/bin/start_fe.sh
command: /opt/starrocks/fe/bin/start_fe.sh
ports:
- 28030:8030
- 29020:9020
- 29030:9030
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9030"]
test: [ "CMD", "curl", "-f", "http://localhost:9030" ]
interval: 5s
timeout: 5s
retries: 30
Expand All @@ -259,10 +253,10 @@ services:
depends_on:
- starrocks-fe-server

# # Temporary workaround for json schema registry test since redpanda only supports
# # protobuf/avro schema registry. Should be removed after the support.
# # Related tracking issue:
# # https://github.com/redpanda-data/redpanda/issues/1878
# # Temporary workaround for json schema registry test since redpanda only supports
# # protobuf/avro schema registry. Should be removed after the support.
# # Related tracking issue:
# # https://github.com/redpanda-data/redpanda/issues/1878
schemaregistry:
container_name: schemaregistry
hostname: schemaregistry
Expand All @@ -287,7 +281,7 @@ services:
- "8080"
- "6650"
healthcheck:
test: [ "CMD-SHELL", "bin/pulsar-admin brokers healthcheck"]
test: [ "CMD-SHELL", "bin/pulsar-admin brokers healthcheck" ]
interval: 5s
timeout: 5s
retries: 5
Expand All @@ -309,37 +303,43 @@ services:
container_name: mongodb-setup
depends_on:
- mongodb
entrypoint:
[
"bash",
"-c",
"sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10"
]
entrypoint: [ "bash", "-c", "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10" ]
restart: "no"
volumes:
- ./mongodb/config-replica.js:/config-replica.js

mongo_data_generator:
build:
context: .
dockerfile: ./mongodb/Dockerfile.generator
container_name: mongo_data_generator
depends_on:
- mongodb
environment:
MONGO_HOST: mongodb
MONGO_PORT: 27017
MONGO_DB_NAME: random_data
build:
context: .
dockerfile: ./mongodb/Dockerfile.generator
container_name: mongo_data_generator
depends_on:
- mongodb
environment:
MONGO_HOST: mongodb
MONGO_PORT: 27017
MONGO_DB_NAME: random_data
mqtt-server:
image: eclipse-mosquitto
command:
- sh
- -c
- echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf
- sh
- -c
- echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf
ports:
- 1883:1883
healthcheck:
test: ["CMD-SHELL", "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0"]
test: [ "CMD-SHELL", "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0" ]
interval: 10s
timeout: 10s
retries: 6
nats:
image: nats:latest
command: [ "-js" ]
ports:
- "4222:4222"
- "8222:8222"
healthcheck:
test: [ "CMD", "curl", "-f", "http://localhost:8222/healthz" ]
interval: 10s
timeout: 5s
retries: 3
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mkdir ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- Install dependencies"
python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema requests
# Install test dependencies; `requests` was previously listed twice.
python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema nats-py
apt-get -y install jq

echo "--- e2e, inline test"
Expand Down
107 changes: 107 additions & 0 deletions e2e_test/source_inline/nats/operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import asyncio
import json
from nats.aio.client import Client as NATS
from nats.js.api import StreamConfig
import psycopg2

NATS_SERVER = "nats://localhost:4222"

async def create_stream(stream_name: str, subject: str):
    """Create a file-backed JetStream stream bound to `subject` on the local NATS server.

    Best-effort helper: any failure is printed rather than raised, and the
    connection is always closed before returning.

    Args:
        stream_name: name of the JetStream stream to create.
        subject: the single subject the stream captures.
    """
    client = NATS()

    try:
        # Connect, then obtain the JetStream context for stream management.
        await client.connect(servers=[NATS_SERVER])
        jetstream = client.jetstream()

        config = StreamConfig(
            name=stream_name,
            subjects=[subject],
            retention="limits",          # keep messages until a limit below is hit
            max_msgs=1000,               # retain at most 1000 messages
            max_bytes=10 * 1024 * 1024,  # retain at most 10 MiB
            max_age=0,                   # 0 = no age-based expiry
            storage="file",              # persist to disk rather than memory
        )

        await jetstream.add_stream(config)
        print(f"Stream '{stream_name}' added successfully with subject '{subject}'.")

    except Exception as e:
        # Swallow errors deliberately (e.g. stream may already exist); just report.
        print(f"Error: {e}")

    finally:
        await client.close()
        print("Disconnected from NATS server.")

async def produce_message(stream_name: str, subject: str):
    """Publish 100 JSON messages of the form {"i": <n>} to `subject`.

    `stream_name` is accepted for CLI symmetry but unused here: JetStream
    routes a publish by subject, not by stream name.
    """
    client = NATS()
    await client.connect(servers=[NATS_SERVER])
    jetstream = client.jetstream()
    for n in range(100):
        body = json.dumps({"i": n}).encode()
        await jetstream.publish(subject, body)

    await client.close()

async def consume_message(stream_name: str, subject: str):
    """Pull 100 messages from `subject` one at a time, printing and acking each.

    `stream_name` is accepted for symmetry with the other helpers but unused.
    """
    nc = NATS()
    await nc.connect(servers=[NATS_SERVER])
    try:
        js = nc.jetstream()
        consumer = await js.pull_subscribe(subject)
        for i in range(100):
            msgs = await consumer.fetch(1)
            for msg in msgs:
                print(msg.data)
                await msg.ack()
    finally:
        # Fix: the connection was previously never closed, leaking it on every call.
        await nc.close()

def validate_state_table_item(table_name: str, expect_count: int):
    """Assert the source's internal state table holds exactly `expect_count` rows.

    Looks up the unique internal table whose name contains `table_name` in
    `rw_catalog.rw_internal_tables`, then counts its rows. Raises AssertionError
    on any mismatch.

    Args:
        table_name: substring of the user table name to match against the catalog.
        expect_count: expected row count of the internal state table.
    """
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    try:
        # query for the internal table name and make sure it exists
        with conn.cursor() as cursor:
            # Parameterize the pattern instead of interpolating it into the SQL text.
            cursor.execute(
                "select name from rw_catalog.rw_internal_tables where name ILIKE %s",
                (f"%{table_name}%",),
            )
            results = cursor.fetchall()
            assert len(results) == 1, f"Expected exactly one internal table matching {table_name}, found {len(results)}"
            internal_table_name = results[0][0]
            print(f"Found internal table: {internal_table_name}")
            # The name comes from the catalog itself, so interpolation is safe here
            # (identifiers cannot be bound as query parameters).
            cursor.execute(f"SELECT * FROM {internal_table_name}")
            assert cursor.rowcount == expect_count, \
                f"Expected {expect_count} rows in {internal_table_name}, found {cursor.rowcount}"
    finally:
        # Fix: the connection was previously never closed.
        conn.close()

if __name__ == "__main__":
    import sys

    # CLI dispatcher:
    #   create_stream <stream> <subject>    -> create a JetStream stream
    #   produce_stream <stream> <subject>   -> publish 100 test messages
    #   validate_state <table> <count>      -> check internal state-table row count
    if len(sys.argv) < 2:
        print("Usage: python operation.py <command> <stream_name> <subject> [table_name] [expect_count]")
        sys.exit(1)

    command = sys.argv[1]
    if command in ["create_stream", "produce_stream"]:
        if len(sys.argv) != 4:
            print("Error: Both stream name and subject are required")
            sys.exit(1)
        stream_name = sys.argv[2]
        subject = sys.argv[3]

        if command == "create_stream":
            asyncio.run(create_stream(stream_name, subject))
        elif command == "produce_stream":
            asyncio.run(produce_message(stream_name, subject))
    elif command == "validate_state":
        # Fix: argv is [script, command, table_name, expect_count] -> length 4.
        # The previous check required length 5, rejecting every valid invocation
        # (the .slt file calls this with exactly two arguments after the command).
        if len(sys.argv) != 4:
            print("Error: Both table name and expected count are required")
            sys.exit(1)
        table_name = sys.argv[2]
        expect_count = int(sys.argv[3])
        validate_state_table_item(table_name, expect_count)
    else:
        print(f"Unknown command: {command}")
        print("Supported commands: create_stream, produce_stream, validate_state")
        sys.exit(1)
74 changes: 74 additions & 0 deletions e2e_test/source_inline/nats/test.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
control substitution on

statement ok
set streaming_use_shared_source to false;

# Create the JetStream stream the NATS source will read from.
system ok
python e2e_test/source_inline/nats/operation.py create_stream "teststream" "testsubject"

# produce 100 message of format `{"i": $i}` to the stream
system ok
python e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject"

statement ok
set streaming_parallelism to 4;

statement ok
create table t_nats (i int ) with (
  connector = 'nats',
  server_url='nats:4222',
  subject='testsubject',
  connect_mode='plain',
  consumer.durable_name = 'demo1',
  consumer.ack_policy = 'all',
  stream='teststream1',
  consumer.max_ack_pending = '100000')
format plain encode json;

sleep 1s

# at least once
query T
select count(*) >= 100 from t_nats;
----
t

# Expect one state-table row per parallel unit (parallelism = 4).
system ok
python e2e_test/source_inline/nats/operation.py validate_state "t_nats" 4

statement ok
alter table t_nats set streaming_parallelism to 6;

system ok
python e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject"

sleep 3s

query T
select count(*) >= 200 from t_nats;
----
t

system ok
python e2e_test/source_inline/nats/operation.py validate_state "t_nats" 6

statement ok
alter table t_nats set streaming_parallelism to 2;

system ok
python e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject"

# Fix: the first two verification sections sleep before checking, but this one
# did not, racing the source and making the >= 300 check flaky.
sleep 3s

query T
select count(*) >= 300 from t_nats;
----
t

system ok
python e2e_test/source_inline/nats/operation.py validate_state "t_nats" 2


statement ok
drop table t_nats;

statement ok
set streaming_use_shared_source to true;
12 changes: 10 additions & 2 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,17 @@ impl ConnectorProperties {
)
}

pub fn enable_split_scale_in(&self) -> bool {
pub fn enable_drop_split(&self) -> bool {
// Dropping splits (scale-in) is supported for Kinesis and NATS.
matches!(self, ConnectorProperties::Kinesis(_))
matches!(
self,
ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
)
}

/// For most connectors, this should be false. When enabled, RisingWave should not track any progress.
pub fn enable_adaptive_splits(&self) -> bool {
matches!(self, ConnectorProperties::Nats(_))
}

/// Load additional info from `PbSource`. Currently only used by CDC.
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod mqtt;
pub mod nats;
pub mod nexmark;
pub mod pulsar;
mod util;

use std::future::IntoFuture;

Expand All @@ -47,6 +48,7 @@ use async_nats::jetstream::context::Context as JetStreamContext;
pub use manager::{SourceColumnDesc, SourceColumnType};
use risingwave_common::array::{Array, ArrayRef};
use thiserror_ext::AsReport;
pub use util::fill_adaptive_split;

pub use crate::source::filesystem::opendal_source::{
AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
Expand Down
Loading