diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java index 83f8131315f4b..3b354bca6143c 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java @@ -20,14 +20,13 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.spi.JdbcQueryPropertiesExtension; import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender; import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser; -import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory; +import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory; import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; -import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; @@ -54,10 +53,16 @@ public final class StandardPipelineDataSourceConfiguration implements PipelineDa private final DataSourcePoolProperties dataSourcePoolProps; @Getter - private final YamlJdbcConfiguration jdbcConfig; + private final DatabaseType databaseType; + + @Getter + private final String url; @Getter - private final DatabaseType databaseType; + private final String username; + + @Getter + private final String password; @SuppressWarnings("unchecked") public StandardPipelineDataSourceConfiguration(final String param) { @@ -78,10 +83,12 @@ private StandardPipelineDataSourceConfiguration(final String param, final Map wrapParameter(final String jdbcUrl, final Str return result; } - private void appendJdbcQueryProperties(final DatabaseType databaseType, final Map yamlConfig) { + private void appendJdbcQueryProperties(final DatabaseType databaseType, final Map poolProps) { Optional extension = DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class, databaseType); if (!extension.isPresent()) { return; } - String jdbcUrl = jdbcConfig.getUrl(); + String jdbcUrl = String.valueOf(poolProps.get("url")); Properties queryProps = new StandardJdbcUrlParser().parseQueryProperties(jdbcUrl.contains("?") ? jdbcUrl.substring(jdbcUrl.indexOf("?") + 1) : ""); extension.get().extendQueryProperties(queryProps); String url = new JdbcUrlAppender().appendQueryProperties(jdbcUrl, queryProps); - jdbcConfig.setUrl(url); - yamlConfig.put("url", url); + poolProps.put("url", url); } @Override diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java deleted file mode 100644 index 8ea7848bbdebf..0000000000000 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.api.yaml; - -import lombok.Getter; -import lombok.Setter; -import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; - -/** - * JDBC configuration for YAML. - */ -@Getter -@Setter -public final class YamlJdbcConfiguration implements YamlConfiguration { - - private String url; - - private String username; - - private String password; -} diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java index 7d185055e3b34..b5bd9f6aa90d6 100644 --- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java +++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.data.pipeline.api.type; -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; import org.junit.jupiter.api.Test; @@ -78,14 +77,10 @@ private void assertGetConfig(final StandardPipelineDataSourceConfiguration actua assertThat(actual.getType(), is(StandardPipelineDataSourceConfiguration.TYPE)); DataSourcePoolProperties props = (DataSourcePoolProperties) actual.getDataSourceConfiguration(); assertThat(props.getPoolClassName(), is("com.zaxxer.hikari.HikariDataSource")); - assertGetJdbcConfig(actual.getJdbcConfig()); - assertDataSourcePoolProperties(props); - } - - private void assertGetJdbcConfig(final YamlJdbcConfiguration actual) { assertThat(actual.getUrl(), is(JDBC_URL)); assertThat(actual.getUsername(), is(USERNAME)); assertThat(actual.getPassword(), is(PASSWORD)); + assertDataSourcePoolProperties(props); } private void assertDataSourcePoolProperties(final DataSourcePoolProperties props) { diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java deleted file mode 100644 index 57baee6a8ef7c..0000000000000 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.common.datasource.config.yaml; - -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; -import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -class YamlJdbcConfigurationTest { - - private static final String JDBC_URL = "jdbc:mysql://127.0.0.1:3306/demo_ds_0?serverTimezone=UTC&useSSL=false"; - - private static final String USERNAME = "root"; - - private static final String PASSWORD = "password"; - - @Test - void assertConstructionWithUrl() { - assertYamlJdbcConfiguration(YamlEngine.unmarshal(YamlEngine.marshal(getDataSourcePoolPropertiesWithUrl()), YamlJdbcConfiguration.class)); - } - - private Map getDataSourcePoolPropertiesWithUrl() { - Map result = new HashMap<>(3, 1F); - result.put("url", JDBC_URL); - result.put("username", USERNAME); - result.put("password", PASSWORD); - return result; - } - - private void assertYamlJdbcConfiguration(final YamlJdbcConfiguration actual) { - assertThat(actual.getUrl(), is(JDBC_URL)); - assertThat(actual.getUsername(), is(USERNAME)); - assertThat(actual.getPassword(), is(PASSWORD)); - } -} diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index f3d4fd2d37f42..d64fd12236177 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -20,8 +20,13 @@ import com.google.common.base.Preconditions; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; +import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType; +import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; +import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; +import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; +import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils; import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; @@ -30,12 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; -import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; -import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; -import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; -import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType; -import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent; @@ -86,11 +85,12 @@ public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, fina this.binlogPosition = (BinlogPosition) binlogPosition; this.channel = channel; this.metaDataLoader = metaDataLoader; - YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig(); + StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig(); ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL")); - ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null); - ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()); - log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, port={}", jdbcConfig.getUrl(), connectInfo.getServerId(), connectInfo.getHost(), connectInfo.getPort()); + ConnectionProperties connectionProps = parser.parse(pipelineDataSourceConfig.getUrl(), null, null); + ConnectInfo connectInfo = new ConnectInfo( + generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), pipelineDataSourceConfig.getUsername(), pipelineDataSourceConfig.getPassword()); + log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, port={}", pipelineDataSourceConfig.getUrl(), connectInfo.getServerId(), connectInfo.getHost(), connectInfo.getPort()); client = new MySQLClient(connectInfo, dumperContext.isDecodeWithTX()); catalog = connectionProps.getCatalog(); } diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java index ffcb2a8a33389..082134dc0f29a 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java @@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber; import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrl; import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser; @@ -52,18 +51,17 @@ public final class OpenGaussLogicalReplication { */ public Connection createConnection(final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException { Properties props = new Properties(); - YamlJdbcConfiguration jdbcConfig = pipelineDataSourceConfig.getJdbcConfig(); - PGProperty.USER.set(props, jdbcConfig.getUsername()); - PGProperty.PASSWORD.set(props, jdbcConfig.getPassword()); + PGProperty.USER.set(props, pipelineDataSourceConfig.getUsername()); + PGProperty.PASSWORD.set(props, pipelineDataSourceConfig.getPassword()); PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4"); PGProperty.REPLICATION.set(props, "database"); PGProperty.PREFER_QUERY_MODE.set(props, "simple"); try { - return DriverManager.getConnection(jdbcConfig.getUrl(), props); + return DriverManager.getConnection(pipelineDataSourceConfig.getUrl(), props); } catch (final SQLException ex) { if (failedBecauseOfNonHAPort(ex)) { log.info("Failed to connect to openGauss caused by: {} - {}. Try connecting to HA port.", ex.getSQLState(), ex.getMessage()); - return tryConnectingToHAPort(jdbcConfig.getUrl(), props); + return tryConnectingToHAPort(pipelineDataSourceConfig.getUrl(), props); } throw ex; } diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java index 4efa1384423c8..3b8e8142a8172 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber; import org.postgresql.PGConnection; import org.postgresql.PGProperty; @@ -44,13 +43,12 @@ public final class PostgreSQLLogicalReplication { */ public Connection createConnection(final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException { Properties props = new Properties(); - YamlJdbcConfiguration jdbcConfig = pipelineDataSourceConfig.getJdbcConfig(); - PGProperty.USER.set(props, jdbcConfig.getUsername()); - PGProperty.PASSWORD.set(props, jdbcConfig.getPassword()); + PGProperty.USER.set(props, pipelineDataSourceConfig.getUsername()); + PGProperty.PASSWORD.set(props, pipelineDataSourceConfig.getPassword()); PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6"); PGProperty.REPLICATION.set(props, "database"); PGProperty.PREFER_QUERY_MODE.set(props, "simple"); - return DriverManager.getConnection(jdbcConfig.getUrl(), props); + return DriverManager.getConnection(pipelineDataSourceConfig.getUrl(), props); } /**