From 5b1b9802e8270311e888ceb7250cf17379af752a Mon Sep 17 00:00:00 2001
From: zhangliang <zhangliang@apache.org>
Date: Tue, 7 Nov 2023 16:57:23 +0800
Subject: [PATCH] Remove YamlJdbcConfiguration

---
 ...andardPipelineDataSourceConfiguration.java | 28 ++++++----
 .../api/yaml/YamlJdbcConfiguration.java       | 36 ------------
 ...rdPipelineDataSourceConfigurationTest.java |  7 +--
 .../yaml/YamlJdbcConfigurationTest.java       | 56 -------------------
 .../mysql/ingest/MySQLIncrementalDumper.java  | 22 ++++----
 .../wal/OpenGaussLogicalReplication.java      | 10 ++--
 .../wal/PostgreSQLLogicalReplication.java     |  8 +--
 7 files changed, 36 insertions(+), 131 deletions(-)
 delete mode 100644 kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java
 delete mode 100644 kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java
index 83f8131315f4b..3b354bca6143c 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java
@@ -20,14 +20,13 @@
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
 import org.apache.shardingsphere.data.pipeline.spi.JdbcQueryPropertiesExtension;
 import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
+import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 
@@ -54,10 +53,16 @@ public final class StandardPipelineDataSourceConfiguration implements PipelineDa
     private final DataSourcePoolProperties dataSourcePoolProps;
     
     @Getter
-    private final YamlJdbcConfiguration jdbcConfig;
+    private final DatabaseType databaseType;
+    
+    @Getter
+    private final String url;
     
     @Getter
-    private final DatabaseType databaseType;
+    private final String username;
+    
+    @Getter
+    private final String password;
     
     @SuppressWarnings("unchecked")
     public StandardPipelineDataSourceConfiguration(final String param) {
@@ -78,10 +83,12 @@ private StandardPipelineDataSourceConfiguration(final String param, final Map<St
             poolProps.remove("jdbcUrl");
         }
         poolProps.remove(DATA_SOURCE_CLASS_NAME);
-        jdbcConfig = YamlEngine.unmarshal(YamlEngine.marshal(poolProps), YamlJdbcConfiguration.class, true);
-        databaseType = DatabaseTypeFactory.get(jdbcConfig.getUrl());
+        databaseType = DatabaseTypeFactory.get(String.valueOf(poolProps.get("url")));
         poolProps.put(DATA_SOURCE_CLASS_NAME, "com.zaxxer.hikari.HikariDataSource");
         appendJdbcQueryProperties(databaseType, poolProps);
+        username = String.valueOf(poolProps.get("username"));
+        password = String.valueOf(poolProps.get("password"));
+        url = String.valueOf(poolProps.get("url"));
         dataSourcePoolProps = new YamlDataSourceConfigurationSwapper().swapToDataSourcePoolProperties(poolProps);
     }
     
@@ -98,17 +105,16 @@ private static Map<String, Object> wrapParameter(final String jdbcUrl, final Str
         return result;
     }
     
-    private void appendJdbcQueryProperties(final DatabaseType databaseType, final Map<String, Object> yamlConfig) {
+    private void appendJdbcQueryProperties(final DatabaseType databaseType, final Map<String, Object> poolProps) {
         Optional<JdbcQueryPropertiesExtension> extension = DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class, databaseType);
         if (!extension.isPresent()) {
             return;
         }
-        String jdbcUrl = jdbcConfig.getUrl();
+        String jdbcUrl = String.valueOf(poolProps.get("url"));
         Properties queryProps = new StandardJdbcUrlParser().parseQueryProperties(jdbcUrl.contains("?") ? jdbcUrl.substring(jdbcUrl.indexOf("?") + 1) : "");
         extension.get().extendQueryProperties(queryProps);
         String url = new JdbcUrlAppender().appendQueryProperties(jdbcUrl, queryProps);
-        jdbcConfig.setUrl(url);
-        yamlConfig.put("url", url);
+        poolProps.put("url", url);
     }
     
     @Override
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java
deleted file mode 100644
index 8ea7848bbdebf..0000000000000
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.yaml;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-
-/**
- * JDBC configuration for YAML.
- */
-@Getter
-@Setter
-public final class YamlJdbcConfiguration implements YamlConfiguration {
-    
-    private String url;
-    
-    private String username;
-    
-    private String password;
-}
diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java
index 7d185055e3b34..b5bd9f6aa90d6 100644
--- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java
+++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.api.type;
 
-import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
 import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import org.junit.jupiter.api.Test;
@@ -78,14 +77,10 @@ private void assertGetConfig(final StandardPipelineDataSourceConfiguration actua
         assertThat(actual.getType(), is(StandardPipelineDataSourceConfiguration.TYPE));
         DataSourcePoolProperties props = (DataSourcePoolProperties) actual.getDataSourceConfiguration();
         assertThat(props.getPoolClassName(), is("com.zaxxer.hikari.HikariDataSource"));
-        assertGetJdbcConfig(actual.getJdbcConfig());
-        assertDataSourcePoolProperties(props);
-    }
-    
-    private void assertGetJdbcConfig(final YamlJdbcConfiguration actual) {
         assertThat(actual.getUrl(), is(JDBC_URL));
         assertThat(actual.getUsername(), is(USERNAME));
         assertThat(actual.getPassword(), is(PASSWORD));
+        assertDataSourcePoolProperties(props);
     }
     
     private void assertDataSourcePoolProperties(final DataSourcePoolProperties props) {
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java
deleted file mode 100644
index 57baee6a8ef7c..0000000000000
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.datasource.config.yaml;
-
-import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class YamlJdbcConfigurationTest {
-    
-    private static final String JDBC_URL = "jdbc:mysql://127.0.0.1:3306/demo_ds_0?serverTimezone=UTC&useSSL=false";
-    
-    private static final String USERNAME = "root";
-    
-    private static final String PASSWORD = "password";
-    
-    @Test
-    void assertConstructionWithUrl() {
-        assertYamlJdbcConfiguration(YamlEngine.unmarshal(YamlEngine.marshal(getDataSourcePoolPropertiesWithUrl()), YamlJdbcConfiguration.class));
-    }
-    
-    private Map<String, String> getDataSourcePoolPropertiesWithUrl() {
-        Map<String, String> result = new HashMap<>(3, 1F);
-        result.put("url", JDBC_URL);
-        result.put("username", USERNAME);
-        result.put("password", PASSWORD);
-        return result;
-    }
-    
-    private void assertYamlJdbcConfiguration(final YamlJdbcConfiguration actual) {
-        assertThat(actual.getUrl(), is(JDBC_URL));
-        assertThat(actual.getUsername(), is(USERNAME));
-        assertThat(actual.getPassword(), is(PASSWORD));
-    }
-}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index f3d4fd2d37f42..d64fd12236177 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -20,8 +20,13 @@
 import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
+import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
@@ -30,12 +35,6 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
-import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
@@ -86,11 +85,12 @@ public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, fina
         this.binlogPosition = (BinlogPosition) binlogPosition;
         this.channel = channel;
         this.metaDataLoader = metaDataLoader;
-        YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig();
+        StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig();
         ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL"));
-        ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null);
-        ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
-        log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, port={}", jdbcConfig.getUrl(), connectInfo.getServerId(), connectInfo.getHost(), connectInfo.getPort());
+        ConnectionProperties connectionProps = parser.parse(pipelineDataSourceConfig.getUrl(), null, null);
+        ConnectInfo connectInfo = new ConnectInfo(
+                generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), pipelineDataSourceConfig.getUsername(), pipelineDataSourceConfig.getPassword());
+        log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, port={}", pipelineDataSourceConfig.getUrl(), connectInfo.getServerId(), connectInfo.getHost(), connectInfo.getPort());
         client = new MySQLClient(connectInfo, dumperContext.isDecodeWithTX());
         catalog = connectionProps.getCatalog();
     }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
index ffcb2a8a33389..082134dc0f29a 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
@@ -19,7 +19,6 @@
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
 import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrl;
 import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser;
@@ -52,18 +51,17 @@ public final class OpenGaussLogicalReplication {
      */
     public Connection createConnection(final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
         Properties props = new Properties();
-        YamlJdbcConfiguration jdbcConfig = pipelineDataSourceConfig.getJdbcConfig();
-        PGProperty.USER.set(props, jdbcConfig.getUsername());
-        PGProperty.PASSWORD.set(props, jdbcConfig.getPassword());
+        PGProperty.USER.set(props, pipelineDataSourceConfig.getUsername());
+        PGProperty.PASSWORD.set(props, pipelineDataSourceConfig.getPassword());
         PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
         PGProperty.REPLICATION.set(props, "database");
         PGProperty.PREFER_QUERY_MODE.set(props, "simple");
         try {
-            return DriverManager.getConnection(jdbcConfig.getUrl(), props);
+            return DriverManager.getConnection(pipelineDataSourceConfig.getUrl(), props);
         } catch (final SQLException ex) {
             if (failedBecauseOfNonHAPort(ex)) {
                 log.info("Failed to connect to openGauss caused by: {} - {}. Try connecting to HA port.", ex.getSQLState(), ex.getMessage());
-                return tryConnectingToHAPort(jdbcConfig.getUrl(), props);
+                return tryConnectingToHAPort(pipelineDataSourceConfig.getUrl(), props);
             }
             throw ex;
         }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java
index 4efa1384423c8..3b8e8142a8172 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
 
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
 import org.postgresql.PGConnection;
 import org.postgresql.PGProperty;
@@ -44,13 +43,12 @@ public final class PostgreSQLLogicalReplication {
      */
     public Connection createConnection(final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
         Properties props = new Properties();
-        YamlJdbcConfiguration jdbcConfig = pipelineDataSourceConfig.getJdbcConfig();
-        PGProperty.USER.set(props, jdbcConfig.getUsername());
-        PGProperty.PASSWORD.set(props, jdbcConfig.getPassword());
+        PGProperty.USER.set(props, pipelineDataSourceConfig.getUsername());
+        PGProperty.PASSWORD.set(props, pipelineDataSourceConfig.getPassword());
         PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6");
         PGProperty.REPLICATION.set(props, "database");
         PGProperty.PREFER_QUERY_MODE.set(props, "simple");
-        return DriverManager.getConnection(jdbcConfig.getUrl(), props);
+        return DriverManager.getConnection(pipelineDataSourceConfig.getUrl(), props);
     }
     
     /**