Skip to content
This repository has been archived by the owner on Jan 6, 2022. It is now read-only.

Commit

Permalink
Merge pull request #3 from RickyHuo/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
RickyHuo authored Mar 7, 2018
2 parents 7ca933b + 6cf3d4f commit 1f316d2
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 76 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
language: java
jdk: oraclejdk8
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
| [bulk_size](#bulk_size-list) | int | no | 1000 |
| [database](#database-string) | string | yes | - |
| [fields](#fields-list) | list | yes | - |
| [format](#format-string) | string | no | TabSeparated |
| [host](#host-string) | string | yes | - |
| [replace_include_fields](#replace_include_fields-list) | list | no | - |
| [replace_exclude_fields](#replace_exclude_fields-list) | list | no | - |
Expand All @@ -34,6 +35,12 @@ database

table fields, 必须和Hangout清洗后的字段保持一致

##### format [string]

数据插入格式[Format Introduction](https://clickhouse.yandex/docs/en/formats/)

当前支持`TabSeparated`以及`JSONEachRow`

##### host [string]

ClickHouse cluster host
Expand Down Expand Up @@ -73,5 +80,17 @@ outputs:
bulk_size: 500
```

> 将fields中对应的字段写入ClickHouse,且对`_ping_big`字段中的单引号进行转义
> 使用默认的`TabSeparated`将fields中对应的字段写入ClickHouse,且对`_ping_big`字段中的单引号进行转义
```
outputs:
- com.sina.bip.hangout.outputs.Clickhouse:
host: clickhouse.bip.sina.com.cn:8123
username: user
password: passwd
database: apm
format: JSONEachRow
table: apm_netdiagno
bulk_size: 500
```
> 使用`JSONEachRow`将数据写入ClickHouse
177 changes: 102 additions & 75 deletions src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package com.sina.bip.hangout.outputs;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import com.ctrip.ops.sysdev.baseplugin.BaseOutput;
import com.ctrip.ops.sysdev.render.TemplateRender;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.simple.JSONObject;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
Expand All @@ -21,6 +19,7 @@

public class Clickhouse extends BaseOutput {

private static List<String> formats = Arrays.asList("TabSeparated", "JSONEachRow");
private static final Logger log = LogManager.getLogger(Clickhouse.class);
private final static int BULKSIZE = 1000;

Expand All @@ -29,6 +28,7 @@ public class Clickhouse extends BaseOutput {
private String database;
private String table;
private String jdbcLink;
private String format;
private List<String> fields;
private List<String> replace_include_fields;
private List<String> replace_exclude_fields;
Expand All @@ -46,7 +46,8 @@ public class Clickhouse extends BaseOutput {
public Clickhouse(Map config) { super (config); }

protected void prepare() {
this.events = new ArrayList<Map>();
this.events = new ArrayList<>();
this.templateRenderMap = new HashMap<>();

if (!this.config.containsKey("host")) {
log.error("hostname must be included in config");
Expand Down Expand Up @@ -83,30 +84,11 @@ protected void prepare() {
this.bulkSize = BULKSIZE;
}

if (this.config.containsKey("fields")) {
this.fields = (List<String>) this.config.get("fields");
} else {
log.error("fields must be included in config");
System.exit(1);
}

if (this.config.containsKey("replace_include_fields")) {
this.replace_include_fields = (List<String>) this.config.get("replace_include_fields");
}

if (this.config.containsKey("replace_exclude_fields")) {
this.replace_exclude_fields = (List<String>) this.config.get("replace_exclude_fields");
}

if (this.config.containsKey("replace_include_fields") && this.config.containsKey("replace_exclude_fields")) {
log.error("Replace_include_fields and replace_exclude_fields exist at the same time, " +
"please use one of them.");
System.exit(1);
}

// 连接验证
this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database);

ClickHouseProperties properties = new ClickHouseProperties();
// 避免每次INSERT操作取服务器时间
properties.setUseServerTimeZone(false);
this.dataSource = new BalancedClickhouseDataSource(this.jdbcLink, properties);
if (this.withCredit) {
Expand All @@ -117,7 +99,7 @@ protected void prepare() {
try {
this.conn = (ClickHouseConnectionImpl) dataSource.getConnection();
} catch (Exception e) {
log.error("cannot connection to datasource");
log.error("Cannot connection to datasource");
log.error(e);
System.exit(1);
}
Expand All @@ -129,35 +111,73 @@ protected void prepare() {
System.exit(1);
}

this.templateRenderMap = new HashMap<>();
for (String field: fields) {

// Check remote clickhouse tables whether contain all field or not
if (!this.schema.containsKey(ClickhouseUtils.realField(field))) {
String msg = String.format("table [%s] doesn't contain field '%s'", this.table, field);
log.error(msg);
if (this.config.containsKey("format")) {
this.format = (String) this.config.get("format");
if (!this.formats.contains(this.format)) {
log.error("Not support format: " + this.format);
System.exit(1);
}
try {
this.templateRenderMap.put(field, TemplateRender.getRender(field, false));
} catch (Exception e) {
String msg = String.format("cannot get templateRender of field [%s]", field);
log.warn(msg);
} else {
this.format = "TabSeparated";
}

if (this.format.equals("TabSeparated")) {

if (this.config.containsKey("fields")) {
this.fields = (List<String>) this.config.get("fields");
} else {
log.error("fields must be included in config");
System.exit(1);
}

if (this.config.containsKey("replace_include_fields")) {
this.replace_include_fields = (List<String>) this.config.get("replace_include_fields");
}

if (this.config.containsKey("replace_exclude_fields")) {
this.replace_exclude_fields = (List<String>) this.config.get("replace_exclude_fields");
}

if (this.config.containsKey("replace_include_fields") && this.config.containsKey("replace_exclude_fields")) {
log.error("Replace_include_fields and replace_exclude_fields exist at the same time, " +
"please use one of them.");
System.exit(1);
}

for (String field: fields) {
if (!this.schema.containsKey(ClickhouseUtils.realField(field))) {
String msg = String.format("table [%s] doesn't contain field '%s'", this.table, field);
log.error(msg);
System.exit(1);
}
try {
this.templateRenderMap.put(field, TemplateRender.getRender(field, false));
} catch (Exception e) {
String msg = String.format("cannot get templateRender of field [%s]", field);
log.warn(msg);
}
}
}

}

private String initSql() {

List<String> realFields = new ArrayList<>();
for(String field: fields) {
realFields.add(ClickhouseUtils.realField(field));
}
if (this.format.equals("TabSeparated")) {
List<String> realFields = new ArrayList<>();
for(String field: fields) {
realFields.add(ClickhouseUtils.realField(field));
}

String init = String.format("insert into %s (%s) values", this.table, String.join(" ,", realFields));
log.debug("init sql: "+ init);
return init;
String init = String.format("insert into %s (%s) values", this.table, String.join(" ,", realFields));
log.debug("init sql: "+ init);
return init;
} else {
String init = String.format("insert into %s format JSONEachRow ", this.table);
log.debug("init sql: "+ init);
return init;
}
}

private String dealWithQuote(String str) {
Expand All @@ -180,38 +200,45 @@ private String dealWithQuote(String str) {
private StringBuilder makeUpSql(List<Map> events) {

StringBuilder sqls = new StringBuilder(preSql);
for(Map e: events) {
StringBuilder value = new StringBuilder("(");
for(String field: fields) {
Object fieldValue = this.templateRenderMap.get(field).render(e);
if (fieldValue != null) {
if (fieldValue instanceof String) {
if ((this.replace_include_fields != null && this.replace_include_fields.contains(field)) ||
(this.replace_exclude_fields != null && !this.replace_exclude_fields.contains(field))) {
value.append("'");
value.append(dealWithQuote(fieldValue.toString()));
value.append("'");

if ((this.format.equals("TabSeparated"))) {
for(Map e: events) {
StringBuilder value = new StringBuilder("(");
for(String field: fields) {
Object fieldValue = this.templateRenderMap.get(field).render(e);
if (fieldValue != null) {
if (fieldValue instanceof String) {
if ((this.replace_include_fields != null && this.replace_include_fields.contains(field)) ||
(this.replace_exclude_fields != null && !this.replace_exclude_fields.contains(field))) {
value.append("'");
value.append(dealWithQuote(fieldValue.toString()));
value.append("'");
} else {
value.append("'");
value.append(fieldValue.toString());
value.append("'");
}
} else {
value.append("'");
value.append(fieldValue.toString());
value.append("'");
if (e.get(field) == null){
value.append("''");
} else {
value.append(e.get(field));
}
}
} else {
if (e.get(field) == null){
value.append("''");
} else {
value.append(e.get(field));
}
value.append(ClickhouseUtils.renderDefault(this.schema.get(ClickhouseUtils.realField(field))));
}
if (fields.indexOf(field) != fields.size() -1) {
value.append(",");
}
} else {
value.append(ClickhouseUtils.renderDefault(this.schema.get(ClickhouseUtils.realField(field))));
}
if (fields.indexOf(field) != fields.size() -1) {
value.append(",");
}
value.append(")");
sqls.append(value);
}
} else {
for(Map e: events) {
sqls.append(JSONObject.toJSONString(e));
}
value.append(")");
sqls.append(value);
}
return sqls;
}
Expand All @@ -227,11 +254,11 @@ private void bulkInsert(List<Map> events) throws Exception {
this.conn.createStatement().execute(sqls.toString());
} catch (SQLException e) {
log.error(e);
log.debug(sqls.toString());
log.error(sqls.toString());

for (int i = 0; i < this.events.size(); i++) {
log.debug(events.get(i));
}
// for (int i = 0; i < this.events.size(); i++) {
// log.debug(events.get(i));
// }
} catch (Exception e) {
log.error(e);
}
Expand Down

0 comments on commit 1f316d2

Please sign in to comment.