
Commit

Merge remote-tracking branch 'origin/main' into peng/alter-sink-target-table
Shanicky Chen committed Jun 18, 2024
2 parents bc9b781 + ade66c7 commit e111bad
Showing 123 changed files with 1,867 additions and 707 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock


7 changes: 4 additions & 3 deletions ci/scripts/e2e-cassandra-sink-test.sh
@@ -33,7 +33,8 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- starting risingwave cluster"
risedev ci-start ci-sink-test
sleep 1
# Wait for the Cassandra server to start
sleep 40

echo "--- create cassandra table"
curl https://downloads.apache.org/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz --output apache-cassandra-4.1.3-bin.tar.gz
@@ -47,15 +48,15 @@ pip3 install --break-system-packages cassandra-driver
cd apache-cassandra-4.1.3/bin
export CQLSH_HOST=cassandra-server
export CQLSH_PORT=9042
./cqlsh -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;
./cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;
CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"

echo "--- testing sinks"
cd ../../
sqllogictest -p 4566 -d dev './e2e_test/sink/cassandra_sink.slt'
sleep 1
cd apache-cassandra-4.1.3/bin
./cqlsh -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
./cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"

if cat ./query_result.csv | awk -F "," '{
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01.000+0000" && $9 == "False\r"); }'; then
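The `sleep 40` added above is a fixed-delay readiness check for the Cassandra server. A lighter-weight alternative (a sketch only, not part of this change) is to retry a trivial CQL probe until the node answers; `system.local` always exists, so the query succeeds exactly when the server can serve CQL:

```sql
-- CQL readiness probe: run via `cqlsh -e` in a retry loop (assumed wrapper)
-- instead of waiting a fixed 40 seconds.
SELECT release_version FROM system.local;
```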
5 changes: 5 additions & 0 deletions e2e_test/batch/catalog/pg_attribute.slt.part
@@ -42,5 +42,10 @@ tmp_idx id1 {2,1,3,5}
tmp_idx id2 {2,1,3,5}
tmp_idx id3 {2,1,3,5}

query T
select attoptions from pg_catalog.pg_attribute LIMIT 1;
----
NULL

statement ok
drop table tmp;
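For context, `attoptions` holds per-attribute options and is typically `NULL`. A slightly fuller catalog query (a sketch in standard PostgreSQL catalog terms, not part of this test):

```sql
-- List the first few user columns with their options (normally NULL).
SELECT attname, attnum, attoptions
FROM pg_catalog.pg_attribute
WHERE attnum > 0
ORDER BY attrelid, attnum
LIMIT 5;
```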
3 changes: 2 additions & 1 deletion e2e_test/batch/catalog/pg_indexes.slt.part
@@ -1,5 +1,5 @@
statement ok
create table t(a int, b int);
create table t(a int primary key, b int);

statement ok
create index idx1 on t(a);
@@ -12,6 +12,7 @@ select schemaname, tablename, indexname, tablespace, indexdef from pg_catalog.pg
----
public t idx1 NULL CREATE INDEX idx1 ON t(a)
public t idx2 NULL CREATE INDEX idx2 ON t(b)
public t t_pkey NULL (empty)

statement ok
drop table t;
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
@@ -7,6 +7,7 @@ internal data_directory
internal parallel_compact_size_mb
internal sstable_size_mb
internal state_store
internal use_new_object_prefix_strategy
postmaster backup_storage_directory
postmaster backup_storage_url
postmaster barrier_interval_ms
3 changes: 3 additions & 0 deletions e2e_test/iceberg/main.py
@@ -129,6 +129,7 @@ def drop_table(args, drop_sqls):
verify_sql = test_case.get("verify_sql")
print(f"verify_sql:{verify_sql}")
verify_data = test_case.get("verify_data")
verify_slt = test_case.get("verify_slt")
cmp_sqls = test_case.get("cmp_sqls")
drop_sqls = test_case["drop_sqls"]
config = configparser.ConfigParser()
@@ -146,6 +147,8 @@ def drop_table(args, drop_sqls):
verify_result(config, verify_sql, verify_schema, verify_data)
if cmp_sqls is not None and cmp_sqls != "" and len(cmp_sqls) == 2:
compare_sql(config, cmp_sqls)
if verify_slt is not None and verify_slt != "":
execute_slt(config, verify_slt)
if drop_sqls is not None and drop_sqls != "":
drop_table(config, drop_sqls)

25 changes: 25 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table_verify.slt
@@ -0,0 +1,25 @@
statement ok
CREATE SOURCE iceberg_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
database.name = 'demo_db',
table.name = 'no_partition_append_only_table',
);

query I
SELECT id from iceberg_source ORDER by id;
----
1
2
3
4
5

statement ok
DROP SOURCE iceberg_source
25 changes: 25 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table_verify.slt
@@ -0,0 +1,25 @@
statement ok
CREATE SOURCE iceberg_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
database.name = 'demo_db',
table.name = 'partition_append_only_table',
);

query I
SELECT id from iceberg_source ORDER by id;
----
1
2
3
4
5

statement ok
DROP SOURCE iceberg_source
25 changes: 25 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table_verify.slt
@@ -0,0 +1,25 @@
statement ok
CREATE SOURCE iceberg_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
database.name = 'demo_db',
table.name = 'range_partition_append_only_table',
);

query I
SELECT id from iceberg_source ORDER by id;
----
1
2
3
4
5

statement ok
DROP SOURCE iceberg_source
2 changes: 2 additions & 0 deletions e2e_test/iceberg/test_case/no_partition_append_only.toml
@@ -33,6 +33,8 @@ verify_data = """
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none
"""

verify_slt = 'test_case/iceberg_sink_no_partition_append_only_table_verify.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.no_partition_append_only_table',
'DROP SCHEMA IF EXISTS demo_db'
2 changes: 2 additions & 0 deletions e2e_test/iceberg/test_case/partition_append_only.toml
@@ -35,6 +35,8 @@ verify_data = """
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none
"""

verify_slt = 'test_case/iceberg_sink_partition_append_only_table_verify.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.partition_append_only_table',
'DROP SCHEMA IF EXISTS demo_db'
2 changes: 2 additions & 0 deletions e2e_test/iceberg/test_case/range_partition_append_only.toml
@@ -35,6 +35,8 @@ verify_data = """
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none
"""

verify_slt = 'test_case/iceberg_sink_range_partition_append_only_table_verify.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.range_partition_append_only_table',
'DROP SCHEMA IF EXISTS demo_db'
86 changes: 86 additions & 0 deletions integration_tests/sqlserver-sink/README.md
@@ -0,0 +1,86 @@
# Demo: Sinking to Microsoft SQL Server

This demo showcases how RisingWave sinks data to Microsoft SQL Server.


1. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen service that generates test data, and a SQL Server instance that serves as the sink.

2. Create the SQL Server table:

```sh
docker exec -it sqlserver-server /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P SomeTestOnly@SA -Q "
CREATE DATABASE SinkTest;
GO
USE SinkTest;
GO
CREATE TABLE t_many_data_type (
k1 int, k2 int,
c_boolean bit,
c_int16 smallint,
c_int32 int,
c_int64 bigint,
c_float32 float,
c_float64 float,
c_decimal decimal,
c_date date,
c_time time,
c_timestamp datetime2,
c_nvarchar nvarchar(1024),
c_varbinary varbinary(1024),
PRIMARY KEY (k1,k2)
);
GO"
```

3. Create the RisingWave table and sink:

```sh
docker exec -it postgres-0 psql -h 127.0.0.1 -p 4566 -d dev -U root -c "
CREATE TABLE t_many_data_type_rw (
k1 int, k2 int,
c_int16 smallint,
c_int32 int,
c_int64 bigint,
c_float32 float,
c_float64 double,
c_timestamp timestamp,
c_nvarchar string
) WITH (
connector = 'datagen',
datagen.split.num = '1',
datagen.rows.per.second = '100',
fields.k1.kind = 'random',
fields.k1.min = '0',
fields.k1.max = '10000',
fields.k2.kind = 'random',
fields.k2.min = '0',
fields.k2.max = '10000'
);
CREATE SINK s_many_data_type FROM t_many_data_type_rw WITH (
connector = 'sqlserver',
type = 'upsert',
sqlserver.host = 'localhost',
sqlserver.port = 1433,
sqlserver.user = 'SA',
sqlserver.password = 'SomeTestOnly@SA',
sqlserver.database = 'SinkTest',
sqlserver.table = 't_many_data_type',
primary_key = 'k1,k2',
);
"
```

4. Verify the result in SQL Server, for example:

```sh
docker exec -it sqlserver-server /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P SomeTestOnly@SA -Q "
SELECT count(*) FROM SinkTest.dbo.t_many_data_type;
"
```
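To spot-check actual rows rather than just the count, the same `sqlcmd` invocation can run a query such as the following (a sketch; the column subset is taken from the table definition above):

```sql
-- Inspect the first ten rows by key (T-SQL, run via sqlcmd as in step 4).
SELECT TOP 10 k1, k2, c_int32, c_nvarchar
FROM SinkTest.dbo.t_many_data_type
ORDER BY k1;
```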
46 changes: 46 additions & 0 deletions integration_tests/sqlserver-sink/docker-compose.yml
@@ -0,0 +1,46 @@
---
services:
sqlserver-server:
container_name: sqlserver-server
image: mcr.microsoft.com/mssql/server:2022-latest
hostname: sqlserver-server
ports:
- 1433:1433
environment:
ACCEPT_EULA: 'Y'
SA_PASSWORD: 'SomeTestOnly@SA'
risingwave-standalone:
extends:
file: ../../docker/docker-compose.yml
service: risingwave-standalone
postgres-0:
container_name: postgres-0
extends:
file: ../../docker/docker-compose.yml
service: postgres-0
grafana-0:
extends:
file: ../../docker/docker-compose.yml
service: grafana-0
minio-0:
extends:
file: ../../docker/docker-compose.yml
service: minio-0
prometheus-0:
extends:
file: ../../docker/docker-compose.yml
service: prometheus-0
volumes:
risingwave-standalone:
external: false
postgres-0:
external: false
grafana-0:
external: false
minio-0:
external: false
prometheus-0:
external: false
message_queue:
external: false
name: risingwave-compose
26 changes: 23 additions & 3 deletions proto/catalog.proto
@@ -83,8 +83,8 @@ message StreamSourceInfo {
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;

// Whether the source relies on any secret. The key is the property name and the value is the secret id.
map<string, uint32> secret_ref = 16;
// Whether the source relies on any secret. The key is the property name and the value is the secret id and type.
map<string, SecretRef> secret_ref = 16;
}

message Source {
@@ -180,7 +180,7 @@ message Sink {
// Whether it should use background ddl or block until backfill finishes.
CreateType create_type = 24;

// Whether the sink relies on any secret. The key is the property name and the value is the secret id.
// Whether the sink relies on any secret. The key is the property name and the value is the secret id and type.
map<string, uint32> secret_ref = 25;

Check failure on line 184 in proto/catalog.proto (GitHub Actions / Check breaking changes in Protobuf files): Field "2" with name "value" on message "SecretRefEntry" changed type from "message" to "uint32". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.

repeated plan_common.ColumnCatalog original_target_columns = 26;
@@ -243,6 +243,7 @@ message Index {
// Only `InputRef` type index is supported now.
// The index of `InputRef` is the column index of the primary table.
repeated expr.ExprNode index_item = 8;
repeated IndexColumnProperties index_column_properties = 16;
reserved 9; // Deprecated repeated int32 original_columns = 9;

optional uint64 initialized_at_epoch = 10;
@@ -257,6 +258,14 @@
optional string created_at_cluster_version = 15;
}

// https://www.postgresql.org/docs/current/functions-info.html#FUNCTIONS-INFO-INDEX-COLUMN-PROPS
message IndexColumnProperties {
// Whether the column sorts in ascending (false) or descending (true) order on a forward scan.
bool is_desc = 1;
// Does the column sort with nulls first on a forward scan?
bool nulls_first = 2;
}

message Function {
uint32 id = 1;
uint32 schema_id = 2;
@@ -443,3 +452,14 @@ message Secret {
uint32 owner = 5;
uint32 schema_id = 6;
}

message SecretRef {
enum RefAsType {
UNSPECIFIED = 0;
TEXT = 1;
// AS FILE
FILE = 2;
}
uint32 secret_id = 1;
RefAsType ref_as = 2;
}
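For context, the new `IndexColumnProperties` message mirrors PostgreSQL's per-column index properties referenced in the comment above. A sketch of the PostgreSQL-style DDL the two flags encode (table and index names are illustrative):

```sql
-- Column a: is_desc = false, nulls_first = false (ASC NULLS LAST, the forward-scan defaults).
-- Column b: is_desc = true,  nulls_first = true  (DESC NULLS FIRST).
CREATE INDEX idx_props ON t (a ASC NULLS LAST, b DESC NULLS FIRST);
```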
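Likewise, the new `SecretRef` message records both the secret id and how the secret is dereferenced (`AS TEXT` vs. `AS FILE`). A hedged sketch of the SQL-level usage this plausibly corresponds to (exact syntax and option names may differ; all names here are illustrative):

```sql
-- Create a secret, then reference it from a connector option; the catalog
-- would store the secret_id plus ref_as = TEXT for this reference.
CREATE SECRET kafka_pwd WITH (backend = 'meta') AS 'my-password';
CREATE SINK s1 FROM t1 WITH (
    connector = 'kafka',
    properties.sasl.password = secret kafka_pwd
);
```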