diff --git a/integration_tests/cassandra-and-scylladb-sink/create_source.sql b/integration_tests/cassandra-and-scylladb-sink/create_source.sql index 460e616aed358..94b69b9d586c6 100644 --- a/integration_tests/cassandra-and-scylladb-sink/create_source.sql +++ b/integration_tests/cassandra-and-scylladb-sink/create_source.sql @@ -11,10 +11,10 @@ CREATE table user_behaviors ( connector = 'datagen', fields.user_id.kind = 'sequence', fields.user_id.start = '1', - fields.user_id.end = '1000', + fields.user_id.end = '10000000', fields.user_name.kind = 'random', fields.user_name.length = '10', - datagen.rows.per.second = '10' + datagen.rows.per.second = '1000000' ) FORMAT PLAIN ENCODE JSON; CREATE TABLE cassandra_types ( diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java index 7c883335cfc23..58568bc14df2b 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java @@ -42,6 +42,9 @@ public class CassandraConfig extends CommonSinkConfig { @JsonProperty(value = "cassandra.password") private String password; + @JsonProperty(value = "cassandra.max_batch_rows") + private Integer maxBatchRows = 512; + @JsonCreator public CassandraConfig( @JsonProperty(value = "cassandra.url") String url, @@ -93,4 +96,13 @@ public CassandraConfig withPassword(String password) { this.password = password; return this; } + + public Integer getMaxBatchRows() { + return maxBatchRows; + } + + public CassandraConfig withMaxBatchRows(Integer maxBatchRows) { + this.maxBatchRows = maxBatchRows; + return this; + } } diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java index b0b7fb93c7b51..5a007ea0d8f27 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java @@ -34,7 +34,6 @@ public class CassandraSink extends SinkWriterBase { private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class); - private static final Integer MAX_BATCH_SIZE = 1024 * 16; private final CqlSession session; private final List updateRowCache = new ArrayList<>(1); @@ -163,7 +162,7 @@ private void write_upsert(Iterator rows) { } private void tryCommit() { - if (batchBuilder.getStatementsCount() >= MAX_BATCH_SIZE) { + if (batchBuilder.getStatementsCount() >= config.getMaxBatchRows()) { sync(); } }