Skip to content
This repository has been archived by the owner on Jan 6, 2022. It is now read-only.

Commit

Permalink
Support ClickHouse-Native-JDBC
Browse files Browse the repository at this point in the history
  • Loading branch information
RickyHuo committed Jun 21, 2018
1 parent d6f280f commit 541334e
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 60 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

* Author: rickyHuo
* Homepage: https://github.com/RickyHuo/hangout-output-clickhouse
* Version: 0.0.6
* Version: 0.0.7

### Description

Expand Down
14 changes: 7 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>sina</groupId>
<artifactId>hangout-output-plugins-clickhouse</artifactId>
<version>0.0.6</version>
<version>0.0.7</version>
<packaging>jar</packaging>

<name>hangout-output-plugins-clickhouse</name>
Expand All @@ -22,10 +22,15 @@
<scope>system</scope>
<systemPath>C:\Users\huochen\.m2\repository\ctrip\hangout-baseplugin\0.3.0\hangout-baseplugin-0.3.0.jar</systemPath>
</dependency>
<dependency>
<groupId>com.github.housepower</groupId>
<artifactId>clickhouse-native-jdbc</artifactId>
<version>1.1-testing</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.36</version>
<version>0.1.40</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand All @@ -39,11 +44,6 @@
<version>2.7</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.housepower</groupId>
<artifactId>clickhouse-native-jdbc</artifactId>
<version>1.0-testing</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.sina.bip.hangout.outputs;

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import com.github.housepower.jdbc.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
Expand All @@ -29,8 +28,8 @@ public static Map<String, String> getSchema(ClickHouseConnectionImpl connection,
return schema;
}

public static Map<String, String> getSchema(Connection connection, String table) throws SQLException {
String sql = String.format("desc %s", table);
public static Map<String, String> getSchema(ClickHouseConnection connection, String db, String table) throws SQLException {
String sql = String.format("desc %s.%s", db, table);
ResultSet resultSet = connection.createStatement().executeQuery(sql);
Map schema = new HashMap<String, String>();
while(resultSet.next()) {
Expand Down
65 changes: 38 additions & 27 deletions src/main/java/com/sina/bip/hangout/outputs/Native.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package com.sina.bip.hangout.outputs;

import com.ctrip.ops.sysdev.render.TemplateRender;
import com.github.housepower.jdbc.ClickHouseConnection;
import com.github.housepower.jdbc.settings.ClickHouseConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.text.SimpleDateFormat;
import java.util.*;

public class Native implements FormatParse {

Expand All @@ -26,8 +24,10 @@ public class Native implements FormatParse {
private Boolean withCredit;
private String user;
private String password;
private Connection conn;
private ClickHouseConnection conn;
private Map<String, TemplateRender> templateRenderMap;
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
private SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public Native(Map config) {
this.config = config;
Expand Down Expand Up @@ -65,32 +65,28 @@ public void prepare() {
this.withCredit = false;
}


// 连接验证
this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database);
this.jdbcLink = String.format("jdbc:clickhouse://%s", this.host);
log.info(this.jdbcLink);

try {
Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
} catch (ClassNotFoundException e) {
log.error(e);
System.exit(1);
Properties p = new Properties();
if (this.withCredit) {
p.put("user", this.user);
p.put("password", this.password);
}

try {
this.conn = DriverManager.getConnection(this.jdbcLink);

if (this.withCredit) {
this.conn = DriverManager.getConnection(this.jdbcLink, this.user, this.password);
}
} catch (Exception e) {
ClickHouseConfig cf = new ClickHouseConfig(this.jdbcLink, p);
this.conn = ClickHouseConnection.createClickHouseConnection(cf);
} catch (SQLException e) {
log.error(e);
log.error("Cannot connect to data source");
System.exit(1);
}

try {
this.schema = ClickhouseUtils.getSchema(this.conn, this.table);
this.schema = ClickhouseUtils.getSchema(this.conn, this.database, this.table);
} catch (SQLException e) {
log.error(e);
log.error("input table is not valid");
System.exit(1);
}
Expand Down Expand Up @@ -125,7 +121,8 @@ public String initSql() {
realFields.add(ClickhouseUtils.realField(field));
}

String init = String.format("insert into %s (%s) values (%s)", this.table,
String init = String.format("insert into %s.%s (%s) values (%s)", this.database,
this.table,
String.join(", ", realFields),
ClickhouseUtils.tabSeparatedPreSql(this.fields.size()));

Expand Down Expand Up @@ -173,8 +170,8 @@ public void bulkInsert(List<Map> events) throws Exception {
case "UInt32":
if (fieldValue != null) {
try {
long v = ((Number) fieldValue).longValue();
statement.setLong(i + 1, v);
int v = ((Number) fieldValue).intValue();
statement.setInt(i + 1, v);

} catch (Exception exp) {
String msg = String.format("Cannot Convert %s %s to long, render default", fieldValue.getClass(), fieldValue);
Expand All @@ -188,14 +185,28 @@ public void bulkInsert(List<Map> events) throws Exception {

break;
case "String":
case "DateTime":
case "Date":
if (fieldValue != null) {
statement.setString(i + 1, fieldValue.toString());
} else {
statement.setString(i + 1, "");
}
break;
case "DateTime":
if (fieldValue != null) {
Date date = this.datetimeFormat.parse(fieldValue.toString());
statement.setTimestamp(i + 1, new java.sql.Timestamp(date.getTime()));
} else {
statement.setTimestamp(i + 1, new java.sql.Timestamp(System.currentTimeMillis()));
}
break;
case "Date":
if (fieldValue != null) {
Date date = this.dateFormat.parse(fieldValue.toString());
statement.setDate(i + 1, new java.sql.Date(date.getTime()));
} else {
statement.setDate(i + 1, new java.sql.Date(System.currentTimeMillis()));
}
break;
case "Float32":
case "Float64":
if (fieldValue != null) {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/sina/bip/hangout/outputs/TabSeparated.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public String initSql() {
realFields.add(ClickhouseUtils.realField(field));
}

String init = String.format("insert into %s (%s) values (%s)", this.table,
String init = String.format("insert into %s.%s (%s) values (%s)", this.database,
this.table,
String.join(", ", realFields),
ClickhouseUtils.tabSeparatedPreSql(this.fields.size()));

Expand Down
20 changes: 0 additions & 20 deletions src/test/java/com/sina/bip/hangout/outputs/NativeTest.java

This file was deleted.

0 comments on commit 541334e

Please sign in to comment.