Merge branch 'main' into feat/expose-hidden-dist-cols
yezizp2012 committed Oct 16, 2023
2 parents 65296cc + e1bdf78 commit 7590159
Showing 85 changed files with 3,006 additions and 856 deletions.
36 changes: 28 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -96,7 +96,7 @@ aws-types = "0.55"
etcd-client = { package = "madsim-etcd-client", version = "0.4" }
futures-async-stream = "0.2"
hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = [
rdkafka = { package = "madsim-rdkafka", version = "0.3.0", features = [
"cmake-build",
] }
hashbrown = { version = "0.14.0", features = [
2 changes: 1 addition & 1 deletion ci/scripts/s3-source-test.sh
@@ -30,7 +30,7 @@ cargo make ci-start ci-1cn-1fe

echo "--- Run test"
python3 -m pip install minio psycopg2-binary
python3 e2e_test/s3/$script.py
python3 e2e_test/s3/$script

echo "--- Kill cluster"
cargo make ci-kill
55 changes: 53 additions & 2 deletions ci/workflows/main-cron.yml
@@ -290,7 +290,24 @@ steps:
    retry: *auto-retry

  - label: "S3 source check on AWS (json parser)"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s json_file"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s run.py"
    depends_on: build
    plugins:
      - seek-oss/aws-sm#v2.3.1:
          env:
            S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
      - docker-compose#v4.9.0:
          run: rw-build-env
          config: ci/docker-compose.yml
          mount-buildkite-agent: true
          environment:
            - S3_SOURCE_TEST_CONF
      - ./ci/plugins/upload-failure-logs
    timeout_in_minutes: 20
    retry: *auto-retry

  - label: "S3 source check on AWS (json parser)"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s json_file.py"
    depends_on: build
    plugins:
      - seek-oss/aws-sm#v2.3.1:
@@ -307,7 +324,41 @@ steps:
    retry: *auto-retry

  - label: "S3 source check on AWS (csv parser)"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s run_csv"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s run_csv.py"
    depends_on: build
    plugins:
      - seek-oss/aws-sm#v2.3.1:
          env:
            S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
      - docker-compose#v4.9.0:
          run: rw-build-env
          config: ci/docker-compose.yml
          mount-buildkite-agent: true
          environment:
            - S3_SOURCE_TEST_CONF
      - ./ci/plugins/upload-failure-logs
    timeout_in_minutes: 25
    retry: *auto-retry

  - label: "S3_v2 source check on AWS (json parser)"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s 'fs_source_v2.py json'"
    depends_on: build
    plugins:
      - seek-oss/aws-sm#v2.3.1:
          env:
            S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
      - docker-compose#v4.9.0:
          run: rw-build-env
          config: ci/docker-compose.yml
          mount-buildkite-agent: true
          environment:
            - S3_SOURCE_TEST_CONF
      - ./ci/plugins/upload-failure-logs
    timeout_in_minutes: 25
    retry: *auto-retry

  - label: "S3_v2 source check on AWS (csv parser)"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s 'fs_source_v2.py csv_without_header'"
    depends_on: build
    plugins:
      - seek-oss/aws-sm#v2.3.1:
5 changes: 5 additions & 0 deletions docs/developer-guide.md
@@ -23,6 +23,7 @@ http://ecotrust-canada.github.io/markdown-toc/
  * [Start the playground with RiseDev](#start-the-playground-with-risedev)
  * [Start the playground with cargo](#start-the-playground-with-cargo)
- [Debug playground using vscode](#debug-playground-using-vscode)
- [Use standalone-mode](#use-standalone-mode)
- [Develop the dashboard](#develop-the-dashboard)
- [Observability components](#observability-components)
  * [Cluster Control](#cluster-control)
@@ -206,6 +207,10 @@ psql -h localhost -p 4566 -d dev -U root

To step through RisingWave locally with a debugger, you can use the `launch.json` and the `tasks.json` provided in `vscode_suggestions`. After adding these files to your local `.vscode` folder, you can debug and set breakpoints by launching `Launch 'risingwave p' debug`.

## Use standalone-mode

Please refer to [README](../src/cmd_all/src/README.md) for more details.

## Develop the dashboard

Currently, RisingWave has two versions of dashboards. You can use RiseDev config to select which version to use.
158 changes: 158 additions & 0 deletions e2e_test/s3/fs_source_v2.py
@@ -0,0 +1,158 @@
import os
import sys
import csv
import json
import random
import psycopg2

from time import sleep
from io import StringIO
from minio import Minio
from functools import partial

def gen_data(file_num, item_num_per_file):
    assert item_num_per_file % 2 == 0, \
        f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
    return [
        [{
            'id': file_id * item_num_per_file + item_id,
            'name': f'{file_id}_{item_id}',
            'sex': item_id % 2,
            'mark': (-1) ** (item_id % 2),
        } for item_id in range(item_num_per_file)]
        for file_id in range(file_num)
    ]

def format_json(data):
    return [
        '\n'.join([json.dumps(item) for item in file])
        for file in data
    ]

def format_csv(data, with_header):
    csv_files = []

    for file_data in data:
        ostream = StringIO()
        writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys())
        if with_header:
            writer.writeheader()
        for item_data in file_data:
            writer.writerow(item_data)
        # For now the parser can only handle the \n line separator,
        # and trailing white spaces are not allowed.
        # TODO: remove replace and rstrip later
        csv_files.append(ostream.getvalue().replace('\r', '').rstrip())
    return csv_files

def do_test(config, file_num, item_num_per_file, prefix, fmt):
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )

    # Open a cursor to execute SQL statements
    cur = conn.cursor()

    def _table():
        return f's3_test_{fmt}'

    def _encode():
        if fmt == 'json':
            return 'JSON'
        else:
            return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})"

    # Create the table backed by the s3_v2 source
    cur.execute(f'''CREATE TABLE {_table()}(
        id int,
        name TEXT,
        sex int,
        mark int,
    ) WITH (
        connector = 's3_v2',
        match_pattern = '{prefix}*.{fmt}',
        s3.region_name = '{config['S3_REGION']}',
        s3.bucket_name = '{config['S3_BUCKET']}',
        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
    ) FORMAT PLAIN ENCODE {_encode()};''')

    total_rows = file_num * item_num_per_file
    MAX_RETRIES = 40
    for retry_no in range(MAX_RETRIES):
        cur.execute(f'select count(*) from {_table()}')
        result = cur.fetchone()
        if result[0] == total_rows:
            break
        print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s")
        sleep(30)

    stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}'
    print(f'Execute {stmt}')
    cur.execute(stmt)
    result = cur.fetchone()

    print('Got:', result)

    def _assert_eq(field, got, expect):
        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

    _assert_eq('count(*)', result[0], total_rows)
    _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)
    _assert_eq('sum(sex)', result[2], total_rows / 2)
    _assert_eq('sum(mark)', result[3], 0)

    print('Test pass')

    cur.execute(f'drop table {_table()}')
    cur.close()
    conn.close()


if __name__ == "__main__":
    FILE_NUM = 4001
    ITEM_NUM_PER_FILE = 2
    data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)

    fmt = sys.argv[1]
    FORMATTER = {
        'json': format_json,
        'csv_with_header': partial(format_csv, with_header=True),
        'csv_without_header': partial(format_csv, with_header=False),
    }
    assert fmt in FORMATTER, f"Unsupported format: {fmt}"
    formatted_files = FORMATTER[fmt](data)

    config = json.loads(os.environ["S3_SOURCE_TEST_CONF"])
    client = Minio(
        config["S3_ENDPOINT"],
        access_key=config["S3_ACCESS_KEY"],
        secret_key=config["S3_SECRET_KEY"],
        secure=True,
    )
    run_id = str(random.randint(1000, 9999))
    _local = lambda idx: f'data_{idx}.{fmt}'
    _s3 = lambda idx: f"{run_id}_data_{idx}.{fmt}"

    # put s3 files
    for idx, file_str in enumerate(formatted_files):
        with open(_local(idx), "w") as f:
            f.write(file_str)
            os.fsync(f.fileno())

        client.fput_object(
            config["S3_BUCKET"],
            _s3(idx),
            _local(idx)
        )

    # do test
    do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt)

    # clean up s3 files
    for idx, _ in enumerate(formatted_files):
        client.remove_object(config["S3_BUCKET"], _s3(idx))
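For reference, the CI changes above wire this test in: s3-source-test.sh now runs python3 e2e_test/s3/$script verbatim, and main-cron.yml passes the file name plus a format argument (e.g. 'fs_source_v2.py json'). A rough sketch of an equivalent local invocation, assuming a RisingWave cluster is already serving on localhost:4566 (as hard-coded in do_test above) and that the placeholder credential values below are replaced with a real bucket's settings:

    # Placeholder credentials; fill in a real bucket before running.
    export S3_SOURCE_TEST_CONF='{"S3_REGION": "...", "S3_BUCKET": "...", "S3_ACCESS_KEY": "...", "S3_SECRET_KEY": "...", "S3_ENDPOINT": "..."}'
    python3 -m pip install minio psycopg2-binary
    python3 e2e_test/s3/fs_source_v2.py json                  # JSON files
    python3 e2e_test/s3/fs_source_v2.py csv_without_header    # header-less CSV files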
46 changes: 46 additions & 0 deletions e2e_test/streaming/aggregate/jsonb_agg.slt
@@ -0,0 +1,46 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t(v1 boolean, v2 int, v3 varchar, v4 jsonb);

statement ok
create materialized view mv_tmp as
select jsonb_agg(v1) as j1 from t;

statement ok
drop materialized view mv_tmp;

statement ok
create materialized view mv1 as
select
jsonb_agg(v1 order by v2) as j1,
jsonb_agg(v2 order by v2) as j2,
jsonb_object_agg(v3, v4) as j3
from t;

statement ok
insert into t values
(null, 2, 'bbb', null),
(false, 1, 'ccc', 'null');

query TTT
select * from mv1;
----
[false, null] [1, 2] {"bbb": null, "ccc": null}

statement ok
insert into t values
(true, 0, 'bbb', '999'),
(true, 8, 'ddd', '{"foo": "bar"}');

query TTT
select * from mv1;
----
[true, false, null, true] [0, 1, 2, 8] {"bbb": 999, "ccc": null, "ddd": {"foo": "bar"}}

statement ok
drop materialized view mv1;

statement ok
drop table t;