From d6f280f59c94f33e6e1ccf35dcf22e65d5985ed6 Mon Sep 17 00:00:00 2001 From: RickyHuo Date: Wed, 20 Jun 2018 15:48:56 +0800 Subject: [PATCH 1/2] Add Native.java --- pom.xml | 5 + .../sina/bip/hangout/outputs/Clickhouse.java | 21 +- .../bip/hangout/outputs/ClickhouseUtils.java | 12 + .../com/sina/bip/hangout/outputs/Native.java | 232 ++++++++++++++++++ .../com/sina/bip/hangout/outputs/Values.java | 5 + .../sina/bip/hangout/outputs/NativeTest.java | 20 ++ 6 files changed, 284 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/sina/bip/hangout/outputs/Native.java create mode 100644 src/test/java/com/sina/bip/hangout/outputs/NativeTest.java diff --git a/pom.xml b/pom.xml index 2537318..8ad0341 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,11 @@ 2.7 provided + + com.github.housepower + clickhouse-native-jdbc + 1.0-testing + diff --git a/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java b/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java index a81d748..84d25ef 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java +++ b/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java @@ -14,7 +14,6 @@ public class Clickhouse extends BaseOutput { - private static List formats = Arrays.asList("Values", "JSONEachRow", "TabSeparated"); private static final Logger log = LogManager.getLogger(Clickhouse.class); private final static int BULKSIZE = 1000; @@ -29,15 +28,8 @@ public Clickhouse(Map config) { protected void prepare() { - String format = "Values"; - - if (this.config.containsKey("format")) { - format = (String) this.config.get("format"); - if (!this.formats.contains(format)) { - log.error("Not support format: " + format); - System.exit(1); - } - } + String format = "TabSeparated"; + format = (String) this.config.get("format"); switch (format) { case "JSONEachRow": @@ -49,6 +41,13 @@ protected void prepare() { case "TabSeparated": this.formatParse = new TabSeparated(config); break; + case "Native": + this.formatParse = new Native(config); + break; + default: + log.error("Unknown format <%s>", format); + System.exit(1); + break; } this.formatParse.prepare(); @@ -98,7 +97,7 @@ public void shutdown() { this.formatParse.bulkInsert(this.events); } catch (Exception e) { log.info("failed to bulk events before shutdown"); - log.debug(e); + log.error(e); } } } diff --git a/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java b/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java index 20680cd..f520327 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java +++ b/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java @@ -3,6 +3,7 @@ import ru.yandex.clickhouse.BalancedClickhouseDataSource; import ru.yandex.clickhouse.ClickHouseConnectionImpl; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; @@ -28,6 +29,17 @@ public static Map getSchema(ClickHouseConnectionImpl connection, return schema; } + public static Map getSchema(Connection connection, String table) throws SQLException { + String sql = String.format("desc %s", table); + ResultSet resultSet = connection.createStatement().executeQuery(sql); + Map schema = new HashMap(); + while(resultSet.next()) { + + schema.put(resultSet.getString(1), resultSet.getString(2)); + } + return schema; + } + public static String renderDefault(String dataType) { if (dataType.equals("String")) diff --git a/src/main/java/com/sina/bip/hangout/outputs/Native.java b/src/main/java/com/sina/bip/hangout/outputs/Native.java new file mode 100644 index 0000000..e47d14a --- /dev/null +++ b/src/main/java/com/sina/bip/hangout/outputs/Native.java @@ -0,0 +1,232 @@ +package com.sina.bip.hangout.outputs; + +import com.ctrip.ops.sysdev.render.TemplateRender; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Native implements FormatParse { + + private static final Logger log = LogManager.getLogger(Native.class); + private Map config; + private String host; + private String database; + private String table; + private String jdbcLink; + private List fields; + private Map schema; + private Boolean withCredit; + private String user; + private String password; + private Connection conn; + private Map templateRenderMap; + + public Native(Map config) { + this.config = config; + } + + public void prepare() { + this.templateRenderMap = new HashMap<>(); + + if (!this.config.containsKey("host")) { + log.error("hostname must be included in config"); + System.exit(1); + } + this.host = (String) this.config.get("host"); + + if (!this.config.containsKey("database")) { + log.error("database must be included in config"); + System.exit(1); + } + this.database = (String) this.config.get("database"); + + if (!this.config.containsKey("table")) { + log.error("table must be included in config"); + System.exit(1); + } + this.table = (String) this.config.get("table"); + + if(this.config.containsKey("username") && this.config.containsKey("password")) { + this.user = (String) this.config.get("username"); + this.password = (String) this.config.get("password"); + this.withCredit = true; + + } else if (this.config.containsKey("username") || this.config.containsKey("password")) { + log.warn("username and password must be included in config at same time"); + } else { + this.withCredit = false; + } + + + // 连接验证 + this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database); + + try { + Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); + } catch (ClassNotFoundException e) { + log.error(e); + System.exit(1); + } + + try { + this.conn = DriverManager.getConnection(this.jdbcLink); + + if (this.withCredit) { + this.conn = DriverManager.getConnection(this.jdbcLink, this.user, this.password); + } + } catch (Exception e) { + log.error(e); + log.error("Cannot connect to data source"); + System.exit(1); + } + + try { + this.schema = ClickhouseUtils.getSchema(this.conn, this.table); + } catch (SQLException e) { + log.error("input table is not valid"); + System.exit(1); + } + + + if (this.config.containsKey("fields")) { + this.fields = (List) this.config.get("fields"); + } else { + log.error("fields must be included in config"); + System.exit(1); + } + + for (String field: fields) { + if (!this.schema.containsKey(ClickhouseUtils.realField(field))) { + String msg = String.format("table [%s] doesn't contain field '%s'", this.table, field); + log.error(msg); + System.exit(1); + } + try { + this.templateRenderMap.put(field, TemplateRender.getRender(field, false)); + } catch (Exception e) { + String msg = String.format("cannot get templateRender of field [%s]", field); + log.warn(msg); + } + } + } + + public String initSql() { + + List realFields = new ArrayList<>(); + for(String field: this.fields) { + realFields.add(ClickhouseUtils.realField(field)); + } + + String init = String.format("insert into %s (%s) values (%s)", this.table, + String.join(", ", realFields), + ClickhouseUtils.tabSeparatedPreSql(this.fields.size())); + + log.debug("init sql: " + init); + return init; + } + + public void bulkInsert(List events) throws Exception { + + PreparedStatement statement = this.conn.prepareStatement(this.initSql()); + + int size = fields.size(); + for (Map e: events) { + //statement.setString(1, "sd"); + + for (int i=0; i v = (List) fieldValue; + String [] array = v.toArray(new String[v.size()]); + statement.setArray(i + 1, this.conn.createArrayOf("string", array)); + } else { + statement.setArray(i + 1, this.conn.createArrayOf("string", new String[1])); + } + + } + } + statement.addBatch(); + } + statement.executeBatch(); + } + +} diff --git a/src/main/java/com/sina/bip/hangout/outputs/Values.java b/src/main/java/com/sina/bip/hangout/outputs/Values.java index 3c9c4f0..12e1f3b 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/Values.java +++ b/src/main/java/com/sina/bip/hangout/outputs/Values.java @@ -13,6 +13,11 @@ import java.util.List; import java.util.Map; +@Deprecated +/* +* It can be replaced by TabSeparated with better performance +* +* */ public class Values implements FormatParse { private static final Logger log = LogManager.getLogger(Values.class); diff --git a/src/test/java/com/sina/bip/hangout/outputs/NativeTest.java b/src/test/java/com/sina/bip/hangout/outputs/NativeTest.java new file mode 100644 index 0000000..17cbe30 --- /dev/null +++ b/src/test/java/com/sina/bip/hangout/outputs/NativeTest.java @@ -0,0 +1,20 @@ +package com.sina.bip.hangout.outputs; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; + +public class NativeTest { + public static void main(String[] args) throws Exception { + Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); + Connection connection = DriverManager.getConnection("jdbc:clickhouse://ck31.mars.grid.sina.com.cn:9000"); + + Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery("desc test.dpool_msg"); + + while (rs.next()) { + System.out.println(rs.getInt(1) + "\t" + rs.getLong(2)); + } + } +} From 541334ea7a3c830179a85c536b1855cdbdc203b5 Mon Sep 17 00:00:00 2001 From: RickyHuo Date: Thu, 21 Jun 2018 17:13:05 +0800 Subject: [PATCH 2/2] Support ClickHouse-Native-JDBC --- README.md | 2 +- pom.xml | 14 ++-- .../bip/hangout/outputs/ClickhouseUtils.java | 7 +- .../com/sina/bip/hangout/outputs/Native.java | 65 +++++++++++-------- .../bip/hangout/outputs/TabSeparated.java | 3 +- .../sina/bip/hangout/outputs/NativeTest.java | 20 ------ 6 files changed, 51 insertions(+), 60 deletions(-) delete mode 100644 src/test/java/com/sina/bip/hangout/outputs/NativeTest.java diff --git a/README.md b/README.md index beff10c..2bd4a33 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ * Author: rickyHuo * Homepage: https://github.com/RickyHuo/hangout-output-clickhouse -* Version: 0.0.6 +* Version: 0.0.7 ### Description diff --git a/pom.xml b/pom.xml index 8ad0341..96a3b1b 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ sina hangout-output-plugins-clickhouse - 0.0.6 + 0.0.7 jar hangout-output-plugins-clickhouse @@ -22,10 +22,15 @@ system C:\Users\huochen\.m2\repository\ctrip\hangout-baseplugin\0.3.0\hangout-baseplugin-0.3.0.jar + + com.github.housepower + clickhouse-native-jdbc + 1.1-testing + ru.yandex.clickhouse clickhouse-jdbc - 0.1.36 + 0.1.40 org.apache.logging.log4j @@ -39,11 +44,6 @@ 2.7 provided - - com.github.housepower - clickhouse-native-jdbc - 1.0-testing - diff --git a/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java b/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java index f520327..d3fb1a6 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java +++ b/src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java @@ -1,9 +1,8 @@ package com.sina.bip.hangout.outputs; -import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import com.github.housepower.jdbc.ClickHouseConnection; import ru.yandex.clickhouse.ClickHouseConnectionImpl; -import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; @@ -29,8 +28,8 @@ public static Map getSchema(ClickHouseConnectionImpl connection, return schema; } - public static Map getSchema(Connection connection, String table) throws SQLException { - String sql = String.format("desc %s", table); + public static Map getSchema(ClickHouseConnection connection, String db, String table) throws SQLException { + String sql = String.format("desc %s.%s", db, table); ResultSet resultSet = connection.createStatement().executeQuery(sql); Map schema = new HashMap(); while(resultSet.next()) { diff --git a/src/main/java/com/sina/bip/hangout/outputs/Native.java b/src/main/java/com/sina/bip/hangout/outputs/Native.java index e47d14a..b504f20 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/Native.java +++ b/src/main/java/com/sina/bip/hangout/outputs/Native.java @@ -1,17 +1,15 @@ package com.sina.bip.hangout.outputs; import com.ctrip.ops.sysdev.render.TemplateRender; +import com.github.housepower.jdbc.ClickHouseConnection; +import com.github.housepower.jdbc.settings.ClickHouseConfig; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.text.SimpleDateFormat; +import java.util.*; public class Native implements FormatParse { @@ -26,8 +24,10 @@ public class Native implements FormatParse { private Boolean withCredit; private String user; private String password; - private Connection conn; + private ClickHouseConnection conn; private Map templateRenderMap; + private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + private SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public Native(Map config) { this.config = config; @@ -65,32 +65,28 @@ public void prepare() { this.withCredit = false; } - // 连接验证 - this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database); + this.jdbcLink = String.format("jdbc:clickhouse://%s", this.host); + log.info(this.jdbcLink); - try { - Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); - } catch (ClassNotFoundException e) { - log.error(e); - System.exit(1); + Properties p = new Properties(); + if (this.withCredit) { + p.put("user", this.user); + p.put("password", this.password); } try { - this.conn = DriverManager.getConnection(this.jdbcLink); - - if (this.withCredit) { - this.conn = DriverManager.getConnection(this.jdbcLink, this.user, this.password); - } - } catch (Exception e) { + ClickHouseConfig cf = new ClickHouseConfig(this.jdbcLink, p); + this.conn = ClickHouseConnection.createClickHouseConnection(cf); + } catch (SQLException e) { log.error(e); - log.error("Cannot connect to data source"); System.exit(1); } try { - this.schema = ClickhouseUtils.getSchema(this.conn, this.table); + this.schema = ClickhouseUtils.getSchema(this.conn, this.database, this.table); } catch (SQLException e) { + log.error(e); log.error("input table is not valid"); System.exit(1); } @@ -125,7 +121,8 @@ public String initSql() { realFields.add(ClickhouseUtils.realField(field)); } - String init = String.format("insert into %s (%s) values (%s)", this.table, + String init = String.format("insert into %s.%s (%s) values (%s)", this.database, + this.table, String.join(", ", realFields), ClickhouseUtils.tabSeparatedPreSql(this.fields.size())); @@ -173,8 +170,8 @@ public void bulkInsert(List events) throws Exception { case "UInt32": if (fieldValue != null) { try { - long v = ((Number) fieldValue).longValue(); - statement.setLong(i + 1, v); + int v = ((Number) fieldValue).intValue(); + statement.setInt(i + 1, v); } catch (Exception exp) { String msg = String.format("Cannot Convert %s %s to long, render default", fieldValue.getClass(), fieldValue); @@ -188,14 +185,28 @@ public void bulkInsert(List events) throws Exception { break; case "String": - case "DateTime": - case "Date": if (fieldValue != null) { statement.setString(i + 1, fieldValue.toString()); } else { statement.setString(i + 1, ""); } break; + case "DateTime": + if (fieldValue != null) { + Date date = this.datetimeFormat.parse(fieldValue.toString()); + statement.setTimestamp(i + 1, new java.sql.Timestamp(date.getTime())); + } else { + statement.setTimestamp(i + 1, new java.sql.Timestamp(System.currentTimeMillis())); + } + break; + case "Date": + if (fieldValue != null) { + Date date = this.dateFormat.parse(fieldValue.toString()); + statement.setDate(i + 1, new java.sql.Date(date.getTime())); + } else { + statement.setDate(i + 1, new java.sql.Date(System.currentTimeMillis())); + } + break; case "Float32": case "Float64": if (fieldValue != null) { diff --git a/src/main/java/com/sina/bip/hangout/outputs/TabSeparated.java b/src/main/java/com/sina/bip/hangout/outputs/TabSeparated.java index 873026e..f8e687c 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/TabSeparated.java +++ b/src/main/java/com/sina/bip/hangout/outputs/TabSeparated.java @@ -137,7 +137,8 @@ public String initSql() { realFields.add(ClickhouseUtils.realField(field)); } - String init = String.format("insert into %s (%s) values (%s)", this.table, + String init = String.format("insert into %s.%s (%s) values (%s)", this.database, + this.table, String.join(", ", realFields), ClickhouseUtils.tabSeparatedPreSql(this.fields.size())); diff --git a/src/test/java/com/sina/bip/hangout/outputs/NativeTest.java b/src/test/java/com/sina/bip/hangout/outputs/NativeTest.java deleted file mode 100644 index 17cbe30..0000000 --- a/src/test/java/com/sina/bip/hangout/outputs/NativeTest.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.sina.bip.hangout.outputs; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; - -public class NativeTest { - public static void main(String[] args) throws Exception { - Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); - Connection connection = DriverManager.getConnection("jdbc:clickhouse://ck31.mars.grid.sina.com.cn:9000"); - - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("desc test.dpool_msg"); - - while (rs.next()) { - System.out.println(rs.getInt(1) + "\t" + rs.getLong(2)); - } - } -}