From feb9468a733629d68a030e7f47a170ded9944fe6 Mon Sep 17 00:00:00 2001 From: RickyHuo Date: Tue, 6 Mar 2018 18:35:46 +0800 Subject: [PATCH 1/4] Support format JSONEachRow --- README.md | 21 +++- .../sina/bip/hangout/outputs/Clickhouse.java | 109 +++++++++++------- 2 files changed, 85 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index 0a92aab..8451cf6 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ | [bulk_size](#bulk_size-list) | int | no | 1000 | | [database](#database-string) | string | yes | - | | [fields](#fields-list) | list | yes | - | +| [format](#format-string) | string | no | TabSeparated | | [host](#host-string) | string | yes | - | | [replace_include_fields](#replace_include_fields-list) | list | no | - | | [replace_exclude_fields](#replace_exclude_fields-list) | list | no | - | @@ -34,6 +35,12 @@ database table fields, 必须和Hangout清洗后的字段保持一致 +##### format [string] + +数据插入格式[Format Introduction](https://clickhouse.yandex/docs/en/formats/) + +当前支持`TabSeparated`以及`JSONEachRow` + ##### host [string] ClickHouse cluster host @@ -73,5 +80,17 @@ outputs: bulk_size: 500 ``` -> 将fields中对应的字段写入ClickHouse,且对`_ping_big`字段中的单引号进行转义 +> 使用默认的`TabSeparated`将fields中对应的字段写入ClickHouse,且对`_ping_big`字段中的单引号进行转义 +``` +outputs: + - com.sina.bip.hangout.outputs.Clickhouse: + host: clickhouse.bip.sina.com.cn:8123 + username: user + password: passwd + database: apm + format: JSONEachRow + table: apm_netdiagno + bulk_size: 500 +``` +> 使用`JSONEachRow`将数据写入ClickHouse \ No newline at end of file diff --git a/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java b/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java index 77162ea..39e0922 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java +++ b/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java @@ -1,15 +1,13 @@ package com.sina.bip.hangout.outputs; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import com.ctrip.ops.sysdev.baseplugin.BaseOutput; import com.ctrip.ops.sysdev.render.TemplateRender; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.json.simple.JSONObject; import ru.yandex.clickhouse.BalancedClickhouseDataSource; import ru.yandex.clickhouse.ClickHouseConnectionImpl; import ru.yandex.clickhouse.settings.ClickHouseProperties; @@ -21,6 +19,7 @@ public class Clickhouse extends BaseOutput { + private static List formats = Arrays.asList("TabSeparated", "JSONEachRow"); private static final Logger log = LogManager.getLogger(Clickhouse.class); private final static int BULKSIZE = 1000; @@ -29,6 +28,7 @@ public class Clickhouse extends BaseOutput { private String database; private String table; private String jdbcLink; + private String format; private List fields; private List replace_include_fields; private List replace_exclude_fields; @@ -46,7 +46,7 @@ public class Clickhouse extends BaseOutput { public Clickhouse(Map config) { super (config); } protected void prepare() { - this.events = new ArrayList(); + this.events = new ArrayList<>(); if (!this.config.containsKey("host")) { log.error("hostname must be included in config"); @@ -83,6 +83,16 @@ protected void prepare() { this.bulkSize = BULKSIZE; } + if (this.config.containsKey("format")) { + this.format = (String) this.config.get("format"); + if (!this.formats.contains(this.format)) { + log.error("Not support format: " + this.format); + System.exit(1); + } + } else { + this.format = "TabSeparated"; + } + if (this.config.containsKey("fields")) { this.fields = (List) this.config.get("fields"); } else { @@ -131,8 +141,6 @@ protected void prepare() { this.templateRenderMap = new HashMap<>(); for (String field: fields) { - - // Check remote clickhouse tables whether contain all field or not if (!this.schema.containsKey(ClickhouseUtils.realField(field))) { String msg = String.format("table [%s] doesn't contain field '%s'", this.table, field); log.error(msg); @@ -150,14 +158,20 @@ protected void prepare() { private String initSql() { - List realFields = new ArrayList<>(); - for(String field: fields) { - realFields.add(ClickhouseUtils.realField(field)); - } + if (this.format.equals("TabSeparated")) { + List realFields = new ArrayList<>(); + for(String field: fields) { + realFields.add(ClickhouseUtils.realField(field)); + } - String init = String.format("insert into %s (%s) values", this.table, String.join(" ,", realFields)); - log.debug("init sql: "+ init); - return init; + String init = String.format("insert into %s (%s) values", this.table, String.join(" ,", realFields)); + log.debug("init sql: "+ init); + return init; + } else { + String init = String.format("insert into %s format JSONEachRow "); + log.debug("init sql: "+ init); + return init; + } } private String dealWithQuote(String str) { @@ -180,38 +194,45 @@ private String dealWithQuote(String str) { private StringBuilder makeUpSql(List events) { StringBuilder sqls = new StringBuilder(preSql); - for(Map e: events) { - StringBuilder value = new StringBuilder("("); - for(String field: fields) { - Object fieldValue = this.templateRenderMap.get(field).render(e); - if (fieldValue != null) { - if (fieldValue instanceof String) { - if ((this.replace_include_fields != null && this.replace_include_fields.contains(field)) || - (this.replace_exclude_fields != null && !this.replace_exclude_fields.contains(field))) { - value.append("'"); - value.append(dealWithQuote(fieldValue.toString())); - value.append("'"); + + if ((this.format.equals("TabSeparated"))) { + for(Map e: events) { + StringBuilder value = new StringBuilder("("); + for(String field: fields) { + Object fieldValue = this.templateRenderMap.get(field).render(e); + if (fieldValue != null) { + if (fieldValue instanceof String) { + if ((this.replace_include_fields != null && this.replace_include_fields.contains(field)) || + (this.replace_exclude_fields != null && !this.replace_exclude_fields.contains(field))) { + value.append("'"); + value.append(dealWithQuote(fieldValue.toString())); + value.append("'"); + } else { + value.append("'"); + value.append(fieldValue.toString()); + value.append("'"); + } } else { - value.append("'"); - value.append(fieldValue.toString()); - value.append("'"); + if (e.get(field) == null){ + value.append("''"); + } else { + value.append(e.get(field)); + } } } else { - if (e.get(field) == null){ - value.append("''"); - } else { - value.append(e.get(field)); - } + value.append(ClickhouseUtils.renderDefault(this.schema.get(ClickhouseUtils.realField(field)))); + } + if (fields.indexOf(field) != fields.size() -1) { + value.append(","); } - } else { - value.append(ClickhouseUtils.renderDefault(this.schema.get(ClickhouseUtils.realField(field)))); - } - if (fields.indexOf(field) != fields.size() -1) { - value.append(","); } + value.append(")"); + sqls.append(value); + } + } else { + for(Map e: events) { + sqls.append(JSONObject.toJSONString(e)); } - value.append(")"); - sqls.append(value); } return sqls; } @@ -227,11 +248,11 @@ private void bulkInsert(List events) throws Exception { this.conn.createStatement().execute(sqls.toString()); } catch (SQLException e) { log.error(e); - log.debug(sqls.toString()); + log.error(sqls.toString()); - for (int i = 0; i < this.events.size(); i++) { - log.debug(events.get(i)); - } +// for (int i = 0; i < this.events.size(); i++) { +// log.debug(events.get(i)); +// } } catch (Exception e) { log.error(e); } From 76cb3a0aad2c1769008640ea32b2cde026cf1e74 Mon Sep 17 00:00:00 2001 From: RickyHuo Date: Wed, 7 Mar 2018 17:38:31 +0800 Subject: [PATCH 2/4] Update for format JSONEachRow --- .../sina/bip/hangout/outputs/Clickhouse.java | 92 ++++++++++--------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java b/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java index 39e0922..1ecfded 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java +++ b/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java @@ -47,6 +47,7 @@ public class Clickhouse extends BaseOutput { protected void prepare() { this.events = new ArrayList<>(); + this.templateRenderMap = new HashMap<>(); if (!this.config.containsKey("host")) { log.error("hostname must be included in config"); @@ -83,40 +84,11 @@ protected void prepare() { this.bulkSize = BULKSIZE; } - if (this.config.containsKey("format")) { - this.format = (String) this.config.get("format"); - if (!this.formats.contains(this.format)) { - log.error("Not support format: " + this.format); - System.exit(1); - } - } else { - this.format = "TabSeparated"; - } - - if (this.config.containsKey("fields")) { - this.fields = (List) this.config.get("fields"); - } else { - log.error("fields must be included in config"); - System.exit(1); - } - - if (this.config.containsKey("replace_include_fields")) { - this.replace_include_fields = (List) this.config.get("replace_include_fields"); - } - - if (this.config.containsKey("replace_exclude_fields")) { - this.replace_exclude_fields = (List) this.config.get("replace_exclude_fields"); - } - - if (this.config.containsKey("replace_include_fields") && this.config.containsKey("replace_exclude_fields")) { - log.error("Replace_include_fields and replace_exclude_fields exist at the same time, " + - "please use one of them."); - System.exit(1); - } - + // 连接验证 this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database); ClickHouseProperties properties = new ClickHouseProperties(); + // 避免每次INSERT操作取服务器时间 properties.setUseServerTimeZone(false); this.dataSource = new BalancedClickhouseDataSource(this.jdbcLink, properties); if (this.withCredit) { @@ -127,7 +99,7 @@ protected void prepare() { try { this.conn = (ClickHouseConnectionImpl) dataSource.getConnection(); } catch (Exception e) { - log.error("cannot connection to datasource"); + log.error("Cannot connection to datasource"); log.error(e); System.exit(1); } @@ -139,18 +111,52 @@ protected void prepare() { System.exit(1); } - this.templateRenderMap = new HashMap<>(); - for (String field: fields) { - if (!this.schema.containsKey(ClickhouseUtils.realField(field))) { - String msg = String.format("table [%s] doesn't contain field '%s'", this.table, field); - log.error(msg); + + if (this.config.containsKey("format")) { + this.format = (String) this.config.get("format"); + if (!this.formats.contains(this.format)) { + log.error("Not support format: " + this.format); System.exit(1); } - try { - this.templateRenderMap.put(field, TemplateRender.getRender(field, false)); - } catch (Exception e) { - String msg = String.format("cannot get templateRender of field [%s]", field); - log.warn(msg); + } else { + this.format = "TabSeparated"; + } + + if (this.format.equals("TabSeparated")) { + + if (this.config.containsKey("fields")) { + this.fields = (List) this.config.get("fields"); + } else { + log.error("fields must be included in config"); + System.exit(1); + } + + if (this.config.containsKey("replace_include_fields")) { + this.replace_include_fields = (List) this.config.get("replace_include_fields"); + } + + if (this.config.containsKey("replace_exclude_fields")) { + this.replace_exclude_fields = (List) this.config.get("replace_exclude_fields"); + } + + if (this.config.containsKey("replace_include_fields") && this.config.containsKey("replace_exclude_fields")) { + log.error("Replace_include_fields and replace_exclude_fields exist at the same time, " + + "please use one of them."); + System.exit(1); + } + + for (String field: fields) { + if (!this.schema.containsKey(ClickhouseUtils.realField(field))) { + String msg = String.format("table [%s] doesn't contain field '%s'", this.table, field); + log.error(msg); + System.exit(1); + } + try { + this.templateRenderMap.put(field, TemplateRender.getRender(field, false)); + } catch (Exception e) { + String msg = String.format("cannot get templateRender of field [%s]", field); + log.warn(msg); + } } } @@ -168,7 +174,7 @@ private String initSql() { log.debug("init sql: "+ init); return init; } else { - String init = String.format("insert into %s format JSONEachRow "); + String init = String.format("insert into %s format JSONEachRow ", this.table); log.debug("init sql: "+ init); return init; } From 15878071a900a26b489dfdea6d35479b12fbe019 Mon Sep 17 00:00:00 2001 From: RickyHuo Date: Wed, 7 Mar 2018 18:24:54 +0800 Subject: [PATCH 3/4] Add .travlis.yml --- .travlis.yml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .travlis.yml diff --git a/.travlis.yml b/.travlis.yml new file mode 100644 index 0000000..6f1b2a9 --- /dev/null +++ b/.travlis.yml @@ -0,0 +1,2 @@ +language: java +jdk: oraclejdk8 \ No newline at end of file From 6cf3d4f5de70286f5ce8e28c059fa2332cca38cd Mon Sep 17 00:00:00 2001 From: Ricky Huo Date: Wed, 7 Mar 2018 18:27:59 +0800 Subject: [PATCH 4/4] Rename .travlis.yml to .travis.yml --- .travis.yml | 2 ++ .travlis.yml | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 .travis.yml delete mode 100644 .travlis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..c0f28cf --- /dev/null +++ b/.travis.yml @@ -0,0 +1,2 @@ +language: java +jdk: oraclejdk8 diff --git a/.travlis.yml b/.travlis.yml deleted file mode 100644 index 6f1b2a9..0000000 --- a/.travlis.yml +++ /dev/null @@ -1,2 +0,0 @@ -language: java -jdk: oraclejdk8 \ No newline at end of file