Skip to content
This repository has been archived by the owner on Jan 6, 2022. It is now read-only.

Commit

Permalink
Modify output shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
RickyHuo committed Jan 11, 2018
1 parent 58a5250 commit c7bdc85
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions src/main/java/com/sina/bip/hangout/outputs/Clickhouse.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class Clickhouse extends BaseOutput {
private static final Logger log = LogManager.getLogger(Clickhouse.class);
private final static int BULKSIZE = 1000;

private int bulkNum = 0;
private String host;
private String database;
private String table;
Expand All @@ -35,7 +36,6 @@ public class Clickhouse extends BaseOutput {
private int bulkSize;
private String preSql = initSql();
private List<Map> events;
private StringBuilder sql = new StringBuilder(preSql);
private Map<String, String> schema;
private Boolean withCredit;
private String user;
Expand Down Expand Up @@ -207,10 +207,11 @@ private StringBuilder makeUpSql(List<Map> events) {
return sqls;
}


private void bulkInsert(List<Map> events) throws Exception {

log.debug("make up SQL start, number: " + this.bulkNum);
StringBuilder sqls = makeUpSql(events);
log.debug("make up SQL end, number: " + this.bulkNum);
ClickHouseProperties properties = new ClickHouseProperties();

BalancedClickhouseDataSource balanced = new BalancedClickhouseDataSource(this.jdbcLink, properties);
Expand All @@ -224,15 +225,15 @@ private void bulkInsert(List<Map> events) throws Exception {
try {
conn.createStatement().execute(sqls.toString());
conn.close();
} catch (SQLException e){
log.error(e.toString());
} catch (SQLException e) {
log.error(e);
log.debug(sqls.toString());

for (int i = 0; i < this.events.size(); i++) {
log.debug(events.get(i));
}
} catch (Exception e) {
log.error("error");
log.error(e);
}
conn.close();
}
Expand All @@ -245,20 +246,29 @@ private void eventInsert(Map event, int eventSize) throws Exception {

this.events.add(event);
if(this.events.size() == eventSize) {
log.info("Insert bulk start, number: " + this.bulkNum);
bulkInsert(this.events);
log.info("Insert bulk end, number: " + this.bulkNum);
this.events.clear();
this.bulkNum += 1;
}
}

protected void emit(Map event) {
try {
eventInsert(event);
} catch (Exception e) {
log.error(e.toString());
log.error(e);
log.warn("insert error");
}
}

public void shutdown() {
try {
bulkInsert(this.events);
} catch (Exception e) {
log.info("failed to bulk events before shutdown");
log.debug(e);
}
}
}

0 comments on commit c7bdc85

Please sign in to comment.