Skip to content
This repository has been archived by the owner on Jan 6, 2022. It is now read-only.

Commit

Permalink
Merge branch 'rickyhuo.output.enhance'
Browse files Browse the repository at this point in the history
  • Loading branch information
RickyHuo committed Dec 8, 2017
2 parents 9be7990 + bb30049 commit f30ed03
Showing 1 changed file with 72 additions and 48 deletions.
120 changes: 72 additions & 48 deletions src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
* Created by huochen on 2017/09/23.
*/
public class Clickhouse extends BaseOutput{

private final static int BULKSIZE = 1000;

private String host;
private String database;
private String table;
private String jdbcLink;
private List<String> fields;
private List<String> replace_fields;
private List<String> replace_include_fields;
private List<String> replace_exclude_fields;
private int bulkSize;
private String preSql = initSql();
private List<Map> events;
Expand All @@ -34,38 +36,48 @@ public class Clickhouse extends BaseOutput{
protected void prepare() {
this.events = new ArrayList<Map>();

if(!this.config.containsKey("host")) {
if (!this.config.containsKey("host")) {
System.out.println("hostname must be included in config");
System.exit(1);
}
this.host = (String) this.config.get("host");

if(!this.config.containsKey("database")) {
if (!this.config.containsKey("database")) {
System.out.println("database must be included in config");
System.exit(1);
}
this.database = (String) this.config.get("database");

if(!this.config.containsKey("table")) {
if (!this.config.containsKey("table")) {
System.out.println("table must be included in config");
System.exit(1);
}
this.table = (String) this.config.get("table");

if(this.config.containsKey("bulk_size")) {
if (this.config.containsKey("bulk_size")) {
this.bulkSize = (Integer) this.config.get("bulk_size");
} else {
this.bulkSize = BULKSIZE;
}

if(this.config.containsKey("fields")) {
if (this.config.containsKey("fields")) {
this.fields = (List<String>) this.config.get("fields");
} else {
System.out.println("fields must be included in config");
}

if(this.config.containsKey("replace_fields")) {
this.replace_fields = (List<String>) this.config.get("replace_fields");
if (this.config.containsKey("replace_include_fields")) {
this.replace_include_fields = (List<String>) this.config.get("replace_include_fields");
}

if (this.config.containsKey("replace_exclude_fields")) {
this.replace_exclude_fields = (List<String>) this.config.get("replace_exclude_fields");
}

if (this.config.containsKey("replace_include_fields") && this.config.containsKey("replace_exclude_fields")) {
System.out.println("Replace_include_fields and replace_exclude_fields exist at the same time," +
" please use one of them.");
System.exit(1);
}

this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database);
Expand All @@ -91,69 +103,81 @@ protected void prepare() {
}

protected String initSql() {

String init = String.format("insert into %s (%s) values", this.table, String.join(" ,", this.fields));
return init;
}

protected void bulkInsert(Map event) throws Exception{
protected String dealWithQuote(String str) {
/*
* 因为Clickhouse SQL语句必须用单引号'
* insert into test.test (date, value) values ('2017-10-29', '23')
* SQL语句需要将数值中的单引号'转义
* */

if (str.indexOf("'") < 0) {
return str;
} else if (str.indexOf("\\'") > 0) {
// deal with "\'"
return str.replace("'", "\\'").replace("\\\\'", "\\\\\\'");
} else {
return str.replace("'", "\\'");
}
}

this.events.add(event);
if(this.events.size() == this.bulkSize) {
StringBuilder sqls = new StringBuilder(preSql);
for (int i = 0; i < this.events.size(); i++) {
Map e = events.get(i);
String value = "(";
for (int j =0; j < fields.size(); j++) {
String field = fields.get(j);
// 判断元数据中是否有对应的key

if (e.containsKey(field)) {
if (e.get(field) instanceof String) {
String fieldValue = e.get(field).toString();
if (this.replace_fields != null && this.replace_fields.contains(field)) {
if (!(fieldValue.indexOf("'") > 0)){
value += "'" + e.get(field) + "'";
} else {
if (fieldValue.indexOf("\\'") > 0) {
value += "'" + fieldValue.replace("'", "\\'").replace("\\\\'", "\\\\\\'") + "'";
}
else {
value += "'" + fieldValue.replace("'", "\\'") + "'";
}
}
} else {
value += "'" + fieldValue + "'";
}
protected StringBuilder makeUpSql(List<Map> events) {

StringBuilder sqls = new StringBuilder(preSql);
for(Map e: events) {
String value = "(";
for(String field: fields) {
if (e.containsKey(field)) {
if (e.get(field) instanceof String) {
String fieldValue = e.get(field).toString();
if ((this.replace_include_fields != null && this.replace_include_fields.contains(field)) ||
(this.replace_exclude_fields != null && !this.replace_exclude_fields.contains(field))) {
value += "'" + dealWithQuote(fieldValue) + "'";
} else {
if (e.get(field) == null){
value += "''";
} else {
value += e.get(field);
}
value += "'" + fieldValue + "'";
}
} else {
value += ClickhouseUtils.renderDefault(this.schema.get(field));
}
if(j != fields.size() -1 ) {
value += ",";
if (e.get(field) == null){
value += "''";
} else {
value += e.get(field);
}
}
} else {
value += ClickhouseUtils.renderDefault(this.schema.get(field));
}
if (fields.indexOf(field) != fields.size() -1) {
value += ",";
}
value += ")";
sqls.append(value);
}
value += ")";
sqls.append(value);
}
return sqls;
}

protected void bulkInsert(Map event) throws Exception{

this.events.add(event);
if(this.events.size() == this.bulkSize) {
StringBuilder sqls = makeUpSql(this.events);
ClickHouseProperties properties = new ClickHouseProperties();
BalancedClickhouseDataSource balanced = new BalancedClickhouseDataSource(this.jdbcLink, properties);

Connection conn = balanced.getConnection();
try {
conn.createStatement().execute(sqls.toString());
conn.close();
} catch (SQLException e){
System.out.println(sqls.toString());
System.out.println(e.toString());
for (int i = 0; i < this.events.size(); i++) {
System.out.println(events.get(i));
}
// System.out.println("insert error");
} catch (Exception e) {
System.out.println(e.toString());
System.out.println("error");
Expand Down

0 comments on commit f30ed03

Please sign in to comment.