
Commit

resolve conflict
yuhao-su committed Jul 12, 2024
2 parents 91fd294 + 5c52c7d commit 5224b1a
Showing 167 changed files with 4,813 additions and 954 deletions.
30 changes: 28 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
@@ -94,6 +94,7 @@ aws-config = { version = "1", default-features = false, features = [
aws-credential-types = { version = "1", default-features = false, features = [
    "hardcoded-credentials",
] }
aws-sdk-glue = "1"
aws-sdk-kinesis = { version = "1", default-features = false, features = [
    "rt-tokio",
    "rustls",
@@ -334,8 +335,6 @@ tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" }
futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" }
# patch: unlimit 4MB message size for grpc client
etcd-client = { git = "https://github.com/risingwavelabs/etcd-client.git", rev = "4e84d40" }
# patch to remove preserve_order from serde_json
deno_core = { git = "https://github.com/bakjos/deno_core", rev = "9b241c6" }
# patch to use reqwest 0.12.2
4 changes: 2 additions & 2 deletions Makefile.toml
@@ -713,10 +713,10 @@ if ! ${TMUX} ls &>/dev/null ; then
    exit 0
fi
# Kill other components with Ctrl+C/Ctrl+D
# Kill tmux components with Ctrl+C
${TMUX} list-windows -F "#{window_name} #{pane_id}" \
    | awk '{ print $2 }' \
    | xargs -I {} ${TMUX} send-keys -t {} C-c C-d
    | xargs -I {} ${TMUX} send-keys -t {} C-c
# Stop docker components
containers=$(docker ps -a -q -f name=risedev- 2>/dev/null) || true
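For reference, the pane-kill pattern in this recipe can be reproduced outside risedev: list the tmux panes, keep their pane ids, and send each one Ctrl+C. A minimal sketch, substituting a plain tmux call for the recipe's ${TMUX} variable:

# List "window_name pane_id" pairs, keep only the pane ids (%0, %1, ...),
# and send Ctrl+C to each pane so the process inside can shut down gracefully.
tmux list-windows -F "#{window_name} #{pane_id}" \
    | awk '{ print $2 }' \
    | xargs -I {} tmux send-keys -t {} C-c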
1 change: 1 addition & 0 deletions ci/scripts/e2e-sqlserver-sink-test.sh
@@ -74,6 +74,7 @@ if [[ ${#actual[@]} -eq ${#expected[@]} && ${actual[@]} == ${expected[@]} ]]; then
else
    cat ./query_result.txt
    echo "The output is not as expected."
    exit 1
fi

echo "--- Kill cluster"
43 changes: 43 additions & 0 deletions ci/scripts/e2e-time-travel-test.sh
@@ -0,0 +1,43 @@
#!/usr/bin/env bash

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.sh

while getopts 'p:' opt; do
    case ${opt} in
        p )
            profile=$OPTARG
            ;;
        \? )
            echo "Invalid Option: -$OPTARG" 1>&2
            exit 1
            ;;
        : )
            echo "Invalid option: $OPTARG requires an argument" 1>&2
            ;;
    esac
done
shift $((OPTIND -1))

sudo apt install sqlite3 -y
download_and_prepare_rw "$profile" common

echo "--- starting risingwave cluster"
risedev ci-start ci-time-travel
sleep 1

sqllogictest -p 4566 -d dev './e2e_test/time_travel/*.slt'

echo "--- verify time travel metadata"
sleep 30 # ensure another time travel version snapshot has been taken
version_snapshot_count=$(sqlite3 .risingwave/data/sqlite/metadata.db "select count(*) from hummock_time_travel_version;")
if [ "$version_snapshot_count" -le 1 ]; then
    echo "test failed: too few version_snapshot_count, actual ${version_snapshot_count}"
    exit 1
fi

echo "--- Kill cluster"
risedev ci-kill
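For local debugging, the snapshot-count check above can also be run by hand against the embedded SQLite meta store while a ci-time-travel cluster is up. A minimal sketch, assuming the same data directory the script queries:

# Count persisted time-travel version snapshots; a count above 1 means
# periodic snapshots are being taken, which is what the CI check asserts.
sqlite3 .risingwave/data/sqlite/metadata.db \
    "select count(*) from hummock_time_travel_version;"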

2 changes: 1 addition & 1 deletion ci/scripts/s3-source-test.sh
@@ -32,7 +32,7 @@ echo "--- starting risingwave cluster with connector node"
risedev ci-start ci-1cn-1fe

echo "--- Run test"
python3 -m pip install --break-system-packages minio psycopg2-binary opendal
python3 -m pip install --break-system-packages minio psycopg2-binary opendal pandas
if [[ -v format_type ]]; then
    python3 e2e_test/s3/"$script" "$format_type"
else
42 changes: 42 additions & 0 deletions ci/workflows/main-cron.yml
@@ -478,6 +478,28 @@ steps:
    timeout_in_minutes: 25
    retry: *auto-retry

  - label: "S3_v2 source check on parquet file"
    key: "s3-v2-source-check-parquet-file"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source.py"
    if: |
      !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
      || build.pull_request.labels includes "ci/run-s3-source-tests"
      || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
    depends_on: build
    plugins:
      - seek-oss/aws-sm#v2.3.1:
          env:
            S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
      - docker-compose#v5.1.0:
          run: rw-build-env
          config: ci/docker-compose.yml
          mount-buildkite-agent: true
          environment:
            - S3_SOURCE_TEST_CONF
      - ./ci/plugins/upload-failure-logs
    timeout_in_minutes: 25
    retry: *auto-retry

  - label: "S3_v2 source batch read on AWS (json parser)"
    key: "s3-v2-source-batch-read-check-aws-json-parser"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_batch.py -t json"
@@ -868,6 +890,26 @@ steps:
    timeout_in_minutes: 10
    retry: *auto-retry

  - label: "end-to-end time travel test"
    key: "e2e-time-travel-tests"
    command: "ci/scripts/e2e-time-travel-test.sh -p ci-release"
    if: |
      !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
      || build.pull_request.labels includes "ci/run-e2e-time-travel-tests"
      || build.env("CI_STEPS") =~ /(^|,)e2e-time-travel-tests?(,|$$)/
    depends_on:
      - "build"
      - "build-other"
      - "docslt"
    plugins:
      - docker-compose#v5.1.0:
          run: rw-build-env
          config: ci/docker-compose.yml
          mount-buildkite-agent: true
      - ./ci/plugins/upload-failure-logs
    timeout_in_minutes: 10
    retry: *auto-retry

  - label: "end-to-end sqlserver sink test"
    key: "e2e-sqlserver-sink-tests"
    command: "ci/scripts/e2e-sqlserver-sink-test.sh -p ci-release"
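The steps above share one gating pattern: in main-cron they run by default (no skip or run-selected label and CI_STEPS unset), and they can be selected explicitly either with the matching ci/run-... PR label or by listing the step key as a comma-separated token in the CI_STEPS build environment variable ($$ in the regexes presumably escapes the dollar sign in the pipeline file). A hypothetical value selecting just the two new steps shown here:

# Illustrative only: matches /(^|,)s3-source-tests?(,|$)/ and /(^|,)e2e-time-travel-tests?(,|$)/.
CI_STEPS="s3-source-tests,e2e-time-travel-tests"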
17 changes: 17 additions & 0 deletions ci/workflows/pull-request.yml
@@ -333,6 +333,23 @@ steps:
    timeout_in_minutes: 10
    retry: *auto-retry

  - label: "end-to-end time travel test"
    key: "e2e-time-travel-tests"
    command: "ci/scripts/e2e-time-travel-test.sh -p ci-dev"
    if: build.pull_request.labels includes "ci/run-e2e-time-travel-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-time-travel-tests?(,|$$)/
    depends_on:
      - "build"
      - "build-other"
      - "docslt"
    plugins:
      - docker-compose#v5.1.0:
          run: rw-build-env
          config: ci/docker-compose.yml
          mount-buildkite-agent: true
      - ./ci/plugins/upload-failure-logs
    timeout_in_minutes: 15
    retry: *auto-retry

  - label: "end-to-end sqlserver sink test"
    if: build.pull_request.labels includes "ci/run-e2e-sqlserver-sink-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-sqlserver-sink-tests?(,|$$)/
    command: "ci/scripts/e2e-sqlserver-sink-test.sh -p ci-dev"
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

137 changes: 137 additions & 0 deletions e2e_test/s3/fs_parquet_source.py
@@ -0,0 +1,137 @@
import os
import sys
import random
import psycopg2
import json
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from datetime import datetime, timezone
from time import sleep
from minio import Minio
from random import uniform

def gen_data(file_num, item_num_per_file):
    assert item_num_per_file % 2 == 0, \
        f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
    return [
        [{
            'id': file_id * item_num_per_file + item_id,
            'name': f'{file_id}_{item_id}_{file_id * item_num_per_file + item_id}',
            'sex': item_id % 2,
            'mark': (-1) ** (item_id % 2),
            'test_int': pa.scalar(1, type=pa.int32()),
            'test_real': pa.scalar(4.0, type=pa.float32()),
            'test_double_precision': pa.scalar(5.0, type=pa.float64()),
            'test_varchar': pa.scalar('7', type=pa.string()),
            'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()),
            'test_date': pa.scalar(datetime.now().date(), type=pa.date32()),
            'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')),
            'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
            'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')),
        } for item_id in range(item_num_per_file)]
        for file_id in range(file_num)
    ]

def do_test(config, file_num, item_num_per_file, prefix):
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )

    # Open a cursor to execute SQL statements
    cur = conn.cursor()

    def _table():
        return 's3_test_parquet'

    # Create the table backed by the s3_v2 Parquet source
    cur.execute(f'''CREATE TABLE {_table()}(
        id bigint primary key,
        name TEXT,
        sex bigint,
        mark bigint,
        test_int int,
        test_real real,
        test_double_precision double precision,
        test_varchar varchar,
        test_bytea bytea,
        test_date date,
        test_time time,
        test_timestamp timestamp,
        test_timestamptz timestamptz,
    ) WITH (
        connector = 's3_v2',
        match_pattern = '*.parquet',
        s3.region_name = '{config['S3_REGION']}',
        s3.bucket_name = '{config['S3_BUCKET']}',
        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
    ) FORMAT PLAIN ENCODE PARQUET;''')

    total_rows = file_num * item_num_per_file
    MAX_RETRIES = 40
    for retry_no in range(MAX_RETRIES):
        cur.execute(f'select count(*) from {_table()}')
        result = cur.fetchone()
        if result[0] == total_rows:
            break
        print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
        sleep(10)

    stmt = f'select count(*), sum(id) from {_table()}'
    print(f'Execute {stmt}')
    cur.execute(stmt)
    result = cur.fetchone()

    print('Got:', result)

    def _assert_eq(field, got, expect):
        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

    _assert_eq('count(*)', result[0], total_rows)
    _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

    print('Test pass')

    cur.execute(f'drop table {_table()}')
    cur.close()
    conn.close()


if __name__ == "__main__":
    FILE_NUM = 10
    ITEM_NUM_PER_FILE = 2000
    data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)

    config = json.loads(os.environ["S3_SOURCE_TEST_CONF"])
    client = Minio(
        config["S3_ENDPOINT"],
        access_key=config["S3_ACCESS_KEY"],
        secret_key=config["S3_SECRET_KEY"],
        secure=True,
    )
    run_id = str(random.randint(1000, 9999))
    _local = lambda idx: f'data_{idx}.parquet'
    _s3 = lambda idx: f"{run_id}_data_{idx}.parquet"

    # put s3 files
    for idx, file_data in enumerate(data):
        table = pa.Table.from_pandas(pd.DataFrame(file_data))
        pq.write_table(table, _local(idx))

        client.fput_object(
            config["S3_BUCKET"],
            _s3(idx),
            _local(idx)
        )

    # do test
    do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)

    # clean up s3 files
    for idx, _ in enumerate(data):
        client.remove_object(config["S3_BUCKET"], _s3(idx))
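For completeness, CI drives this file through ci/scripts/s3-source-test.sh -s fs_parquet_source.py, which supplies the S3 connection details via the S3_SOURCE_TEST_CONF environment variable. A rough sketch of an equivalent manual run against a locally started cluster (bucket, region, endpoint, and credentials below are placeholders, not real values):

# Placeholder credentials; the JSON keys are the ones the script reads from its config.
export S3_SOURCE_TEST_CONF='{"S3_REGION":"us-east-1","S3_BUCKET":"example-bucket","S3_ACCESS_KEY":"<access-key>","S3_SECRET_KEY":"<secret-key>","S3_ENDPOINT":"s3.us-east-1.amazonaws.com"}'
# pyarrow is imported by the test but not listed in the CI pip install line, so add it here.
python3 -m pip install --break-system-packages minio psycopg2-binary opendal pandas pyarrow
python3 e2e_test/s3/fs_parquet_source.py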