Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make Nats jetstream Source work in parallel #19529

Merged
merged 30 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 44 additions & 33 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ services:
ports:
- 5432
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U postgres" ]
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
command: [ "postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=30" ]
command:
["postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=30"]

mysql:
image: mysql:8.0
Expand All @@ -25,11 +26,7 @@ services:
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
healthcheck:
test:
[
"CMD-SHELL",
"mysqladmin ping -h 127.0.0.1 -u root -p123456"
]
test: ["CMD-SHELL", "mysqladmin ping -h 127.0.0.1 -u root -p123456"]
interval: 5s
timeout: 5s
retries: 5
Expand Down Expand Up @@ -82,6 +79,7 @@ services:
- mongodb
- mongodb-setup
- mongo_data_generator
- nats-server
volumes:
- ..:/risingwave

Expand All @@ -107,7 +105,6 @@ services:
volumes:
- ..:/risingwave


rw-build-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241030
volumes:
Expand Down Expand Up @@ -185,7 +182,7 @@ services:

redis-server:
container_name: redis-server
image: 'redis:latest'
image: "redis:latest"
expose:
- 6379
ports:
Expand Down Expand Up @@ -223,16 +220,15 @@ services:
ports:
- 1433:1433
environment:
ACCEPT_EULA: 'Y'
SA_PASSWORD: 'SomeTestOnly@SA'
ACCEPT_EULA: "Y"
SA_PASSWORD: "SomeTestOnly@SA"
MSSQL_AGENT_ENABLED: "true"

starrocks-fe-server:
container_name: starrocks-fe-server
image: starrocks/fe-ubuntu:3.1.7
hostname: starrocks-fe-server
command:
/opt/starrocks/fe/bin/start_fe.sh
command: /opt/starrocks/fe/bin/start_fe.sh
ports:
- 28030:8030
- 29020:9020
Expand All @@ -259,10 +255,10 @@ services:
depends_on:
- starrocks-fe-server

# # Temporary workaround for json schema registry test since redpanda only supports
# # protobuf/avro schema registry. Should be removed after the support.
# # Related tracking issue:
# # https://github.com/redpanda-data/redpanda/issues/1878
# # Temporary workaround for json schema registry test since redpanda only supports
# # protobuf/avro schema registry. Should be removed after the support.
# # Related tracking issue:
# # https://github.com/redpanda-data/redpanda/issues/1878
schemaregistry:
container_name: schemaregistry
hostname: schemaregistry
Expand All @@ -287,7 +283,7 @@ services:
- "8080"
- "6650"
healthcheck:
test: [ "CMD-SHELL", "bin/pulsar-admin brokers healthcheck"]
test: ["CMD-SHELL", "bin/pulsar-admin brokers healthcheck"]
interval: 5s
timeout: 5s
retries: 5
Expand All @@ -313,33 +309,48 @@ services:
[
"bash",
"-c",
"sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10"
"sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10",
]
restart: "no"
volumes:
- ./mongodb/config-replica.js:/config-replica.js

mongo_data_generator:
build:
context: .
dockerfile: ./mongodb/Dockerfile.generator
container_name: mongo_data_generator
depends_on:
- mongodb
environment:
MONGO_HOST: mongodb
MONGO_PORT: 27017
MONGO_DB_NAME: random_data
build:
context: .
dockerfile: ./mongodb/Dockerfile.generator
container_name: mongo_data_generator
depends_on:
- mongodb
environment:
MONGO_HOST: mongodb
MONGO_PORT: 27017
MONGO_DB_NAME: random_data
mqtt-server:
image: eclipse-mosquitto
command:
- sh
- -c
- echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf
- sh
- -c
- echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf
ports:
- 1883:1883
healthcheck:
test: ["CMD-SHELL", "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0"]
test:
[
"CMD-SHELL",
"(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0",
]
interval: 10s
timeout: 10s
retries: 6
nats-server:
image: nats:latest
command: ["-js"]
ports:
- "4222:4222"
- "8222:8222"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8222/healthz"]
interval: 10s
timeout: 5s
retries: 3
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mkdir ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- Install dependencies"
python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema requests
python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema nats-py psycopg2-binary
apt-get -y install jq

echo "--- e2e, inline test"
Expand Down
117 changes: 117 additions & 0 deletions e2e_test/source_inline/nats/operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import asyncio
import time
import json
from nats.aio.client import Client as NATS
from nats.js.api import StreamConfig
import psycopg2

NATS_SERVER = "nats://nats-server:4222"

async def create_stream(stream_name: str, subject: str):
    """Create a JetStream stream named `stream_name` bound to `subject`.

    Connects to NATS_SERVER, adds a file-backed stream with limit-based
    retention, and always closes the connection. Any failure is printed
    rather than raised (best-effort setup for the e2e harness).
    """
    client = NATS()
    try:
        await client.connect(servers=[NATS_SERVER])

        # JetStream context on top of the core NATS connection.
        jetstream = client.jetstream()
        config = StreamConfig(
            name=stream_name,
            subjects=[subject],
            retention="limits",          # retention policy: limits / interest / workqueue
            max_msgs=1000,               # cap on retained message count
            max_bytes=10 * 1024 * 1024,  # cap on retained bytes (10 MiB)
            max_age=0,                   # 0 = no age-based expiry
            storage="file",              # persist to disk (vs. memory)
        )
        await jetstream.add_stream(config)
        print(f"Stream '{stream_name}' added successfully with subject '{subject}'.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        await client.close()
        print("Disconnected from NATS server.")

async def produce_message(stream_name: str, subject: str, num_messages: int = 100):
    """Publish `num_messages` JSON payloads of the form ``{"i": <n>}`` to `subject`.

    `stream_name` is accepted for CLI symmetry with the other commands but is
    not needed for publishing (JetStream routes by subject). The default of
    100 matches the counts the slt test asserts on.
    """
    nc = NATS()
    await nc.connect(servers=[NATS_SERVER])
    js = nc.jetstream()
    # Publish through JetStream so each message is persisted and ack'd by the server.
    for i in range(num_messages):
        payload = {"i": i}
        await js.publish(subject, json.dumps(payload).encode())

    await nc.close()


async def consume_message(_stream_name: str, subject: str):
    """Pull 100 messages from `subject` one at a time, printing and acking each.

    `_stream_name` is unused; the pull subscription is created by subject only.
    """
    client = NATS()
    await client.connect(servers=[NATS_SERVER])
    jetstream = client.jetstream()
    subscription = await jetstream.pull_subscribe(subject)
    for _ in range(100):
        batch = await subscription.fetch(1)
        for message in batch:
            print(message.data)
            await message.ack()


def validate_state_table_item(table_name: str, expect_count: int):
    """Verify the source state table behind job `table_name` holds `expect_count` rows.

    Looks up the job's internal state table via `rw_internal_table_info`, then
    polls it up to 10 times (0.5s apart) until the row count matches.

    Raises:
        AssertionError: if the internal table is missing/ambiguous, or the row
            count never reaches `expect_count` within the retry budget.
    """
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    try:
        # query for the internal table name and make sure it exists
        # NOTE: identifiers are f-string-interpolated; fine for this test
        # harness with fixed inputs, but not safe for untrusted input.
        with conn.cursor() as cursor:
            cursor.execute(f"select name from rw_internal_table_info where job_name = '{table_name}'")
            results = cursor.fetchall()
            assert len(results) == 1, f"Expected exactly one internal table matching {table_name}, found {len(results)}"
            internal_table_name = results[0][0]
            print(f"Found internal table: {internal_table_name}")
            for _ in range(10):
                cursor.execute(f"SELECT * FROM {internal_table_name}")
                results = cursor.fetchall()
                print(f"Get items from state table: {results}")
                if len(results) == expect_count:
                    print(f"Found {expect_count} items in {internal_table_name}")
                    break
                print(f"Waiting for {internal_table_name} to have {expect_count} items, got {len(results)}. Retry...")
                time.sleep(0.5)
            else:
                # Previously this fell through silently on timeout, so the
                # "validation" could never fail. Fail loudly instead.
                raise AssertionError(
                    f"{internal_table_name} still has {len(results)} items, "
                    f"expected {expect_count} after 10 retries"
                )
    finally:
        # Always release the connection, even on assertion failure.
        conn.close()

if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python operation.py <command> <stream_name> <subject> [table_name] [expect_count]")
sys.exit(1)

command = sys.argv[1]
if command in ["create_stream", "produce_stream"]:
if len(sys.argv) != 4:
print("Error: Both stream name and subject are required")
sys.exit(1)
stream_name = sys.argv[2]
subject = sys.argv[3]

if command == "create_stream":
asyncio.run(create_stream(stream_name, subject))
elif command == "produce_stream":
asyncio.run(produce_message(stream_name, subject))
elif command == "validate_state":
if len(sys.argv) != 4:
print("Error: Both table name and expected count are required")
sys.exit(1)
table_name = sys.argv[2]
expect_count = int(sys.argv[3])
validate_state_table_item(table_name, expect_count)
else:
print(f"Unknown command: {command}")
print("Supported commands: create_stream, produce_stream, validate_state")
sys.exit(1)
88 changes: 88 additions & 0 deletions e2e_test/source_inline/nats/test.slt.serial
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
control substitution on

# Shared source would reuse one set of source executors; disable it so each
# parallelism setting creates its own readers whose splits land in the state table.
statement ok
set streaming_use_shared_source to false;

# Set up the JetStream stream that the NATS source below consumes.
system ok
python3 e2e_test/source_inline/nats/operation.py create_stream "teststream" "testsubject"

# produce 100 messages of format `{"i": $i}` to the stream
system ok
python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject"

statement ok
set streaming_parallelism to 4;

statement ok
create table t_nats ( i int ) with (
  connector = 'nats',
  server_url='nats-server:4222',
  subject='testsubject',
  connect_mode='plain',
  consumer.durable_name = 'demo1',
  consumer.ack_policy = 'all',
  stream='teststream',
  consumer.max_ack_pending = '100000')
format plain encode json;

statement ok
select * from t_nats;

sleep 3s

statement ok
flush;

# at least once
query T
select count(*) >= 100 from t_nats;
----
t

# With parallelism 4, expect 4 entries in the source state table (one per reader).
system ok
python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 4

# Scale up and verify the split/state count follows the new parallelism.
statement ok
alter table t_nats set PARALLELISM to 6;

system ok
python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject"

sleep 3s

statement ok
flush;

query T
select count(*) >= 200 from t_nats;
----
t

system ok
python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 6

# Scale down: stale state entries should be dropped to match parallelism 2.
statement ok
alter table t_nats set PARALLELISM to 2;

system ok
python3 e2e_test/source_inline/nats/operation.py produce_stream "teststream" "testsubject"

sleep 3s

statement ok
flush;

query T
select count(*) >= 300 from t_nats;
----
t

system ok
python3 e2e_test/source_inline/nats/operation.py validate_state "t_nats" 2


statement ok
drop table t_nats;

# Restore the session default changed at the top of this file.
statement ok
set streaming_use_shared_source to true;
12 changes: 10 additions & 2 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,17 @@ impl ConnectorProperties {
)
}

pub fn enable_split_scale_in(&self) -> bool {
pub fn enable_drop_split(&self) -> bool {
// enable dropping splits for Kinesis and NATS
matches!(self, ConnectorProperties::Kinesis(_))
matches!(
self,
ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
)
}

/// For most connectors, this should be false. When enabled, RisingWave should not track any progress.
pub fn enable_adaptive_splits(&self) -> bool {
matches!(self, ConnectorProperties::Nats(_))
}

/// Load additional info from `PbSource`. Currently only used by CDC.
Expand Down
Loading
Loading