batches limited to 500 messages #168

Open
sheryl-susman opened this issue Jan 17, 2023 · 2 comments

sheryl-susman commented Jan 17, 2023

I'm working with version 2.0.4 of the OpenSearch connector, sinking to an AWS OpenSearch cluster running version 2.3.
We've enabled TRACE-level logging for org.apache.kafka.connect.runtime.WorkerSinkTask, and we're seeing that the batch size never exceeds 500, even when the specific consumer for that partition is tens of thousands of messages behind.
This is my sink connector configuration:
{
  "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
  "type.name": "_doc",
  "behavior.on.null.values": "delete",
  "tasks.max": "24",
  "connection.timeout.ms": "3000",
  "max.retries": "10",
  "key.ignore": "false",
  "retry.backoff.ms": "1000",
  "max.buffered.records": "100000",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
  "read.timeout.ms": "10000",
  "behavior.on.version.conflict": "warn",
  "topics": "SEARCH",
  "batch.size": "10000",
  "max.in.flight.requests": "25",
  "schema.ignore": "false",
  "behavior.on.malformed.documents": "fail",
  "value.converter.schema.registry.url": "http://cp-schema-registry:8081",
  "flush.timeout.ms": "20000",
  "errors.deadletterqueue.topic.name": "dlq_search",
  "name": "ELASTICSEARCH_SINK",
  "errors.tolerance": "all",
  "connection.url": "https://....",
  "linger.ms": "1000"
}

Is there some hard limit of 500 on the batch size? Even when I change batch.size to 400, the trace still shows "batch.size = 400 ....Delivering batch of 500 messages to task". Can you help me understand how to control the batch size?

Thank you.

@willyborankin (Collaborator)

Looks like a bug. I will create an internal issue for our team.

@gharris1727

@sheryl-susman You may be encountering the default limit of 500 records imposed by the consumer's max.poll.records configuration: https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records.

To see larger batches, you need both the consumer and the connector configured to use them.
You can configure this worker-wide, or for just one connector via the consumer.override.max.poll.records client-override feature: https://kafka.apache.org/documentation/#connect_running
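
As a minimal sketch of the per-connector route (assuming the worker permits client overrides, i.e. connector.client.config.override.policy=All in the worker config; older workers default to None), the override sits alongside batch.size in the connector JSON:

{
  ...
  "batch.size": "10000",
  "consumer.override.max.poll.records": "10000"
}

Worker-wide, the equivalent is a consumer.-prefixed entry in the worker properties, e.g. consumer.max.poll.records=10000, which applies to every connector on that worker.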

Based on the batch implementation in the connector, I expect that it is actually accumulating multiple read poll()s into a single write to OpenSearch, but if the bottleneck is the consumer, that won't substantially improve the throughput.
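
For a rough picture of what that accumulation means, here is a simplified, hypothetical sketch (illustrative names only, not the connector's actual code): each put() delivers at most max.poll.records records (default 500), and the task buffers them until batch.size records are available for one bulk write.

// Hypothetical sketch of the accumulation pattern, not the connector's real implementation.
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class BufferingSinkTask {
    private final int batchSize; // connector-level "batch.size"
    private final List<String> buffer = new ArrayList<>();

    BufferingSinkTask(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called once per consumer poll(); each poll returns at most
    // max.poll.records records, so several put() calls may be
    // needed before one bulk request is full.
    void put(Collection<String> records) {
        buffer.addAll(records);
        while (buffer.size() >= batchSize) {
            List<String> batch = buffer.subList(0, batchSize);
            sendBulkToOpenSearch(batch);
            batch.clear(); // clearing the sublist view drops the flushed records from the buffer
        }
    }

    private void sendBulkToOpenSearch(List<String> batch) {
        // issue one bulk indexing request for the whole batch here
    }
}

With batch.size=10000 and the default max.poll.records=500, the buffer fills across roughly twenty polls before a single bulk write goes out, which is why raising batch.size alone does not change the 500-record batches seen in the WorkerSinkTask trace.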
