Skip to content

Commit

Permalink
add cassandra batch rows
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Mar 6, 2024
1 parent 23e0388 commit 93fb9d4
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ CREATE table user_behaviors (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
fields.user_id.end = '10000000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '10'
datagen.rows.per.second = '1000000'
) FORMAT PLAIN ENCODE JSON;

CREATE TABLE cassandra_types (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class CassandraConfig extends CommonSinkConfig {
@JsonProperty(value = "cassandra.password")
private String password;

@JsonProperty(value = "cassandra.max_batch_rows")
private Integer maxBatchRows = 512;

@JsonCreator
public CassandraConfig(
@JsonProperty(value = "cassandra.url") String url,
Expand Down Expand Up @@ -93,4 +96,13 @@ public CassandraConfig withPassword(String password) {
this.password = password;
return this;
}

public Integer getMaxBatchRows() {
return maxBatchRows;
}

public CassandraConfig withMaxBatchRows(Integer maxBatchRows) {
this.maxBatchRows = maxBatchRows;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

public class CassandraSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
private static final Integer MAX_BATCH_SIZE = 1024 * 16;

private final CqlSession session;
private final List<SinkRow> updateRowCache = new ArrayList<>(1);
Expand Down Expand Up @@ -163,7 +162,7 @@ private void write_upsert(Iterator<SinkRow> rows) {
}

private void tryCommit() {
if (batchBuilder.getStatementsCount() >= MAX_BATCH_SIZE) {
if (batchBuilder.getStatementsCount() >= config.getMaxBatchRows()) {
sync();
}
}
Expand Down

0 comments on commit 93fb9d4

Please sign in to comment.