Merge branch 'main' into yiming/snapshot-backfill-executor-backpressure
wenym1 committed Oct 17, 2024
2 parents 80df807 + fb1edb6 commit 72274be
Showing 158 changed files with 1,478 additions and 879 deletions.
2 changes: 1 addition & 1 deletion .licenserc.yaml
@@ -18,7 +18,7 @@ header:
     - "src/sqlparser/**/*.rs"
     - "java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/*.java"
     - "java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/**/*.java"
-    - "src/meta/model_v2/migration/**/*.rs"
+    - "src/meta/model/migration/**/*.rs"
     - "lints/ui/**"

   comment: on-failure
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
@@ -29,8 +29,8 @@ members = [
     "src/license",
     "src/meta",
     "src/meta/dashboard",
-    "src/meta/model_v2",
-    "src/meta/model_v2/migration",
+    "src/meta/model",
+    "src/meta/model/migration",
     "src/meta/node",
     "src/meta/service",
     "src/object_store",
@@ -228,8 +228,8 @@ risingwave_mem_table_spill_test = { path = "./src/stream/spill_test" }
 risingwave_meta = { path = "./src/meta" }
 risingwave_meta_dashboard = { path = "./src/meta/dashboard" }
 risingwave_meta_service = { path = "./src/meta/service" }
-risingwave_meta_model_migration = { path = "src/meta/model_v2/migration" }
-risingwave_meta_model_v2 = { path = "./src/meta/model_v2" }
+risingwave_meta_model = { path = "src/meta/model" }
+risingwave_meta_model_migration = { path = "src/meta/model/migration" }
 risingwave_meta_node = { path = "./src/meta/node" }
 risingwave_object_store = { path = "./src/object_store" }
 risingwave_pb = { path = "./src/prost" }
2 changes: 1 addition & 1 deletion ci/scripts/backwards-compat-test.sh
@@ -96,7 +96,7 @@ setup_old_cluster() {
   echo "--- Get RisingWave binary for $OLD_VERSION"
   OLD_URL=https://github.com/risingwavelabs/risingwave/releases/download/v${OLD_VERSION}/risingwave-v${OLD_VERSION}-x86_64-unknown-linux.tar.gz
   set +e
-  wget "$OLD_URL"
+  wget --no-verbose "$OLD_URL"
   if [[ "$?" -ne 0 ]]; then
     set -e
     echo "Failed to download ${OLD_VERSION} from github releases, build from source later during \`risedev d\`"
4 changes: 2 additions & 2 deletions ci/scripts/connector-node-integration-test.sh
@@ -55,12 +55,12 @@ sudo -u postgres psql -d test -c "CREATE TABLE test (id serial PRIMARY KEY, name

 echo "--- starting minio"
 echo "setting up minio"
-wget https://dl.minio.io/server/minio/release/linux-amd64/minio > /dev/null
+wget --no-verbose https://dl.minio.io/server/minio/release/linux-amd64/minio > /dev/null
 chmod +x minio
 sudo ./minio server /tmp/minio &
 # wait for minio to start
 sleep 3
-wget https://dl.minio.io/client/mc/release/linux-amd64/mc > /dev/null
+wget --no-verbose https://dl.minio.io/client/mc/release/linux-amd64/mc > /dev/null
 chmod +x mc
 MC_PATH=${PWD}/mc
 ${MC_PATH} config host add minio http://127.0.0.1:9000 minioadmin minioadmin
2 changes: 1 addition & 1 deletion ci/scripts/e2e-cassandra-sink-test.sh
@@ -37,7 +37,7 @@ risedev ci-start ci-sink-test
 sleep 40

 echo "--- install cassandra"
-wget $(get_latest_cassandra_download_url) -O cassandra_latest.tar.gz
+wget --no-verbose $(get_latest_cassandra_download_url) -O cassandra_latest.tar.gz
 tar xfvz cassandra_latest.tar.gz
 export LATEST_CASSANDRA_VERSION=$(get_latest_cassandra_version)
 export CASSANDRA_DIR="./apache-cassandra-${LATEST_CASSANDRA_VERSION}"
2 changes: 1 addition & 1 deletion ci/scripts/e2e-deltalake-sink-rust-test.sh
@@ -38,7 +38,7 @@ sleep 1
 # prepare minio deltalake sink
 echo "--- preparing deltalake"
 .risingwave/bin/mcli -C .risingwave/config/mcli mb hummock-minio/deltalake
-wget https://rw-ci-deps-dist.s3.amazonaws.com/spark-3.3.1-bin-hadoop3.tgz
+wget --no-verbose https://rw-ci-deps-dist.s3.amazonaws.com/spark-3.3.1-bin-hadoop3.tgz
 tar -xf spark-3.3.1-bin-hadoop3.tgz --no-same-owner
 DEPENDENCIES=io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2
 spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
2 changes: 1 addition & 1 deletion ci/scripts/e2e-iceberg-sink-test.sh
@@ -39,7 +39,7 @@ sleep 1
 # prepare minio iceberg sink
 echo "--- preparing iceberg"
 .risingwave/bin/mcli -C .risingwave/config/mcli mb hummock-minio/iceberg
-wget https://rw-ci-deps-dist.s3.amazonaws.com/spark-3.3.1-bin-hadoop3.tgz
+wget --no-verbose https://rw-ci-deps-dist.s3.amazonaws.com/spark-3.3.1-bin-hadoop3.tgz
 tar -xf spark-3.3.1-bin-hadoop3.tgz --no-same-owner
 DEPENDENCIES=org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,org.apache.hadoop:hadoop-aws:3.3.2
 spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
4 changes: 2 additions & 2 deletions ci/scripts/e2e-mongodb-sink-test.sh
@@ -28,8 +28,8 @@ cargo make ci-start ci-sink-test
 sleep 1

 # install the mongo shell
-wget http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
-wget https://repo.mongodb.org/apt/ubuntu/dists/focal/mongodb-org/4.4/multiverse/binary-amd64/mongodb-org-shell_4.4.28_amd64.deb
+wget --no-verbose http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
+wget --no-verbose https://repo.mongodb.org/apt/ubuntu/dists/focal/mongodb-org/4.4/multiverse/binary-amd64/mongodb-org-shell_4.4.28_amd64.deb
 dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb
 dpkg -i mongodb-org-shell_4.4.28_amd64.deb

4 changes: 2 additions & 2 deletions ci/scripts/e2e-source-test.sh
@@ -72,8 +72,8 @@ sleep 2

 echo "--- mongodb cdc test"
 # install the mongo shell
-wget http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
-wget https://repo.mongodb.org/apt/ubuntu/dists/focal/mongodb-org/4.4/multiverse/binary-amd64/mongodb-org-shell_4.4.28_amd64.deb
+wget --no-verbose http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
+wget --no-verbose https://repo.mongodb.org/apt/ubuntu/dists/focal/mongodb-org/4.4/multiverse/binary-amd64/mongodb-org-shell_4.4.28_amd64.deb
 dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb
 dpkg -i mongodb-org-shell_4.4.28_amd64.deb

2 changes: 1 addition & 1 deletion ci/scripts/release.sh
@@ -26,7 +26,7 @@ dnf install -y perl-core wget python3 python3-devel cyrus-sasl-devel rsync opens
 echo "--- Install java and maven"
 dnf install -y java-17-openjdk java-17-openjdk-devel
 pip3 install toml-cli
-wget https://rw-ci-deps-dist.s3.amazonaws.com/apache-maven-3.9.3-bin.tar.gz && tar -zxvf apache-maven-3.9.3-bin.tar.gz
+wget --no-verbose https://rw-ci-deps-dist.s3.amazonaws.com/apache-maven-3.9.3-bin.tar.gz && tar -zxvf apache-maven-3.9.3-bin.tar.gz
 export PATH="${REPO_ROOT}/apache-maven-3.9.3/bin:$PATH"
 mvn -v

22 changes: 22 additions & 0 deletions ci/workflows/main-cron.yml
@@ -530,6 +530,28 @@ steps:
     timeout_in_minutes: 25
     retry: *auto-retry

+  - label: "S3 sink batching test"
+    key: "s3-sink-batching-test"
+    command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_sink_batch.py"
+    if: |
+      !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
+      || build.pull_request.labels includes "ci/run-s3-source-tests"
+      || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
+    depends_on: build
+    plugins:
+      - seek-oss/aws-sm#v2.3.1:
+          env:
+            S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
+      - docker-compose#v5.1.0:
+          run: rw-build-env
+          config: ci/docker-compose.yml
+          mount-buildkite-agent: true
+          environment:
+            - S3_SOURCE_TEST_CONF
+      - ./ci/plugins/upload-failure-logs
+    timeout_in_minutes: 25
+    retry: *auto-retry
+
   - label: "S3 source batch read on AWS (json parser)"
     key: "s3-v2-source-batch-read-check-aws-json-parser"
     command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_batch.py -t json"
142 changes: 142 additions & 0 deletions e2e_test/s3/fs_sink_batch.py
@@ -0,0 +1,142 @@
import os
import sys
import random
import psycopg2
import json
import time
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from datetime import datetime, timezone
from time import sleep
from minio import Minio
from random import uniform

def do_test(config, file_num, item_num_per_file, prefix):
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )

    # Open a cursor to execute SQL statements
    cur = conn.cursor()

    # Create the upstream table
    cur.execute(f'''CREATE TABLE t (v1 int, v2 int);''')

    print('create sink')
    cur.execute(f'''CREATE sink test_file_sink_batching as select
        v1, v2 from t WITH (
        connector = 's3',
        s3.region_name = 'custom',
        s3.bucket_name = 'hummock001',
        s3.credentials.access = 'hummockadmin',
        s3.credentials.secret = 'hummockadmin',
        s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
        s3.path = 'test_sink/',
        s3.file_type = 'parquet',
        type = 'append-only',
        rollover_seconds = 5,
        max_row_count = 5,
        force_append_only='true'
    ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''')

    cur.execute(f'''CREATE TABLE test_sink_table(
        v1 int,
        v2 int,
    ) WITH (
        connector = 's3',
        match_pattern = 'test_sink/*.parquet',
        refresh.interval.sec = 1,
        s3.region_name = 'custom',
        s3.bucket_name = 'hummock001',
        s3.credentials.access = 'hummockadmin',
        s3.credentials.secret = 'hummockadmin',
        s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
    ) FORMAT PLAIN ENCODE PARQUET;''')

    cur.execute(f'''ALTER SINK test_file_sink_batching SET PARALLELISM = 2;''')

    cur.execute(f'''INSERT INTO t VALUES (10, 10);''')

    cur.execute(f'select count(*) from test_sink_table')
    # no rows should be visible yet: the rollover interval has not elapsed
    result = cur.fetchone()

    def _assert_eq(field, got, expect):
        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

    def _assert_greater(field, got, expect):
        assert got > expect, f'{field} assertion failed: got {got}, expect {expect}.'

    _assert_eq('count(*)', result[0], 0)
    print('the rollover_seconds has not been reached, count(*) = 0')

    time.sleep(11)

    cur.execute(f'select count(*) from test_sink_table')
    result = cur.fetchone()
    _assert_eq('count(*)', result[0], 1)
    print('the rollover_seconds has been reached, count(*) = ', result[0])

    cur.execute(f'''
    INSERT INTO t VALUES (20, 20);
    INSERT INTO t VALUES (30, 30);
    INSERT INTO t VALUES (40, 40);
    INSERT INTO t VALUES (50, 10);
    ''')

    cur.execute(f'select count(*) from test_sink_table')
    # count(*) is still 1: max_row_count has not been exceeded yet
    result = cur.fetchone()
    _assert_eq('count(*)', result[0], 1)
    print('the max row count has not been reached, count(*) = ', result[0])

    cur.execute(f'''
    INSERT INTO t VALUES (60, 20);
    INSERT INTO t VALUES (70, 30);
    INSERT INTO t VALUES (80, 10);
    INSERT INTO t VALUES (90, 20);
    INSERT INTO t VALUES (100, 30);
    INSERT INTO t VALUES (100, 10);
    ''')

    time.sleep(10)

    cur.execute(f'select count(*) from test_sink_table')
    result = cur.fetchone()
    _assert_greater('count(*)', result[0], 1)
    print('the rollover_seconds has been reached, count(*) = ', result[0])

    cur.execute(f'drop sink test_file_sink_batching;')
    cur.execute(f'drop table t;')
    cur.execute(f'drop table test_sink_table;')
    cur.close()
    conn.close()

if __name__ == "__main__":
    FILE_NUM = 10
    ITEM_NUM_PER_FILE = 2000

    config = json.loads(os.environ["S3_SOURCE_TEST_CONF"])
    client = Minio(
        "127.0.0.1:9301",
        "hummockadmin",
        "hummockadmin",
        secure=False,
    )
    run_id = str(random.randint(1000, 9999))
    _local = lambda idx: f'data_{idx}.parquet'
    _s3 = lambda idx: f"{run_id}_data_{idx}.parquet"

    do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)

    objects = client.list_objects("hummock001", prefix="test_sink/", recursive=True)

    for obj in objects:
        client.remove_object("hummock001", obj.object_name)
        print(f"Deleted: {obj.object_name}")
26 changes: 26 additions & 0 deletions e2e_test/sink/append_only_sink.slt
@@ -25,6 +25,32 @@ create sink invalid_force_append_only from t with (connector = 'blackhole', forc
 statement error unsupported sink type invalid
 create sink invalid_connector from t with (connector = 'invalid');

+statement ok
+set sink_decouple=false;
+
+statement error
+CREATE SINK file_sink
+FROM
+    t
+WITH
+(
+    connector = 's3',
+    s3.region_name = 'test',
+    s3.bucket_name = 'test',
+    s3.path = '',
+    s3.file_type = 'parquet',
+    type = 'append-only',
+    force_append_only='true'
+) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');
+----
+db error: ERROR: Failed to run the query
+
+Caused by these errors (recent errors listed first):
+  1: Sink error
+  2: config error
+  3: File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.
+
+
 statement ok
 drop sink s1

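
The same guard can be reproduced from the Python harness used elsewhere in this commit. The sketch below is illustrative only, not part of the commit: it assumes a local frontend at localhost:4566 with user root and database dev (as in the e2e tests above) and an existing table t, and it checks only the error path that the slt test pins down.

import psycopg2

# Connection settings assumed from the e2e tests above; table t must exist.
conn = psycopg2.connect(host="localhost", port="4566", user="root", database="dev")
conn.autocommit = True  # keep the session usable after the expected failure
cur = conn.cursor()

# Same file sink definition as in the slt test above.
CREATE_FILE_SINK = '''CREATE SINK file_sink FROM t WITH (
    connector = 's3',
    s3.region_name = 'test',
    s3.bucket_name = 'test',
    s3.path = '',
    s3.file_type = 'parquet',
    type = 'append-only',
    force_append_only='true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');'''

cur.execute('set sink_decouple = false;')
try:
    cur.execute(CREATE_FILE_SINK)
    raise AssertionError('expected CREATE SINK to fail with sink_decouple disabled')
except psycopg2.Error as e:
    # Matches "File sink can only be created with sink_decouple enabled. ..."
    assert 'sink_decouple' in str(e)

# After `set sink_decouple = true;` the statement passes this config check;
# whether creation then succeeds depends on the placeholder S3 settings.
cur.close()
conn.close()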
(The remaining changed files in this commit are not rendered here.)