I'm working with version 2.0.4 of the OpenSearch connector, sinking to an AWS OpenSearch cluster running version 2.3.
We've enabled TRACE-level logging for org.apache.kafka.connect.runtime.WorkerSinkTask, and we're seeing that the batch size never exceeds 500, even when the consumer for that partition is tens of thousands of messages behind.
This is my Elasticsearch sink configuration:
{
  "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
  "type.name": "_doc",
  "behavior.on.null.values": "delete",
  "tasks.max": "24",
  "connection.timeout.ms": "3000",
  "max.retries": "10",
  "key.ignore": "false",
  "retry.backoff.ms": "1000",
  "max.buffered.records": "100000",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
  "read.timeout.ms": "10000",
  "behavior.on.version.conflict": "warn",
  "topics": "SEARCH",
  "batch.size": "10000",
  "max.in.flight.requests": "25",
  "schema.ignore": "false",
  "behavior.on.malformed.documents": "fail",
  "value.converter.schema.registry.url": "http://cp-schema-registry:8081",
  "flush.timeout.ms": "20000",
  "errors.deadletterqueue.topic.name": "dlq_search",
  "name": "ELASTICSEARCH_SINK",
  "errors.tolerance": "all",
  "connection.url": "https://....",
  "linger.ms": "1000"
}
Is there a hard limit of 500 on the batch size? Even when I change batch.size to 400, the trace still shows "batch.size = 400 ....Delivering batch of 500 messages to task". Can you help me understand how to control the batch size?
Thank you.
To see larger batches, you need both the consumer and the connector configured for larger batches; the consumer's max.poll.records defaults to 500, which matches what you're seeing in the trace.
You can configure this worker-wide, or for just one connector via the consumer.override.max.poll.records client-override feature: https://kafka.apache.org/documentation/#connect_running
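As a minimal sketch, assuming your worker's connector.client.config.override.policy permits consumer overrides (e.g. All), you could add something like this to the connector configuration; the value 10000 is only an illustration, not a recommendation:

  "consumer.override.max.poll.records": "10000"

Worker-wide, the equivalent would be setting the prefixed consumer property in the worker properties file, which then applies to every sink connector on that worker:

  consumer.max.poll.records=10000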
Based on the connector's batching implementation, I expect it is actually accumulating multiple poll() reads into a single write to OpenSearch, but if the bottleneck is the consumer, that accumulation won't substantially improve throughput.