Skip to content
This repository has been archived by the owner on Jan 6, 2022. It is now read-only.

Commit

Permalink
Merge branch 'rickyhuo.fea'
Browse files Browse the repository at this point in the history
  • Loading branch information
RickyHuo committed Dec 6, 2017
2 parents 427c783 + a150a60 commit 34f69db
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 25 deletions.
29 changes: 20 additions & 9 deletions src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class Clickhouse extends BaseOutput{
private String table;
private String jdbcLink;
private List<String> fields;
private List<String> replace_fields = new ArrayList<String>();
private int bulkSize;
private String preSql = initSql();
private List<Map> events;
Expand Down Expand Up @@ -63,8 +64,15 @@ protected void prepare() {
System.out.println("fields must be included in config");
}

if(this.config.containsKey("replace_fields")) {
this.replace_fields = (List<String>) this.config.get("replace_fields");
}
this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database);
ClickHouseDataSource dataSource = new ClickHouseDataSource(this.jdbcLink);

// ClickHouseDataSource 不支持逗号","分割的多个节点

String databaseSource = String.format("jdbc:clickhouse://%s/%s", this.host.split(",")[0], this.database);
ClickHouseDataSource dataSource = new ClickHouseDataSource(databaseSource);
try {
this.schema = ClickhouseUtils.getSchema(dataSource, this.table);
} catch (SQLException e) {
Expand Down Expand Up @@ -101,13 +109,17 @@ protected void bulkInsert(Map event) throws Exception{
if (e.containsKey(field)) {
if (e.get(field) instanceof String) {
String fieldValue = e.get(field).toString();
if (!(fieldValue.indexOf("'") > 0)){
value += "'" + e.get(field) + "'";
if (replace_fields.contains(field)) {
if (!(fieldValue.indexOf("'") > 0)){
value += "'" + e.get(field) + "'";
} else {
if (fieldValue.indexOf("\\'") > 0)
value += "'" + fieldValue.replace("'", "\\'").replace("\\\\'", "\\\\\\'") + "'";
else
value += "'" + fieldValue.replace("'", "\\'") + "'";
}
} else {
if (fieldValue.indexOf("\\'") > 0)
value += "'" + fieldValue.replace("'", "\\'").replace("\\\\'", "\\\\\\'") + "'";
else
value += "'" + fieldValue.replace("'", "\\'") + "'";
value += "'" + fieldValue + "'";
}
} else {
if (e.get(field) == null){
Expand All @@ -127,8 +139,7 @@ protected void bulkInsert(Map event) throws Exception{
sqls.append(value);
}
ClickHouseProperties properties = new ClickHouseProperties();
String jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database);
BalancedClickhouseDataSource balanced = new BalancedClickhouseDataSource(jdbcLink, properties);
BalancedClickhouseDataSource balanced = new BalancedClickhouseDataSource(this.jdbcLink, properties);

Connection conn = balanced.getConnection();
try {
Expand Down
16 changes: 0 additions & 16 deletions src/test/java/TestMain.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.sina.bip.hangout.outputs;

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static ru.yandex.clickhouse.ClickhouseJdbcUrlParser.JDBC_CLICKHOUSE_PREFIX;

public class BlancedClickhouseDataSourceTest {
public static void main(String args[]) throws Exception {

String h = "localhost:8123";
String db = "test";
ClickHouseProperties properties = new ClickHouseProperties();
//String jdbcLink = String.format("jdbc:clickhouse://%s/%s", h, db);
String jdbcLink = "jdbc:clickhouse://10.19.0.120:8123,10.13.56.170:8123,10.13.0.235:8123,10.13.0.232:8123/test";

Pattern URL_TEMPLATE = Pattern.compile(JDBC_CLICKHOUSE_PREFIX + "//([a-zA-Z0-9_:,.]+)(/[a-zA-Z0-9_]+)?");
Matcher m = URL_TEMPLATE.matcher(jdbcLink);
if (!m.matches()) {
throw new IllegalArgumentException("Incorrect url");
}
String database = m.group(2);
if (database == null) {
database = "";
}
String[] hosts = m.group(1).split(",");
final List<String> result = new ArrayList<String>(hosts.length);
for (final String host : hosts) {
result.add(JDBC_CLICKHOUSE_PREFIX + "//" + host + database);
}

String JDBC_PREFIX = "jdbc:";
String strUri = result.get(0).substring(JDBC_PREFIX.length());
URI uri = new URI(strUri);
System.out.println(uri.getHost());
}
}

0 comments on commit 34f69db

Please sign in to comment.