Skip to content

Commit

Permalink
code style
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed Apr 8, 2024
1 parent e13930d commit 91c93b9
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,11 @@ public class CommonConfig implements Serializable {
private String username;
private String password;
private String database;
private String table;

public CommonConfig(ReadonlyConfig config) {
this.nodeUrls = config.get(NODE_URLS);
this.username = config.get(USERNAME);
this.password = config.get(PASSWORD);
this.database = config.get(DATABASE);
this.table = config.get(TABLE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ public OptionRule optionRule() {
SourceConfig.NODE_URLS,
SourceConfig.USERNAME,
SourceConfig.PASSWORD,
SourceConfig.DATABASE,
SourceConfig.TABLE,
TableSchemaOptions.SCHEMA)
SourceConfig.DATABASE)
.optional(
TableSchemaOptions.SCHEMA,
SourceConfig.MAX_RETRIES,
SourceConfig.QUERY_TABLET_SIZE,
SourceConfig.SCAN_FILTER,
Expand All @@ -59,6 +58,7 @@ public OptionRule optionRule() {
SourceConfig.SCAN_KEEP_ALIVE_MIN,
SourceConfig.SCAN_BATCH_ROWS,
SourceConfig.SCAN_CONNECT_TIMEOUT)
.exclusive(SourceConfig.TABLE, SourceConfig.TABLE_LIST)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
private static final String DATABASE = "test";
private static final String URL = "jdbc:mysql://%s:" + SR_PORT;
private static final String SOURCE_TABLE = "e2e_table_source";
private static final String SOURCE_TABLE_2 = "e2e_table_source_2";
private static final String SINK_TABLE = "e2e_table_sink";
private static final String SR_DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
Expand Down Expand Up @@ -112,6 +113,35 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
+ "\"storage_format\" = \"DEFAULT\""
+ ")";

private static final String DDL_SOURCE_2 =
"create table "
+ DATABASE
+ "."
+ SOURCE_TABLE_2
+ " (\n"
+ " BIGINT_COL BIGINT,\n"
+ " LARGEINT_COL LARGEINT,\n"
+ " SMALLINT_COL SMALLINT,\n"
+ " TINYINT_COL TINYINT,\n"
+ " BOOLEAN_COL BOOLEAN,\n"
+ " DECIMAL_COL Decimal(12, 1),\n"
+ " DOUBLE_COL DOUBLE,\n"
+ " FLOAT_COL FLOAT,\n"
+ " INT_COL INT,\n"
+ " CHAR_COL CHAR,\n"
+ " VARCHAR_11_COL VARCHAR(11),\n"
+ " STRING_COL STRING,\n"
+ " DATETIME_COL DATETIME,\n"
+ " DATE_COL DATE\n"
+ ")ENGINE=OLAP\n"
+ "DUPLICATE KEY(`BIGINT_COL`)\n"
+ "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\",\n"
+ "\"in_memory\" = \"false\","
+ "\"storage_format\" = \"DEFAULT\""
+ ")";

private static final String INIT_DATA_SQL =
"insert into "
+ DATABASE
Expand All @@ -136,6 +166,30 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
+ "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n"
+ ")";

private static final String INIT_DATA_SQL_2 =
"insert into "
+ DATABASE
+ "."
+ SOURCE_TABLE_2
+ " (\n"
+ " BIGINT_COL,\n"
+ " LARGEINT_COL,\n"
+ " SMALLINT_COL,\n"
+ " TINYINT_COL,\n"
+ " BOOLEAN_COL,\n"
+ " DECIMAL_COL,\n"
+ " DOUBLE_COL,\n"
+ " FLOAT_COL,\n"
+ " INT_COL,\n"
+ " CHAR_COL,\n"
+ " VARCHAR_11_COL,\n"
+ " STRING_COL,\n"
+ " DATETIME_COL,\n"
+ " DATE_COL\n"
+ ")values(\n"
+ "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n"
+ ")";

private Connection jdbcConnection;
private GenericContainer<?> starRocksServer;
private static final List<SeaTunnelRow> TEST_DATASET = generateTestDataSet();
Expand Down Expand Up @@ -170,7 +224,7 @@ public void startUp() throws Exception {
.atMost(360, TimeUnit.SECONDS)
.untilAsserted(this::initializeJdbcConnection);
initializeJdbcTable();
batchInsertData();
batchInsertData(INIT_DATA_SQL);
}

private static List<SeaTunnelRow> generateTestDataSet() {
Expand Down Expand Up @@ -275,19 +329,20 @@ private void initializeJdbcTable() {
statement.execute("create database test");
// create source table
statement.execute(DDL_SOURCE);
statement.execute(DDL_SOURCE_2);
// create sink table
// statement.execute(DDL_SINK);
} catch (SQLException e) {
throw new RuntimeException("Initializing table failed!", e);
}
}

private void batchInsertData() {
private void batchInsertData(String initDataSql) {
List<SeaTunnelRow> rows = TEST_DATASET;
try {
jdbcConnection.setAutoCommit(false);
try (PreparedStatement preparedStatement =
jdbcConnection.prepareStatement(INIT_DATA_SQL)) {
jdbcConnection.prepareStatement(initDataSql)) {
for (int i = 0; i < rows.size(); i++) {
for (int index = 0; index < rows.get(i).getFields().length; index++) {
preparedStatement.setObject(index + 1, rows.get(i).getFields()[index]);
Expand Down Expand Up @@ -388,14 +443,9 @@ public void testCatalog() {
@TestTemplate
public void testKuduMultipleRead(TestContainer container)
throws IOException, InterruptedException {
// initializeKuduTable("kudu_source_table_1");
// initializeKuduTable("kudu_source_table_2");
// batchInsertData("kudu_source_table_1");
// batchInsertData("kudu_source_table_2");
// Container.ExecResult execResult =
// container.executeJob("/kudu_to_assert_with_multipletable.conf");
// Assertions.assertEquals(0, execResult.getExitCode());
// kuduClient.deleteTable("kudu_source_table_1");
// kuduClient.deleteTable("kudu_source_table_2");
batchInsertData(INIT_DATA_SQL_2);
Container.ExecResult execResult =
container.executeJob("/starrocks_to_assert_with_multipletable");
Assertions.assertEquals(0, execResult.getExitCode());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
StarRocks {
nodeUrls = ["starrocks_e2e:8030"]
username = root
password = ""
database = "test"
table_list = [
{
table = "e2e_table_source"
schema {
fields {
BIGINT_COL = BIGINT
LARGEINT_COL = STRING
SMALLINT_COL = SMALLINT
TINYINT_COL = TINYINT
BOOLEAN_COL = BOOLEAN
DECIMAL_COL = "DECIMAL(20, 1)"
DOUBLE_COL = DOUBLE
FLOAT_COL = FLOAT
INT_COL = INT
CHAR_COL = STRING
VARCHAR_11_COL = STRING
STRING_COL = STRING
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
}
},
scan_filter = ""
},
{
table = "e2e_table_source_2"
schema {
fields {
BIGINT_COL = BIGINT
LARGEINT_COL = STRING
SMALLINT_COL = SMALLINT
TINYINT_COL = TINYINT
BOOLEAN_COL = BOOLEAN
DECIMAL_COL = "DECIMAL(20, 1)"
DOUBLE_COL = DOUBLE
FLOAT_COL = FLOAT
INT_COL = INT
CHAR_COL = STRING
VARCHAR_11_COL = STRING
STRING_COL = STRING
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
}
},
scan_filter = ""
}
]
max_retries = 3
scan.params.scanner_thread_pool_thread_num = "3"
}
}

transform {
}

sink {
Assert {
rules {
table-names = ["e2e_table_source", "e2e_table_source_2"]
}
}
}

0 comments on commit 91c93b9

Please sign in to comment.