diff --git a/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java b/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java index 2064b33..eada6c2 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java +++ b/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java @@ -15,6 +15,7 @@ * Created by huochen on 2017/09/23. */ public class Clickhouse extends BaseOutput{ + private final static int BULKSIZE = 1000; private String host; @@ -22,7 +23,8 @@ public class Clickhouse extends BaseOutput{ private String table; private String jdbcLink; private List fields; - private List replace_fields; + private List replace_include_fields; + private List replace_exclude_fields; private int bulkSize; private String preSql = initSql(); private List events; @@ -34,38 +36,48 @@ public class Clickhouse extends BaseOutput{ protected void prepare() { this.events = new ArrayList(); - if(!this.config.containsKey("host")) { + if (!this.config.containsKey("host")) { System.out.println("hostname must be included in config"); System.exit(1); } this.host = (String) this.config.get("host"); - if(!this.config.containsKey("database")) { + if (!this.config.containsKey("database")) { System.out.println("database must be included in config"); System.exit(1); } this.database = (String) this.config.get("database"); - if(!this.config.containsKey("table")) { + if (!this.config.containsKey("table")) { System.out.println("table must be included in config"); System.exit(1); } this.table = (String) this.config.get("table"); - if(this.config.containsKey("bulk_size")) { + if (this.config.containsKey("bulk_size")) { this.bulkSize = (Integer) this.config.get("bulk_size"); } else { this.bulkSize = BULKSIZE; } - if(this.config.containsKey("fields")) { + if (this.config.containsKey("fields")) { this.fields = (List) this.config.get("fields"); } else { System.out.println("fields must be included in config"); } - if(this.config.containsKey("replace_fields")) { - this.replace_fields = (List) this.config.get("replace_fields"); + if (this.config.containsKey("replace_include_fields")) { + this.replace_include_fields = (List) this.config.get("replace_include_fields"); + } + + if (this.config.containsKey("replace_exclude_fields")) { + this.replace_exclude_fields = (List) this.config.get("replace_exclude_fields"); + } + + if (this.config.containsKey("replace_include_fields") && this.config.containsKey("replace_exclude_fields")) { + System.out.println("Replace_include_fields and replace_exclude_fields exist at the same time," + + " please use one of them"); + System.exit(1); } this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database); @@ -91,56 +103,68 @@ protected void prepare() { } protected String initSql() { + String init = String.format("insert into %s (%s) values", this.table, String.join(" ,", this.fields)); return init; } - protected void bulkInsert(Map event) throws Exception{ + protected String dealWithQuote(String str) { + /* + * 因为Clickhouse SQL语句必须用单引号' + * insert into test.test (date, value) values ('2017-10-29', '23') + * SQL语句需要将数值中的单引号'转义 + * */ + + if (str.indexOf("'") < 0) { + return str; + } else if (str.indexOf("\\'") > 0) { +// deal with "\'" + return str.replace("'", "\\'").replace("\\\\'", "\\\\\\'"); + } else { + return str.replace("'", "\\'"); + } + } - this.events.add(event); - if(this.events.size() == this.bulkSize) { - StringBuilder sqls = new StringBuilder(preSql); - for (int i = 0; i < this.events.size(); i++) { - Map e = events.get(i); - String value = "("; - for (int j =0; j < fields.size(); j++) { - String field = fields.get(j); - // 判断元数据中是否有对应的key - - if (e.containsKey(field)) { - if (e.get(field) instanceof String) { - String fieldValue = e.get(field).toString(); - if (this.replace_fields != null && this.replace_fields.contains(field)) { - if (!(fieldValue.indexOf("'") > 0)){ - value += "'" + e.get(field) + "'"; - } else { - if (fieldValue.indexOf("\\'") > 0) { - value += "'" + fieldValue.replace("'", "\\'").replace("\\\\'", "\\\\\\'") + "'"; - } - else { - value += "'" + fieldValue.replace("'", "\\'") + "'"; - } - } - } else { - value += "'" + fieldValue + "'"; - } + protected StringBuilder makeUpSql(List events) { + + StringBuilder sqls = new StringBuilder(preSql); + for(Map e: events) { + String value = "("; + for(String field: fields) { + if (e.containsKey(field)) { + if (e.get(field) instanceof String) { + String fieldValue = e.get(field).toString(); + if ((this.replace_include_fields != null && this.replace_include_fields.contains(field)) || + (this.replace_exclude_fields != null && !this.replace_exclude_fields.contains(field))) { + dealWithQuote(fieldValue); } else { - if (e.get(field) == null){ - value += "''"; - } else { - value += e.get(field); - } + value += "'" + fieldValue + "'"; } } else { - value += ClickhouseUtils.renderDefault(this.schema.get(field)); - } - if(j != fields.size() -1 ) { - value += ","; + if (e.get(field) == null){ + value += "''"; + } else { + value += e.get(field); + } } + } else { + value += ClickhouseUtils.renderDefault(this.schema.get(field)); + } + if (fields.indexOf(field) != fields.size() -1) { + value += ","; } - value += ")"; - sqls.append(value); } + value += ")"; + sqls.append(value); + } + return sqls; + } + + protected void bulkInsert(Map event) throws Exception{ + + this.events.add(event); + if(this.events.size() == this.bulkSize) { + StringBuilder sqls = makeUpSql(this.events); ClickHouseProperties properties = new ClickHouseProperties(); BalancedClickhouseDataSource balanced = new BalancedClickhouseDataSource(this.jdbcLink, properties); @@ -153,7 +177,6 @@ protected void bulkInsert(Map event) throws Exception{ for (int i = 0; i < this.events.size(); i++) { System.out.println(events.get(i)); } - // System.out.println("insert error"); } catch (Exception e) { System.out.println(e.toString()); System.out.println("error");