Skip to content

Commit

Permalink
[feature][connector-v2] add xugudb connector (apache#6561)
Browse files Browse the repository at this point in the history
  • Loading branch information
L-Gryps authored Apr 2, 2024
1 parent fe33422 commit 80f392a
Show file tree
Hide file tree
Showing 19 changed files with 2,241 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: run jdbc connectors integration test (part-6)
- name: run jdbc connectors integration test (part-7)
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-7 -am -Pci
Expand Down
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ there are some reference value for params above.
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |
| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar |
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar |
| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | / | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar |

## Example

Expand Down
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ there are some reference value for params above.
| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar |
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar |
| Hive | org.apache.hive.jdbc.HiveDriver | jdbc:hive2://localhost:10000 | https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar |
| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar |

## Example

Expand Down
7 changes: 7 additions & 0 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<kingbase8.version>8.6.0</kingbase8.version>
<hive.jdbc.version>3.1.3</hive.jdbc.version>
<oceanbase.jdbc.version>2.4.3</oceanbase.jdbc.version>
<xugu.jdbc.version>12.2.0</xugu.jdbc.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -188,6 +189,12 @@
<version>${oceanbase.jdbc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.xugudb</groupId>
<artifactId>xugu-jdbc</artifactId>
<version>${xugu.jdbc.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper;

import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@Slf4j
public class XuguCatalog extends AbstractJdbcCatalog {

protected static List<String> EXCLUDED_SCHEMAS =
Collections.unmodifiableList(Arrays.asList("GUEST", "SYSAUDITOR", "SYSSSO"));

private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT\n"
+ " dc.COLUMN_NAME,\n"
+ " CASE\n"
+ " WHEN dc.TYPE_NAME LIKE 'INTERVAL%%' THEN 'INTERVAL' ELSE REGEXP_SUBSTR(dc.TYPE_NAME, '^[^(]+')\n"
+ " END AS TYPE_NAME,\n"
+ " dc.TYPE_NAME ||\n"
+ " CASE\n"
+ " WHEN dc.TYPE_NAME IN ('VARCHAR', 'CHAR') THEN '(' || dc.COLUMN_LENGTH || ')'\n"
+ " WHEN dc.TYPE_NAME IN ('NUMERIC') AND dc.COLUMN_PRECISION IS NOT NULL AND dc.COLUMN_SCALE IS NOT NULL THEN '(' || dc.COLUMN_PRECISION || ', ' || dc.COLUMN_SCALE || ')'\n"
+ " WHEN dc.TYPE_NAME IN ('NUMERIC') AND dc.COLUMN_PRECISION IS NOT NULL AND dc.COLUMN_SCALE IS NULL THEN '(' || dc.COLUMN_PRECISION || ')'\n"
+ " WHEN dc.TYPE_NAME IN ('TIMESTAMP') THEN '(' || dc.COLUMN_SCALE || ')'\n"
+ " END AS FULL_TYPE_NAME,\n"
+ " dc.COLUMN_LENGTH,\n"
+ " dc.COLUMN_PRECISION,\n"
+ " dc.COLUMN_SCALE,\n"
+ " dc.COLUMN_COMMENT,\n"
+ " dc.DEFAULT_VALUE,\n"
+ " CASE\n"
+ " dc.IS_NULLABLE WHEN TRUE THEN 'NO' ELSE 'YES'\n"
+ " END AS IS_NULLABLE\n"
+ "FROM\n"
+ " (\n"
+ " SELECT\n"
+ " c.col_name AS COLUMN_NAME,\n"
+ " CASE\n"
+ " WHEN c.type_name = 'CHAR' AND c.\"VARYING\" = TRUE THEN 'VARCHAR'\n"
+ " WHEN c.type_name = 'DATETIME' AND c.TIMESTAMP_T = 'i' THEN 'TIMESTAMP' ELSE c.type_name\n"
+ " END AS TYPE_NAME,\n"
+ " DECODE(c.type_name,\n"
+ " 'TINYINT', 1, 'SMALLINT', 2,\n"
+ " 'INTEGER', 4, 'BIGINT', 8,\n"
+ " 'FLOAT', 4, 'DOUBLE', 8,\n"
+ " 'NUMERIC', 17,\n"
+ " 'CHAR', DECODE(c.scale, -1, 60000, c.scale),\n"
+ " 'DATE', 4, 'DATETIME', 8,\n"
+ " 'TIMESTAMP', 8, 'DATETIME WITH TIME ZONE', 8,\n"
+ " 'TIME', 4, 'TIME WITH TIME ZONE', 4,\n"
+ " 'INTERVAL YEAR', 4, 'INTERVAL MONTH', 4,\n"
+ " 'INTERVAL DAY', 4, 'INTERVAL HOUR', 4,\n"
+ " 'INTERVAL MINUTE', 4, 'INTERVAL SECOND', 8,\n"
+ " 'INTERVAL YEAR TO MONTH', 4,\n"
+ " 'INTERVAL DAY TO HOUR', 4,\n"
+ " 'INTERVAL DAY TO MINUTE', 4,\n"
+ " 'INTERVAL DAY TO SECOND', 8,\n"
+ " 'INTERVAL HOUR TO MINUTE', 4,\n"
+ " 'INTERVAL HOUR TO SECOND', 8,\n"
+ " 'INTERVAL MINUTE TO SECOND', 8,\n"
+ " 'CLOB', 2147483648,\n"
+ " 'BLOB', 2147483648, 'BINARY', 2147483648,\n"
+ " 'GUID', 2, 'BOOLEAN', 1,\n"
+ " 'ROWVERSION', 8, 'ROWID', 10, NULL) AS COLUMN_LENGTH,\n"
+ " DECODE(TRUNC(c.scale / 65536), 0, NULL, TRUNC(c.scale / 65536)::INTEGER) AS COLUMN_PRECISION,\n"
+ " DECODE(DECODE(c.type_name, 'CHAR',-1, c.scale),-1, NULL, MOD(c.scale, 65536)) AS COLUMN_SCALE,\n"
+ " c.comments AS COLUMN_COMMENT,\n"
+ " c.DEF_VAL AS DEFAULT_VALUE,\n"
+ " c.NOT_NULl AS IS_NULLABLE\n"
+ " FROM\n"
+ " dba_columns c\n"
+ " LEFT JOIN dba_tables tab ON\n"
+ " c.db_id = tab.db_id\n"
+ " AND c.table_id = tab.table_id\n"
+ " LEFT JOIN dba_schemas sc ON\n"
+ " tab.schema_id = sc.schema_id\n"
+ " AND tab.db_id = sc.db_id\n"
+ " WHERE\n"
+ " sc.schema_name = '%s'\n"
+ " AND tab.table_name = '%s'\n"
+ ") AS dc \n";

public XuguCatalog(
String catalogName,
String username,
String pwd,
JdbcUrlUtil.UrlInfo urlInfo,
String defaultSchema) {
super(catalogName, username, pwd, urlInfo, defaultSchema);
}

@Override
protected String getListDatabaseSql() {
return "SELECT DB_NAME FROM dba_databases";
}

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
return new XuguCreateTableSqlBuilder(table).build(tablePath);
}

@Override
protected String getDropTableSql(TablePath tablePath) {
return String.format("DROP TABLE %s", tablePath.getSchemaAndTableName("\""));
}

@Override
protected String getCreateDatabaseSql(String databaseName) {
return String.format("CREATE DATABASE \"%s\"", databaseName);
}

@Override
protected String getDropDatabaseSql(String databaseName) {
return String.format("DROP DATABASE \"%s\"", databaseName);
}

@Override
protected String getListTableSql(String databaseName) {
return "SELECT user_name ,table_name FROM all_users au \n"
+ "INNER JOIN all_tables at ON au.user_id=at.user_id AND au.db_id=at.db_id";
}

@Override
protected String getTableName(ResultSet rs) throws SQLException {
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
return null;
}
return rs.getString(1) + "." + rs.getString(2);
}

@Override
protected String getSelectColumnsSql(TablePath tablePath) {
return String.format(
SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), tablePath.getTableName());
}

@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
String typeName = resultSet.getString("TYPE_NAME");
String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
long columnLength = resultSet.getLong("COLUMN_LENGTH");
Long columnPrecision = resultSet.getObject("COLUMN_PRECISION", Long.class);
Integer columnScale = resultSet.getObject("COLUMN_SCALE", Integer.class);
String columnComment = resultSet.getString("COLUMN_COMMENT");
Object defaultValue = resultSet.getObject("DEFAULT_VALUE");
boolean isNullable = resultSet.getString("IS_NULLABLE").equals("YES");

BasicTypeDefine typeDefine =
BasicTypeDefine.builder()
.name(columnName)
.columnType(fullTypeName)
.dataType(typeName)
.length(columnLength)
.precision(columnPrecision)
.scale(columnScale)
.nullable(isNullable)
.defaultValue(defaultValue)
.comment(columnComment)
.build();
return XuguTypeConverter.INSTANCE.convert(typeDefine);
}

@Override
protected String getUrlFromDatabaseName(String databaseName) {
return defaultUrl;
}

@Override
protected String getOptionTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName())
.contains(tablePath.getSchemaAndTableName());
}
return listTables().contains(tablePath.getSchemaAndTableName());
} catch (DatabaseNotExistException e) {
return false;
}
}

private List<String> listTables() {
List<String> databases = listDatabases();
return listTables(databases.get(0));
}

@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new XuguTypeMapper());
}

@Override
protected String getTruncateTableSql(TablePath tablePath) {
return String.format(
"TRUNCATE TABLE \"%s\".\"%s\"",
tablePath.getSchemaName(), tablePath.getTableName());
}

@Override
protected String getExistDataSql(TablePath tablePath) {
return String.format(
"SELECT * FROM \"%s\".\"%s\" WHERE ROWNUM = 1",
tablePath.getSchemaName(), tablePath.getTableName());
}

@Override
protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath)
throws SQLException {
try {
return getConstraintKeys(
metaData,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName());
} catch (SQLException e) {
log.info("Obtain constraint failure", e);
return new ArrayList<>();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.configuration.util.OptionValidationException;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLParser;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import com.google.auto.service.AutoService;

import java.util.Optional;

@AutoService(Factory.class)
public class XuguCatalogFactory implements CatalogFactory {

@Override
public String factoryIdentifier() {
return DatabaseIdentifier.XUGU;
}

@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
JdbcUrlUtil.UrlInfo urlInfo = OracleURLParser.parse(urlWithDatabase);
Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
if (!defaultDatabase.isPresent()) {
throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
}
return new XuguCatalog(
catalogName,
options.get(JdbcCatalogOptions.USERNAME),
options.get(JdbcCatalogOptions.PASSWORD),
urlInfo,
options.get(JdbcCatalogOptions.SCHEMA));
}

@Override
public OptionRule optionRule() {
return JdbcCatalogOptions.BASE_RULE.build();
}
}
Loading

0 comments on commit 80f392a

Please sign in to comment.