Skip to content
This repository has been archived by the owner on Jan 6, 2022. It is now read-only.

Commit

Permalink
Merge branch 'rickyhuo.fea.native'
Browse files Browse the repository at this point in the history
  • Loading branch information
RickyHuo committed Jun 21, 2018
2 parents e8e7fce + 541334e commit 56016f5
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

* Author: rickyHuo
* Homepage: https://github.com/RickyHuo/hangout-output-clickhouse
* Version: 0.0.6
* Version: 0.0.7

### Description

Expand Down
9 changes: 7 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>sina</groupId>
<artifactId>hangout-output-plugins-clickhouse</artifactId>
<version>0.0.6</version>
<version>0.0.7</version>
<packaging>jar</packaging>

<name>hangout-output-plugins-clickhouse</name>
Expand All @@ -22,10 +22,15 @@
<scope>system</scope>
<systemPath>C:\Users\huochen\.m2\repository\ctrip\hangout-baseplugin\0.3.0\hangout-baseplugin-0.3.0.jar</systemPath>
</dependency>
<dependency>
<groupId>com.github.housepower</groupId>
<artifactId>clickhouse-native-jdbc</artifactId>
<version>1.1-testing</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.36</version>
<version>0.1.40</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down
21 changes: 10 additions & 11 deletions src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

public class Clickhouse extends BaseOutput {

private static List<String> formats = Arrays.asList("Values", "JSONEachRow", "TabSeparated");
private static final Logger log = LogManager.getLogger(Clickhouse.class);
private final static int BULKSIZE = 1000;

Expand All @@ -29,15 +28,8 @@ public Clickhouse(Map config) {

protected void prepare() {

String format = "Values";

if (this.config.containsKey("format")) {
format = (String) this.config.get("format");
if (!this.formats.contains(format)) {
log.error("Not support format: " + format);
System.exit(1);
}
}
String format = "TabSeparated";
format = (String) this.config.get("format");

switch (format) {
case "JSONEachRow":
Expand All @@ -49,6 +41,13 @@ protected void prepare() {
case "TabSeparated":
this.formatParse = new TabSeparated(config);
break;
case "Native":
this.formatParse = new Native(config);
break;
default:
log.error("Unknown format <%s>", format);
System.exit(1);
break;
}

this.formatParse.prepare();
Expand Down Expand Up @@ -98,7 +97,7 @@ public void shutdown() {
this.formatParse.bulkInsert(this.events);
} catch (Exception e) {
log.info("failed to bulk events before shutdown");
log.debug(e);
log.error(e);
}
}
}
13 changes: 12 additions & 1 deletion src/main/java/com/sina/bip/hangout/outputs/ClickhouseUtils.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.sina.bip.hangout.outputs;

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import com.github.housepower.jdbc.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;

import java.sql.ResultSet;
Expand Down Expand Up @@ -28,6 +28,17 @@ public static Map<String, String> getSchema(ClickHouseConnectionImpl connection,
return schema;
}

public static Map<String, String> getSchema(ClickHouseConnection connection, String db, String table) throws SQLException {
String sql = String.format("desc %s.%s", db, table);
ResultSet resultSet = connection.createStatement().executeQuery(sql);
Map schema = new HashMap<String, String>();
while(resultSet.next()) {

schema.put(resultSet.getString(1), resultSet.getString(2));
}
return schema;
}

public static String renderDefault(String dataType) {

if (dataType.equals("String"))
Expand Down
243 changes: 243 additions & 0 deletions src/main/java/com/sina/bip/hangout/outputs/Native.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package com.sina.bip.hangout.outputs;

import com.ctrip.ops.sysdev.render.TemplateRender;
import com.github.housepower.jdbc.ClickHouseConnection;
import com.github.housepower.jdbc.settings.ClickHouseConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.*;

public class Native implements FormatParse {

private static final Logger log = LogManager.getLogger(Native.class);
private Map config;
private String host;
private String database;
private String table;
private String jdbcLink;
private List<String> fields;
private Map<String, String> schema;
private Boolean withCredit;
private String user;
private String password;
private ClickHouseConnection conn;
private Map<String, TemplateRender> templateRenderMap;
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
private SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public Native(Map config) {
this.config = config;
}

public void prepare() {
this.templateRenderMap = new HashMap<>();

if (!this.config.containsKey("host")) {
log.error("hostname must be included in config");
System.exit(1);
}
this.host = (String) this.config.get("host");

if (!this.config.containsKey("database")) {
log.error("database must be included in config");
System.exit(1);
}
this.database = (String) this.config.get("database");

if (!this.config.containsKey("table")) {
log.error("table must be included in config");
System.exit(1);
}
this.table = (String) this.config.get("table");

if(this.config.containsKey("username") && this.config.containsKey("password")) {
this.user = (String) this.config.get("username");
this.password = (String) this.config.get("password");
this.withCredit = true;

} else if (this.config.containsKey("username") || this.config.containsKey("password")) {
log.warn("username and password must be included in config at same time");
} else {
this.withCredit = false;
}

// 连接验证
this.jdbcLink = String.format("jdbc:clickhouse://%s", this.host);
log.info(this.jdbcLink);

Properties p = new Properties();
if (this.withCredit) {
p.put("user", this.user);
p.put("password", this.password);
}

try {
ClickHouseConfig cf = new ClickHouseConfig(this.jdbcLink, p);
this.conn = ClickHouseConnection.createClickHouseConnection(cf);
} catch (SQLException e) {
log.error(e);
System.exit(1);
}

try {
this.schema = ClickhouseUtils.getSchema(this.conn, this.database, this.table);
} catch (SQLException e) {
log.error(e);
log.error("input table is not valid");
System.exit(1);
}


if (this.config.containsKey("fields")) {
this.fields = (List<String>) this.config.get("fields");
} else {
log.error("fields must be included in config");
System.exit(1);
}

for (String field: fields) {
if (!this.schema.containsKey(ClickhouseUtils.realField(field))) {
String msg = String.format("table [%s] doesn't contain field '%s'", this.table, field);
log.error(msg);
System.exit(1);
}
try {
this.templateRenderMap.put(field, TemplateRender.getRender(field, false));
} catch (Exception e) {
String msg = String.format("cannot get templateRender of field [%s]", field);
log.warn(msg);
}
}
}

public String initSql() {

List<String> realFields = new ArrayList<>();
for(String field: this.fields) {
realFields.add(ClickhouseUtils.realField(field));
}

String init = String.format("insert into %s.%s (%s) values (%s)", this.database,
this.table,
String.join(", ", realFields),
ClickhouseUtils.tabSeparatedPreSql(this.fields.size()));

log.debug("init sql: " + init);
return init;
}

public void bulkInsert(List<Map> events) throws Exception {

PreparedStatement statement = this.conn.prepareStatement(this.initSql());

int size = fields.size();
for (Map e: events) {
//statement.setString(1, "sd");

for (int i=0; i<size; i++) {
String field = fields.get(i);
String fieldType = this.schema.get(ClickhouseUtils.realField(field));
Object fieldValue = this.templateRenderMap.get(field).render(e);
switch (fieldType) {
case "Int8":
case "Int16":
case "Int32":
case "UInt8":
case "UInt16":
if (fieldValue != null) {
try {
int v = ((Number) fieldValue).intValue();
statement.setInt(i + 1, v);

} catch (Exception exp) {
String msg = String.format("Cannot Convert %s %s to integer, render default. Field is %s", fieldValue.getClass(), fieldValue, field);
log.warn(msg);
log.error(exp);
statement.setInt(i + 1, 0);
}
} else {
statement.setInt(i + 1, 0);
}

break;

case "UInt64":
case "Int64":
case "UInt32":
if (fieldValue != null) {
try {
int v = ((Number) fieldValue).intValue();
statement.setInt(i + 1, v);

} catch (Exception exp) {
String msg = String.format("Cannot Convert %s %s to long, render default", fieldValue.getClass(), fieldValue);
log.warn(msg);
log.error(exp);
statement.setInt(i + 1, 0);
}
} else {
statement.setInt(i + 1, 0);
}

break;
case "String":
if (fieldValue != null) {
statement.setString(i + 1, fieldValue.toString());
} else {
statement.setString(i + 1, "");
}
break;
case "DateTime":
if (fieldValue != null) {
Date date = this.datetimeFormat.parse(fieldValue.toString());
statement.setTimestamp(i + 1, new java.sql.Timestamp(date.getTime()));
} else {
statement.setTimestamp(i + 1, new java.sql.Timestamp(System.currentTimeMillis()));
}
break;
case "Date":
if (fieldValue != null) {
Date date = this.dateFormat.parse(fieldValue.toString());
statement.setDate(i + 1, new java.sql.Date(date.getTime()));
} else {
statement.setDate(i + 1, new java.sql.Date(System.currentTimeMillis()));
}
break;
case "Float32":
case "Float64":
if (fieldValue != null) {
try {
float v = ((Number) fieldValue).floatValue();
statement.setFloat(i + 1, v);
} catch (Exception exp) {
String msg = String.format("Cannot Convert %s %s to float, render default", fieldValue.getClass(), fieldValue);
log.warn(msg);
log.error(exp);
statement.setFloat(i + 1, 0f);
}
} else {
statement.setFloat(i + 1, 0f);
}

break;
case "Array(String)":
if (fieldValue != null) {
List<String> v = (List) fieldValue;
String [] array = v.toArray(new String[v.size()]);
statement.setArray(i + 1, this.conn.createArrayOf("string", array));
} else {
statement.setArray(i + 1, this.conn.createArrayOf("string", new String[1]));
}

}
}
statement.addBatch();
}
statement.executeBatch();
}

}
3 changes: 2 additions & 1 deletion src/main/java/com/sina/bip/hangout/outputs/TabSeparated.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public String initSql() {
realFields.add(ClickhouseUtils.realField(field));
}

String init = String.format("insert into %s (%s) values (%s)", this.table,
String init = String.format("insert into %s.%s (%s) values (%s)", this.database,
this.table,
String.join(", ", realFields),
ClickhouseUtils.tabSeparatedPreSql(this.fields.size()));

Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/sina/bip/hangout/outputs/Values.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
import java.util.List;
import java.util.Map;

@Deprecated
/*
* It can be replaced by TabSeparated with better performance
*
* */
public class Values implements FormatParse {

private static final Logger log = LogManager.getLogger(Values.class);
Expand Down

0 comments on commit 56016f5

Please sign in to comment.