From 541334ea7a3c830179a85c536b1855cdbdc203b5 Mon Sep 17 00:00:00 2001 From: RickyHuo Date: Thu, 21 Jun 2018 17:13:05 +0800 Subject: [PATCH] Support ClickHouse-Native-JDBC --- README.md | 2 +- pom.xml | 14 ++-- .../bip/hangout/outputs/ClickhouseUtils.java | 7 +- .../com/sina/bip/hangout/outputs/Native.java | 65 +++++++++++-------- .../bip/hangout/outputs/TabSeparated.java | 3 +- .../sina/bip/hangout/outputs/NativeTest.java | 20 ------ 6 files changed, 51 insertions(+), 60 deletions(-) delete mode 100644 src/test/java/com/sina/bip/hangout/outputs/NativeTest.java diff --git a/README.md b/README.md index beff10c..2bd4a33 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ * Author: rickyHuo * Homepage: https://github.com/RickyHuo/hangout-output-clickhouse -* Version: 0.0.6 +* Version: 0.0.7 ### Description diff --git a/pom.xml b/pom.xml index 8ad0341..96a3b1b 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ sina hangout-output-plugins-clickhouse - 0.0.6 + 0.0.7 jar hangout-output-plugins-clickhouse @@ -22,10 +22,15 @@ system C:\Users\huochen\.m2\repository\ctrip\hangout-baseplugin\0.3.0\hangout-baseplugin-0.3.0.jar + + com.github.housepower + clickhouse-native-jdbc + 1.1-testing + ru.yandex.clickhouse clickhouse-jdbc - 0.1.36 + 0.1.40 org.apache.logging.log4j @@ -39,11 +44,6 @@ 2.7 provided - - com.github.housepower - clickhouse-native-jdbc - 1.0-testing - diff --git a/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java b/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java index f520327..d3fb1a6 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java +++ b/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java @@ -1,9 +1,8 @@ package com.sina.bip.hangout.outputs; -import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import com.github.housepower.jdbc.ClickHouseConnection; import ru.yandex.clickhouse.ClickHouseConnectionImpl; -import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; @@ -29,8 +28,8 @@ public static Map getSchema(ClickHouseConnectionImpl connection, return schema; } - public static Map getSchema(Connection connection, String table) throws SQLException { - String sql = String.format("desc %s", table); + public static Map getSchema(ClickHouseConnection connection, String db, String table) throws SQLException { + String sql = String.format("desc %s.%s", db, table); ResultSet resultSet = connection.createStatement().executeQuery(sql); Map schema = new HashMap(); while(resultSet.next()) { diff --git a/src/main/java/com/sina/bip/hangout/outputs/Native.java b/src/main/java/com/sina/bip/hangout/outputs/Native.java index e47d14a..b504f20 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/Native.java +++ b/src/main/java/com/sina/bip/hangout/outputs/Native.java @@ -1,17 +1,15 @@ package com.sina.bip.hangout.outputs; import com.ctrip.ops.sysdev.render.TemplateRender; +import com.github.housepower.jdbc.ClickHouseConnection; +import com.github.housepower.jdbc.settings.ClickHouseConfig; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.text.SimpleDateFormat; +import java.util.*; public class Native implements FormatParse { @@ -26,8 +24,10 @@ public class Native implements FormatParse { private Boolean withCredit; private String user; private String password; - private Connection conn; + private ClickHouseConnection conn; private Map templateRenderMap; + private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + private SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public Native(Map config) { this.config = config; @@ -65,32 +65,28 @@ public void prepare() { this.withCredit = false; } - // 连接验证 - this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database); + this.jdbcLink = String.format("jdbc:clickhouse://%s", this.host); + log.info(this.jdbcLink); - try { - Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); - } catch (ClassNotFoundException e) { - log.error(e); - System.exit(1); + Properties p = new Properties(); + if (this.withCredit) { + p.put("user", this.user); + p.put("password", this.password); } try { - this.conn = DriverManager.getConnection(this.jdbcLink); - - if (this.withCredit) { - this.conn = DriverManager.getConnection(this.jdbcLink, this.user, this.password); - } - } catch (Exception e) { + ClickHouseConfig cf = new ClickHouseConfig(this.jdbcLink, p); + this.conn = ClickHouseConnection.createClickHouseConnection(cf); + } catch (SQLException e) { log.error(e); - log.error("Cannot connect to data source"); System.exit(1); } try { - this.schema = ClickhouseUtils.getSchema(this.conn, this.table); + this.schema = ClickhouseUtils.getSchema(this.conn, this.database, this.table); } catch (SQLException e) { + log.error(e); log.error("input table is not valid"); System.exit(1); } @@ -125,7 +121,8 @@ public String initSql() { realFields.add(ClickhouseUtils.realField(field)); } - String init = String.format("insert into %s (%s) values (%s)", this.table, + String init = String.format("insert into %s.%s (%s) values (%s)", this.database, + this.table, String.join(", ", realFields), ClickhouseUtils.tabSeparatedPreSql(this.fields.size())); @@ -173,8 +170,8 @@ public void bulkInsert(List events) throws Exception { case "UInt32": if (fieldValue != null) { try { - long v = ((Number) fieldValue).longValue(); - statement.setLong(i + 1, v); + int v = ((Number) fieldValue).intValue(); + statement.setInt(i + 1, v); } catch (Exception exp) { String msg = String.format("Cannot Convert %s %s to long, render default", fieldValue.getClass(), fieldValue); @@ -188,14 +185,28 @@ public void bulkInsert(List events) throws Exception { break; case "String": - case "DateTime": - case "Date": if (fieldValue != null) { statement.setString(i + 1, fieldValue.toString()); } else { statement.setString(i + 1, ""); } break; + case "DateTime": + if (fieldValue != null) { + Date date = this.datetimeFormat.parse(fieldValue.toString()); + statement.setTimestamp(i + 1, new java.sql.Timestamp(date.getTime())); + } else { + statement.setTimestamp(i + 1, new java.sql.Timestamp(System.currentTimeMillis())); + } + break; + case "Date": + if (fieldValue != null) { + Date date = this.dateFormat.parse(fieldValue.toString()); + statement.setDate(i + 1, new java.sql.Date(date.getTime())); + } else { + statement.setDate(i + 1, new java.sql.Date(System.currentTimeMillis())); + } + break; case "Float32": case "Float64": if (fieldValue != null) { diff --git a/src/main/java/com/sina/bip/hangout/outputs/TabSeparated.java b/src/main/java/com/sina/bip/hangout/outputs/TabSeparated.java index 873026e..f8e687c 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/TabSeparated.java +++ b/src/main/java/com/sina/bip/hangout/outputs/TabSeparated.java @@ -137,7 +137,8 @@ public String initSql() { realFields.add(ClickhouseUtils.realField(field)); } - String init = String.format("insert into %s (%s) values (%s)", this.table, + String init = String.format("insert into %s.%s (%s) values (%s)", this.database, + this.table, String.join(", ", realFields), ClickhouseUtils.tabSeparatedPreSql(this.fields.size())); diff --git a/src/test/java/com/sina/bip/hangout/outputs/NativeTest.java b/src/test/java/com/sina/bip/hangout/outputs/NativeTest.java deleted file mode 100644 index 17cbe30..0000000 --- a/src/test/java/com/sina/bip/hangout/outputs/NativeTest.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.sina.bip.hangout.outputs; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; - -public class NativeTest { - public static void main(String[] args) throws Exception { - Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); - Connection connection = DriverManager.getConnection("jdbc:clickhouse://ck31.mars.grid.sina.com.cn:9000"); - - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("desc test.dpool_msg"); - - while (rs.next()) { - System.out.println(rs.getInt(1) + "\t" + rs.getLong(2)); - } - } -}