From 9aa96acb3a53014f52ae1e72bf3203cf9e5301dd Mon Sep 17 00:00:00 2001 From: hi-kbys Date: Tue, 4 Jun 2024 14:04:47 +0900 Subject: [PATCH 1/7] feat: add get database name --- .../org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java | 6 +++++- .../java/org/embulk/output/jdbc/JdbcOutputConnection.java | 5 +++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java index 9f2c1cb9..41cf480d 100644 --- a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java +++ b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java @@ -583,7 +583,7 @@ protected void doBegin(JdbcOutputConnection con, } } } - task.setActualTable(new TableIdentifier(null, con.getSchemaName(), actualTable)); + task.setActualTable(new TableIdentifier(con.getDatabaseName(), con.getSchemaName(), actualTable)); Optional initialTargetTableSchema = mode.ignoreTargetTableSchema() ? @@ -999,6 +999,9 @@ public Optional newJdbcSchemaFromTableIfExists(JdbcOutputConnection final Set primaryKeys = Collections.unmodifiableSet(primaryKeysBuilder); final ArrayList builder = new ArrayList<>(); + logger.info("table database: {}", table.getDatabase()); + logger.info("table schema: {}", table.getSchemaName()); + logger.info("table name: {}", table.getTableName()); rs = dbm.getColumns( JdbcUtils.escapeSearchString(table.getDatabase(), escape), JdbcUtils.escapeSearchString(table.getSchemaName(), escape), @@ -1007,6 +1010,7 @@ public Optional newJdbcSchemaFromTableIfExists(JdbcOutputConnection try { while (rs.next()) { String columnName = rs.getString("COLUMN_NAME"); + logger.info("column name {}", columnName); String simpleTypeName = rs.getString("TYPE_NAME").toUpperCase(Locale.ENGLISH); boolean isUniqueKey = primaryKeys.contains(columnName); int sqlType = rs.getInt("DATA_TYPE"); diff --git a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java index 24287eca..e2f70cf1 100644 --- a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java +++ b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java @@ -61,6 +61,11 @@ public void close() throws SQLException } } + public String getDatabaseName() throws SQLException + { + return connection.getCatalog(); + } + public String getSchemaName() { return schemaName; From 07af87cc0e6885ff3402155b4f8db477bad0b06f Mon Sep 17 00:00:00 2001 From: hi-kbys Date: Tue, 4 Jun 2024 14:32:50 +0900 Subject: [PATCH 2/7] remove comment --- .../java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java index 41cf480d..d80408bd 100644 --- a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java +++ b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java @@ -999,9 +999,6 @@ public Optional newJdbcSchemaFromTableIfExists(JdbcOutputConnection final Set primaryKeys = Collections.unmodifiableSet(primaryKeysBuilder); final ArrayList builder = new ArrayList<>(); - logger.info("table database: {}", table.getDatabase()); - logger.info("table schema: {}", table.getSchemaName()); - logger.info("table name: {}", table.getTableName()); rs = dbm.getColumns( JdbcUtils.escapeSearchString(table.getDatabase(), escape), JdbcUtils.escapeSearchString(table.getSchemaName(), escape), @@ -1010,7 +1007,6 @@ public Optional newJdbcSchemaFromTableIfExists(JdbcOutputConnection try { while (rs.next()) { String columnName = rs.getString("COLUMN_NAME"); - logger.info("column name {}", columnName); String simpleTypeName = rs.getString("TYPE_NAME").toUpperCase(Locale.ENGLISH); boolean isUniqueKey = primaryKeys.contains(columnName); int sqlType = rs.getInt("DATA_TYPE"); From 97caed5e6c05878169f4f2e6daf85df7411e4a9f Mon Sep 17 00:00:00 2001 From: hi-kbys Date: Tue, 4 Jun 2024 14:59:39 +0900 Subject: [PATCH 3/7] temp: throw error --- .../java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java index d80408bd..650ac04b 100644 --- a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java +++ b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java @@ -981,6 +981,7 @@ public Optional newJdbcSchemaFromTableIfExists(JdbcOutputConnection { if (!connection.tableExists(table)) { // DatabaseMetaData.getPrimaryKeys fails if table does not exist + throw new SQLException(); return Optional.empty(); } @@ -1028,6 +1029,7 @@ public Optional newJdbcSchemaFromTableIfExists(JdbcOutputConnection } final List columns = Collections.unmodifiableList(builder); if (columns.isEmpty()) { + logger.info("column is empty!"); return Optional.empty(); } else { return Optional.of(new JdbcSchema(columns)); From 79d2e959ac6babf81ddf69940bf06973eadb6e5a Mon Sep 17 00:00:00 2001 From: hi-kbys Date: Tue, 4 Jun 2024 15:03:44 +0900 Subject: [PATCH 4/7] temp: remove return stmt --- .../java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java index 650ac04b..55898506 100644 --- a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java +++ b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java @@ -982,7 +982,7 @@ public Optional newJdbcSchemaFromTableIfExists(JdbcOutputConnection if (!connection.tableExists(table)) { // DatabaseMetaData.getPrimaryKeys fails if table does not exist throw new SQLException(); - return Optional.empty(); + // return Optional.empty(); } DatabaseMetaData dbm = connection.getMetaData(); From 9f29dc35a5d0e3658abeab036d114082f4676811 Mon Sep 17 00:00:00 2001 From: hi-kbys Date: Tue, 4 Jun 2024 15:27:52 +0900 Subject: [PATCH 5/7] add nullDatabaseMeansCurrent=true --- .../java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java | 3 +-- .../src/main/java/org/embulk/output/MySQLOutputPlugin.java | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java index 55898506..c136d59c 100644 --- a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java +++ b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java @@ -981,8 +981,7 @@ public Optional newJdbcSchemaFromTableIfExists(JdbcOutputConnection { if (!connection.tableExists(table)) { // DatabaseMetaData.getPrimaryKeys fails if table does not exist - throw new SQLException(); - // return Optional.empty(); + return Optional.empty(); } DatabaseMetaData dbm = connection.getMetaData(); diff --git a/embulk-output-mysql/src/main/java/org/embulk/output/MySQLOutputPlugin.java b/embulk-output-mysql/src/main/java/org/embulk/output/MySQLOutputPlugin.java index ea90aace..571f3379 100644 --- a/embulk-output-mysql/src/main/java/org/embulk/output/MySQLOutputPlugin.java +++ b/embulk-output-mysql/src/main/java/org/embulk/output/MySQLOutputPlugin.java @@ -88,6 +88,8 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet props.setProperty("connectTimeout", "300000"); // milliseconds props.setProperty("socketTimeout", "1800000"); // smillieconds + props.setProperty("nullDatabaseMeansCurrent", "true"); + // Enable keepalive based on tcp_keepalive_time, tcp_keepalive_intvl and tcp_keepalive_probes kernel parameters. // Socket options TCP_KEEPCNT, TCP_KEEPIDLE, and TCP_KEEPINTVL are not configurable. props.setProperty("tcpKeepAlive", "true"); From 92b81b52952316b142b4ab61a12407045c5f1be8 Mon Sep 17 00:00:00 2001 From: hi-kbys Date: Tue, 4 Jun 2024 16:11:43 +0900 Subject: [PATCH 6/7] change nullCatalogMeansCurrent options --- .../src/main/java/org/embulk/output/MySQLOutputPlugin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/embulk-output-mysql/src/main/java/org/embulk/output/MySQLOutputPlugin.java b/embulk-output-mysql/src/main/java/org/embulk/output/MySQLOutputPlugin.java index 571f3379..1ea4dcfb 100644 --- a/embulk-output-mysql/src/main/java/org/embulk/output/MySQLOutputPlugin.java +++ b/embulk-output-mysql/src/main/java/org/embulk/output/MySQLOutputPlugin.java @@ -88,7 +88,7 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet props.setProperty("connectTimeout", "300000"); // milliseconds props.setProperty("socketTimeout", "1800000"); // smillieconds - props.setProperty("nullDatabaseMeansCurrent", "true"); + props.setProperty("nullCatalogMeansCurrent", "false"); // Enable keepalive based on tcp_keepalive_time, tcp_keepalive_intvl and tcp_keepalive_probes kernel parameters. // Socket options TCP_KEEPCNT, TCP_KEEPIDLE, and TCP_KEEPINTVL are not configurable. From 88b6964cb18bb7a83d0b7d391b0a8d1d57e64d8e Mon Sep 17 00:00:00 2001 From: hi-kbys Date: Tue, 4 Jun 2024 16:11:57 +0900 Subject: [PATCH 7/7] remove debug log --- .../java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java | 1 - 1 file changed, 1 deletion(-) diff --git a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java index c136d59c..d80408bd 100644 --- a/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java +++ b/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java @@ -1028,7 +1028,6 @@ public Optional newJdbcSchemaFromTableIfExists(JdbcOutputConnection } final List columns = Collections.unmodifiableList(builder); if (columns.isEmpty()) { - logger.info("column is empty!"); return Optional.empty(); } else { return Optional.of(new JdbcSchema(columns));