Skip to content

Commit

Permalink
Merge branch 'main' into patrick/bump-opendal-47.pr
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Jun 18, 2024
2 parents bd94c63 + ee06743 commit 9670400
Show file tree
Hide file tree
Showing 39 changed files with 602 additions and 195 deletions.
3 changes: 3 additions & 0 deletions e2e_test/iceberg/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def drop_table(args, drop_sqls):
verify_sql = test_case.get("verify_sql")
print(f"verify_sql:{verify_sql}")
verify_data = test_case.get("verify_data")
verify_slt = test_case.get("verify_slt")
cmp_sqls = test_case.get("cmp_sqls")
drop_sqls = test_case["drop_sqls"]
config = configparser.ConfigParser()
Expand All @@ -146,6 +147,8 @@ def drop_table(args, drop_sqls):
verify_result(config, verify_sql, verify_schema, verify_data)
if cmp_sqls is not None and cmp_sqls != "" and len(cmp_sqls) == 2:
compare_sql(config, cmp_sqls)
if verify_slt is not None and verify_slt != "":
execute_slt(config, verify_slt)
if drop_sqls is not None and drop_sqls != "":
drop_table(config, drop_sqls)

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
statement ok
CREATE SOURCE iceberg_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
database.name = 'demo_db',
table.name = 'no_partition_append_only_table',
);

query I
SELECT id from iceberg_source ORDER by id;
----
1
2
3
4
5

statement ok
DROP SOURCE iceberg_source
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
statement ok
CREATE SOURCE iceberg_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
database.name = 'demo_db',
table.name = 'partition_append_only_table',
);

query I
SELECT id from iceberg_source ORDER by id;
----
1
2
3
4
5

statement ok
DROP SOURCE iceberg_source
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
statement ok
CREATE SOURCE iceberg_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
database.name = 'demo_db',
table.name = 'range_partition_append_only_table',
);

query I
SELECT id from iceberg_source ORDER by id;
----
1
2
3
4
5

statement ok
DROP SOURCE iceberg_source
2 changes: 2 additions & 0 deletions e2e_test/iceberg/test_case/no_partition_append_only.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ verify_data = """
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none
"""

verify_slt = 'test_case/iceberg_sink_no_partition_append_only_table_verify.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.no_partition_append_only_table',
'DROP SCHEMA IF EXISTS demo_db'
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/iceberg/test_case/partition_append_only.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ verify_data = """
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none
"""

verify_slt = 'test_case/iceberg_sink_partition_append_only_table_verify.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.partition_append_only_table',
'DROP SCHEMA IF EXISTS demo_db'
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/iceberg/test_case/range_partition_append_only.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ verify_data = """
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none
"""

verify_slt = 'test_case/iceberg_sink_range_partition_append_only_table_verify.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.range_partition_append_only_table',
'DROP SCHEMA IF EXISTS demo_db'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,7 @@ public void prepareUpsert(SinkRow row) {
break;
case UPDATE_INSERT:
if (!updateFlag) {
throw Status.FAILED_PRECONDITION
.withDescription(
"an UPDATE_DELETE should precede an UPDATE_INSERT")
.asRuntimeException();
LOG.warn("Missing an UPDATE_DELETE precede an UPDATE_INSERT");
}
jdbcDialect.bindUpsertStatement(upsertStatement, conn, tableSchema, row);
updateFlag = false;
Expand Down Expand Up @@ -364,10 +361,7 @@ public void beginEpoch(long epoch) {}
@Override
public Optional<ConnectorServiceProto.SinkMetadata> barrier(boolean isCheckpoint) {
if (updateFlag) {
throw Status.FAILED_PRECONDITION
.withDescription(
"expected UPDATE_INSERT to complete an UPDATE operation, got `sync`")
.asRuntimeException();
LOG.warn("expect an UPDATE_INSERT to complete an UPDATE operation, got `sync`");
}
return Optional.empty();
}
Expand Down
19 changes: 15 additions & 4 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ message StreamSourceInfo {
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;

// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 16;
// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id and type.
map<string, SecretRef> secret_ref = 16;
}

message Source {
Expand Down Expand Up @@ -180,8 +180,8 @@ message Sink {
// Whether it should use background ddl or block until backfill finishes.
CreateType create_type = 24;

// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 25;
// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id and type.
map<string, SecretRef> secret_ref = 25;
}

message Subscription {
Expand Down Expand Up @@ -450,3 +450,14 @@ message Secret {
uint32 owner = 5;
uint32 schema_id = 6;
}

message SecretRef {
enum RefAsType {
UNSPECIFIED = 0;
TEXT = 1;
// AS FILE
FILE = 2;
}
uint32 secret_id = 1;
RefAsType ref_as = 2;
}
Loading

0 comments on commit 9670400

Please sign in to comment.