Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Browse files Browse the repository at this point in the history
…nto li0k/storage_cut_sst_file
  • Loading branch information
Li0k committed Oct 26, 2023
2 parents 62a9cdf + e74b32d commit e3e51d0
Show file tree
Hide file tree
Showing 122 changed files with 1,265 additions and 456 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.0" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "16dab0e36ab337e58ee8002d828def2d212fa116" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "186fde7663545d1d6a5856ce9fbbc541224eadfb" }
arrow-array = "48"
arrow-cast = "48"
arrow-schema = "48"
Expand Down
49 changes: 32 additions & 17 deletions e2e_test/batch/catalog/issue_10177.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,35 @@ ORDER BY CASE
WHEN typtype = 'd' AND elemtyptype = 'a' THEN 6 -- Domains over arrays last
END;
----
pg_catalog 1043 varchar b f NULL
pg_catalog 1082 date b f NULL
pg_catalog 1083 time b f NULL
pg_catalog 1114 timestamp b f NULL
pg_catalog 1184 timestamptz b f NULL
pg_catalog 1186 interval b f NULL
pg_catalog 1301 rw_int256 b f NULL
pg_catalog 16 bool b f NULL
pg_catalog 17 bytea b f NULL
pg_catalog 1700 numeric b f NULL
pg_catalog 20 int8 b f NULL
pg_catalog 21 int2 b f NULL
pg_catalog 23 int4 b f NULL
pg_catalog 25 text b f NULL
pg_catalog 3802 jsonb b f NULL
pg_catalog 700 float4 b f NULL
pg_catalog 701 float8 b f NULL
pg_catalog 1000 _bool b f NULL
pg_catalog 1001 _bytea b f NULL
pg_catalog 1005 _int2 b f NULL
pg_catalog 1007 _int4 b f NULL
pg_catalog 1015 _varchar b f NULL
pg_catalog 1016 _int8 b f NULL
pg_catalog 1021 _float4 b f NULL
pg_catalog 1022 _float8 b f NULL
pg_catalog 1043 varchar b f NULL
pg_catalog 1082 date b f NULL
pg_catalog 1083 time b f NULL
pg_catalog 1114 timestamp b f NULL
pg_catalog 1115 _timestamp b f NULL
pg_catalog 1182 _date b f NULL
pg_catalog 1183 _time b f NULL
pg_catalog 1184 timestamptz b f NULL
pg_catalog 1185 _timestamptz b f NULL
pg_catalog 1186 interval b f NULL
pg_catalog 1187 _interval b f NULL
pg_catalog 1231 _numeric b f NULL
pg_catalog 1301 rw_int256 b f NULL
pg_catalog 16 bool b f NULL
pg_catalog 17 bytea b f NULL
pg_catalog 1700 numeric b f NULL
pg_catalog 20 int8 b f NULL
pg_catalog 21 int2 b f NULL
pg_catalog 23 int4 b f NULL
pg_catalog 25 text b f NULL
pg_catalog 3802 jsonb b f NULL
pg_catalog 3807 _jsonb b f NULL
pg_catalog 700 float4 b f NULL
pg_catalog 701 float8 b f NULL
49 changes: 32 additions & 17 deletions e2e_test/batch/catalog/pg_type.slt.part
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
query ITITT
SELECT oid, typname, typelem, typnotnull, typtype, typinput FROM pg_catalog.pg_type order by oid;
----
16 bool 0 f b boolin
17 bytea 0 f b byteain
20 int8 0 f b int8in
21 int2 0 f b int2in
23 int4 0 f b int4in
25 text 0 f b textin
700 float4 0 f b float4in
701 float8 0 f b float8in
1043 varchar 0 f b varcharin
1082 date 0 f b date_in
1083 time 0 f b time_in
1114 timestamp 0 f b timestamp_in
1184 timestamptz 0 f b timestamptz_in
1186 interval 0 f b interval_in
1301 rw_int256 0 f b rw_int256_in
1700 numeric 0 f b numeric_in
3802 jsonb 0 f b jsonb_in
16 bool 0 f b boolin
17 bytea 0 f b byteain
20 int8 0 f b int8in
21 int2 0 f b int2in
23 int4 0 f b int4in
25 text 0 f b textin
700 float4 0 f b float4in
701 float8 0 f b float8in
1000 _bool 16 f b array_in
1001 _bytea 17 f b array_in
1005 _int2 21 f b array_in
1007 _int4 23 f b array_in
1015 _varchar 1043 f b array_in
1016 _int8 20 f b array_in
1021 _float4 700 f b array_in
1022 _float8 701 f b array_in
1043 varchar 0 f b varcharin
1082 date 0 f b date_in
1083 time 0 f b time_in
1114 timestamp 0 f b timestamp_in
1115 _timestamp 1114 f b array_in
1182 _date 1082 f b array_in
1183 _time 1083 f b array_in
1184 timestamptz 0 f b timestamptz_in
1185 _timestamptz 1184 f b array_in
1186 interval 0 f b interval_in
1187 _interval 1186 f b array_in
1231 _numeric 1700 f b array_in
1301 rw_int256 0 f b rw_int256_in
1700 numeric 0 f b numeric_in
3802 jsonb 0 f b jsonb_in
3807 _jsonb 3802 f b array_in
8 changes: 4 additions & 4 deletions e2e_test/sink/remote/jdbc.check.pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ select * from t_remote_1 order by id;
query III
select * from biz.t_types order by id;
----
1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"Value 1","Value 2"} {12.345,56.789}
2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} {"Value 3","Value 4"} {43.21,65.432}
3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"How're you?","\"hello\\ \\world\""} {12.345,56.789}
1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"Value 1","Value 2"} {12.345,56.789} {1,2,3} {1,2,3} {1,2,3} {12.3,56.7}
2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} {"Value 3","Value 4"} {43.21,65.432} {4,5,6} {4,5,6} {4,5,6} {43.2,65.4}
3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"How're you?","\"hello\\ \\world\""} {12.345,56.789} {1,2,3} {1,2,3} {1,2,3} {43.2,65.4}


query IT
select * from t_append_only order by v1, v2;
select * from t_append_only order by v1,v2;
----
1 aaa
1 bbb
Expand Down
14 changes: 9 additions & 5 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ CREATE TABLE rw_typed_data (
interval_column INTERVAL,
jsonb_column JSONB,
array_column VARCHAR[],
array_column2 FLOAT[]
array_column2 FLOAT[],
array_column3 SMALLINT[],
array_column4 INTEGER[],
array_column5 BIGINT[],
array_column6 DOUBLE PRECISION[],
);

statement ok
Expand Down Expand Up @@ -196,10 +200,10 @@ INSERT INTO t_remote_1 VALUES
(6, 'Varchar value 6', 'Text value 6', 789, 123, 456, 67.89, 34.56, 78.91, FALSE, '2023-05-27', '23:45:01', '2023-05-27 23:45:01', '2023-05-27 23:45:01', '2 years 3 months 4 days 5 hours 6 minutes 7 seconds', '{"key": "value6"}', E'\\xDEADBABE');

statement ok
INSERT INTO rw_typed_data (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, interval_column, jsonb_column, array_column, array_column2) VALUES
(1, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['Value 1', 'Value 2'], '{12.345,56.789}'),
(2, 'Varcharvalue2', 'Textvalue2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '2 days', '{"key": "value2"}', ARRAY['Value 3', 'Value 4'], '{43.21,65.432}'),
(3, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['How''re you?', '"hello\ \world"'], ARRAY[12.345,56.789]);
INSERT INTO rw_typed_data (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, interval_column, jsonb_column, array_column, array_column2, array_column3, array_column4, array_column5, array_column6) VALUES
(1, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['Value 1', 'Value 2'], '{12.345,56.789}', '{1, 2, 3}', '{1, 2, 3}', '{1, 2, 3}', '{12.3,56.7}'),
(2, 'Varcharvalue2', 'Textvalue2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '2 days', '{"key": "value2"}', ARRAY['Value 3', 'Value 4'], '{43.21,65.432}', '{4, 5, 6}', '{4, 5, 6}', '{4, 5, 6}', '{43.2,65.4}'),
(3, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['How''re you?', '"hello\ \world"'], ARRAY[12.345,56.789], ARRAY[1, 2, 3], ARRAY[1, 2, 3], ARRAY[1, 2, 3], ARRAY[43.2,65.4]);

statement ok
FLUSH;
Expand Down
6 changes: 5 additions & 1 deletion e2e_test/sink/remote/mysql_create_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,9 @@ CREATE TABLE t_types (
interval_column VARCHAR(100),
jsonb_column JSON,
array_column LONGTEXT,
array_column2 LONGTEXT
array_column2 LONGTEXT,
array_column3 LONGTEXT,
array_column4 LONGTEXT,
array_column5 LONGTEXT,
array_column6 LONGTEXT
);
6 changes: 3 additions & 3 deletions e2e_test/sink/remote/mysql_expected_result_2.tsv
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} Value 1,Value 2 12.345,56.789
2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 0 2023-05-23 23:45:01 2023-05-23 23:45:01 P0Y0M2DT0H0M0S {"key": "value2"} Value 3,Value 4 43.21,65.432
3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} How're you?,"hello\ \world" 12.345,56.789
1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} Value 1,Value 2 12.345,56.789 1,2,3 1,2,3 1,2,3 12.3,56.7
2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 0 2023-05-23 23:45:01 2023-05-23 23:45:01 P0Y0M2DT0H0M0S {"key": "value2"} Value 3,Value 4 43.21,65.432 4,5,6 4,5,6 4,5,6 43.2,65.4
3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} How're you?,"hello\ \world" 12.345,56.789 1,2,3 1,2,3 1,2,3 43.2,65.4
6 changes: 5 additions & 1 deletion e2e_test/sink/remote/pg_create_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ CREATE TABLE biz.t_types (
interval_column INTERVAL,
jsonb_column JSONB,
array_column VARCHAR[],
array_column2 DECIMAL[]
array_column2 FLOAT[],
array_column3 SMALLINT[],
array_column4 INTEGER[],
array_column5 BIGINT[],
array_column6 DOUBLE PRECISION[]
);

CREATE TABLE biz.t2 (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.proto.Data.DataType.TypeName;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand All @@ -33,6 +35,26 @@ public PostgresDialect(int[] columnSqlTypes) {
this.columnSqlTypes = columnSqlTypes;
}

/**
 * Lookup table from a RisingWave {@code TypeName} to the type-name string that is passed to
 * {@code Connection.createArrayOf} when an array column's elements are bound.
 *
 * <p>Element types without an entry here are not rejected: the lookup site falls back to the
 * enum constant's own name when {@code get} returns {@code null}.
 */
private static final HashMap<TypeName, String> RW_TYPE_TO_JDBC_TYPE_NAME;

// Populated once at class-load time; nothing in this block mutates the map afterwards.
static {
RW_TYPE_TO_JDBC_TYPE_NAME = new HashMap<TypeName, String>();
// Integer and floating-point types use the short names int2/int4/int8 and float4/float8.
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.INT16, "int2");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.INT32, "int4");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.INT64, "int8");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.FLOAT, "float4");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.DOUBLE, "float8");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.BOOLEAN, "bool");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.VARCHAR, "varchar");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.DECIMAL, "numeric");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.TIME, "time");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.TIMESTAMP, "timestamp");
// NOTE(review): INTERVAL is mapped to "varchar" rather than an interval type name —
// presumably the elements are sent as text; confirm against the array binding code.
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.INTERVAL, "varchar");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.DATE, "date");
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.TIMESTAMPTZ, "timestamptz");
// NOTE(review): JSONB is likewise mapped to "varchar" — presumably serialized as text; confirm.
RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.JSONB, "varchar");
}

@Override
public SchemaTableName createSchemaTableName(String schemaName, String tableName) {
if (schemaName == null || schemaName.isBlank()) {
Expand Down Expand Up @@ -115,9 +137,11 @@ public void bindInsertIntoStatement(
Object[] objArray = (Object[]) val;
assert (column.getDataType().getFieldTypeCount() == 1);
var fieldType = column.getDataType().getFieldType(0);
stmt.setArray(
placeholderIdx++,
conn.createArrayOf(fieldType.getTypeName().name(), objArray));
var typeName = RW_TYPE_TO_JDBC_TYPE_NAME.get(fieldType.getTypeName());
if (typeName == null) {
typeName = fieldType.getTypeName().name();
}
stmt.setArray(placeholderIdx++, conn.createArrayOf(typeName, objArray));
break;
case VARCHAR:
// since VARCHAR column may sink to a UUID column, we get the target type
Expand Down
10 changes: 7 additions & 3 deletions src/connector/src/aws_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ use url::Url;

use crate::aws_auth::AwsAuthProps;

pub const REGION: &str = "region";
pub const ACCESS_KEY: &str = "access_key";
pub const SECRET_ACCESS: &str = "secret_access";

pub const AWS_DEFAULT_CONFIG: [&str; 7] = [
"region",
REGION,
"arn",
"profile",
"access_key",
"secret_access",
ACCESS_KEY,
SECRET_ACCESS,
"session_token",
"endpoint_url",
];
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub enum ConnectorError {
#[error("MySQL error: {0}")]
MySql(#[from] mysql_async::Error),

#[error("Pulsar error: {0}")]
Pulsar(anyhow::Error),

#[error(transparent)]
Internal(#[from] anyhow::Error),
}
Expand Down
11 changes: 10 additions & 1 deletion src/connector/src/source/pulsar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ pub mod topic;

pub use enumerator::*;
use serde::Deserialize;
use serde_with::serde_as;
pub use split::*;

use self::source::reader::PulsarSplitReader;
use crate::common::PulsarCommon;
use crate::source::pulsar::source::reader::PulsarSplitReader;
use crate::source::SourceProperties;

pub const PULSAR_CONNECTOR: &str = "pulsar";
Expand All @@ -36,6 +37,7 @@ impl SourceProperties for PulsarProperties {
}

#[derive(Clone, Debug, Deserialize)]
#[serde_as]
pub struct PulsarProperties {
#[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
pub scan_startup_mode: Option<String>,
Expand All @@ -45,4 +47,11 @@ pub struct PulsarProperties {

#[serde(flatten)]
pub common: PulsarCommon,

#[serde(rename = "iceberg.enabled")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub iceberg_loader_enabled: bool,

#[serde(rename = "iceberg.bucket", default)]
pub iceberg_bucket: Option<String>,
}
Loading

0 comments on commit e3e51d0

Please sign in to comment.