diff --git a/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java b/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java index 09c8477..6d58ae1 100644 --- a/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java +++ b/src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java @@ -22,6 +22,7 @@ public class Clickhouse extends BaseOutput{ private String table; private String jdbcLink; private List fields; + private List replace_fields = new ArrayList(); private int bulkSize; private String preSql = initSql(); private List events; @@ -63,8 +64,15 @@ protected void prepare() { System.out.println("fields must be included in config"); } + if(this.config.containsKey("replace_fields")) { + this.replace_fields = (List) this.config.get("replace_fields"); + } this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database); - ClickHouseDataSource dataSource = new ClickHouseDataSource(this.jdbcLink); + + // ClickHouseDataSource 不支持逗号","分割的多个节点 + + String databaseSource = String.format("jdbc:clickhouse://%s/%s", this.host.split(",")[0], this.database); + ClickHouseDataSource dataSource = new ClickHouseDataSource(databaseSource); try { this.schema = ClickhouseUtils.getSchema(dataSource, this.table); } catch (SQLException e) { @@ -101,13 +109,17 @@ protected void bulkInsert(Map event) throws Exception{ if (e.containsKey(field)) { if (e.get(field) instanceof String) { String fieldValue = e.get(field).toString(); - if (!(fieldValue.indexOf("'") > 0)){ - value += "'" + e.get(field) + "'"; + if (replace_fields.contains(field)) { + if (!(fieldValue.indexOf("'") > 0)){ + value += "'" + e.get(field) + "'"; + } else { + if (fieldValue.indexOf("\\'") > 0) + value += "'" + fieldValue.replace("'", "\\'").replace("\\\\'", "\\\\\\'") + "'"; + else + value += "'" + fieldValue.replace("'", "\\'") + "'"; + } } else { - if (fieldValue.indexOf("\\'") > 0) - value += "'" + fieldValue.replace("'", "\\'").replace("\\\\'", "\\\\\\'") + "'"; - else - value += "'" + fieldValue.replace("'", "\\'") + "'"; + value += "'" + fieldValue + "'"; } } else { if (e.get(field) == null){ @@ -127,8 +139,7 @@ protected void bulkInsert(Map event) throws Exception{ sqls.append(value); } ClickHouseProperties properties = new ClickHouseProperties(); - String jdbcLink = String.format("jdbc:clickhouse://%s/%s", this.host, this.database); - BalancedClickhouseDataSource balanced = new BalancedClickhouseDataSource(jdbcLink, properties); + BalancedClickhouseDataSource balanced = new BalancedClickhouseDataSource(this.jdbcLink, properties); Connection conn = balanced.getConnection(); try { diff --git a/src/test/java/TestMain.java b/src/test/java/TestMain.java deleted file mode 100644 index 40c4e46..0000000 --- a/src/test/java/TestMain.java +++ /dev/null @@ -1,16 +0,0 @@ -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -/** - * Created by huochen on 2017/9/26. - */ -public class TestMain { - public static void main(String args[]) { - - String src = "a='b+\\'+c'"; - System.out.println(src); - System.out.println(src.replace("'", "\\'")); - System.out.println(src.replace("'", "\\'").replace("\\\\'", "\\\\\\'")); - } -} diff --git a/src/test/java/com/sina/bip/hangout/outputs/BlancedClickhouseDataSourceTest.java b/src/test/java/com/sina/bip/hangout/outputs/BlancedClickhouseDataSourceTest.java new file mode 100644 index 0000000..428cab8 --- /dev/null +++ b/src/test/java/com/sina/bip/hangout/outputs/BlancedClickhouseDataSourceTest.java @@ -0,0 +1,43 @@ +package com.sina.bip.hangout.outputs; + +import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import ru.yandex.clickhouse.settings.ClickHouseProperties; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static ru.yandex.clickhouse.ClickhouseJdbcUrlParser.JDBC_CLICKHOUSE_PREFIX; + +public class BlancedClickhouseDataSourceTest { + public static void main(String args[]) throws Exception { + + String h = "localhost:8123"; + String db = "test"; + ClickHouseProperties properties = new ClickHouseProperties(); + //String jdbcLink = String.format("jdbc:clickhouse://%s/%s", h, db); + String jdbcLink = "jdbc:clickhouse://10.19.0.120:8123,10.13.56.170:8123,10.13.0.235:8123,10.13.0.232:8123/test"; + + Pattern URL_TEMPLATE = Pattern.compile(JDBC_CLICKHOUSE_PREFIX + "//([a-zA-Z0-9_:,.]+)(/[a-zA-Z0-9_]+)?"); + Matcher m = URL_TEMPLATE.matcher(jdbcLink); + if (!m.matches()) { + throw new IllegalArgumentException("Incorrect url"); + } + String database = m.group(2); + if (database == null) { + database = ""; + } + String[] hosts = m.group(1).split(","); + final List result = new ArrayList(hosts.length); + for (final String host : hosts) { + result.add(JDBC_CLICKHOUSE_PREFIX + "//" + host + database); + } + + String JDBC_PREFIX = "jdbc:"; + String strUri = result.get(0).substring(JDBC_PREFIX.length()); + URI uri = new URI(strUri); + System.out.println(uri.getHost()); + } +}