
org.apache.kafka.common.config.ConfigException: Incrementing column: ID does not exist. #1446

Open
c0desurfer opened this issue Nov 21, 2024 · 1 comment


@c0desurfer

This newly introduced check seems to work incorrectly. We have the following JDBC connector configuration.

{
	"numeric.mapping": "best_fit",
	"table.whitelist": <SNIP>",
	"auto.register.schemas": "false",
	"auth.type": "<SNIP>",
	"tasks.max": "1",
	"transforms.createKey.fields": "<SNIP>",
	"connector.class": "JdbcSourceConnector",
	"transforms.RenameField.renames": "<SNIP>",
	"consumer.override.security.protocol": "<SNIP>",
	"transforms.reroute.type": "org.apache.kafka.connect.transforms.RegexRouter",
	"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
	"producer.override.sasl.mechanism": "<SNIP>",
	"producer.override.bootstrap.servers": "<SNIP>",
	"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
	"topic.prefix": "<SNIP>",
	"value.converter.schema.registry.url": "<SNIP>",
	"incrementing.column.name": "ID",
	"value.converter.auto.register.schemas": "false",
	"timestamp.column.name": "<SNIP>",
	"validate.non.null": "false",
	"key.converter": "io.confluent.connect.json.JsonSchemaConverter",
	"consumer.override.sasl.jaas.config": "<SNIP>",
	"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
	"connection.attempts": "480",
	"connection.backoff.ms": "30000",
	"key.converter.schema.registry.url": "<SNIP>",
	"connection.url": "jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS_LIST = (FAILOVER = ON)(LOAD_BALANCE = OFF)(ADDRESS = (PROTOCOL = TCP)(HOST = <SNIP>)(PORT = <SNIP>))(ADDRESS = (PROTOCOL = TCP)(HOST = <SNIP>)(PORT = <SNIP>))) (CONNECT_DATA = (SERVICE_NAME = <SNIP>)))",
	"connection.user": "<SNIP>",
	"key.converter.schemas.enable": "true",
	"key.converter.auto.register.schemas": "false",
	"consumer.override.sasl.mechanism": "<SNIP>",
	"connection.password": "<SNIP>",
	"consumer.override.group.id": "<SNIP>",
	"producer.override.security.protocol": "SASL_SSL",
	"transforms.ReplaceField.blacklist": "ID,<SNIP>",
	"use.latest.version": "true",
	"producer.override.sasl.jaas.config": "<SNIP>",
	"value.converter.schemas.enable": "true",
	"schema.pattern": "<SNIP>",
	"mode": "timestamp+incrementing",
	"transforms.reroute.replacement": "<SNIP>",
	"transforms.reroute.regex": "<SNIP>",
	"transforms": "reroute,ReplaceField,RenameField,createKey",
	"consumer.override.bootstrap.servers": "<SNIP>",
	"db.timezone": "Europe/<SNIP>",
	"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
	"table.types": "SYNONYM"
}

Please note that the following properties are set.

"mode": "timestamp+incrementing"
"incrementing.column.name": "ID"
"timestamp.column.name": "<SNIP>"

The columns exist inside the DB.

[screenshot of the table's columns in the database, including ID]

However, since updating to 10.8.0 we are getting the following error and had to roll back to 10.7.4.

[<SNIP>|task-0] WorkerSourceTask{id=<SNIP>} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-<SNIP>-0]
org.apache.kafka.common.config.ConfigException: Incrementing column: ID does not exist.
	at io.confluent.connect.jdbc.source.JdbcSourceTask.validateColumnsExist(JdbcSourceTask.java:333)
	at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:201)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:280)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Not sure why the check fails; the code looks good at first sight. Any ideas?
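
For reference, a minimal sketch (assuming, on my side, that the connector's validateColumnsExist relies on the column metadata the JDBC driver reports, which I haven't confirmed from the source) of how to see what the driver returns for the table. The connection details and SCHEMA/TABLE_NAME values below are placeholders for the snipped ones:

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class ListReportedColumns {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; substitute the real <SNIP> values.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//dbhost:1521/SERVICE", "user", "password")) {
            DatabaseMetaData md = conn.getMetaData();
            // Columns the driver reports for the whitelisted object; if ID is
            // missing here, a metadata-based existence check would fail too.
            try (ResultSet rs = md.getColumns(null, "SCHEMA", "TABLE_NAME", "%")) {
                while (rs.next()) {
                    System.out.println(rs.getString("COLUMN_NAME"));
                }
            }
        }
    }
}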

@vdesabou
Member

I can reproduce the issue when using SYNONYM like you're doing:

CREATE SYNONYM CUSTOMERS2 FOR CUSTOMERS;

with the following connector configuration:
{
     "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
     "tasks.max":"1",
     "connection.user": "C##MYUSER",
     "connection.password": "mypassword",
     "connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLCDB",
     "numeric.mapping":"best_fit",
     "mode":"timestamp",
     "poll.interval.ms":"1000",
     "validate.non.null":"false",
     "schema.pattern": "C##MYUSER",
     "table.whitelist":"CUSTOMERS2",
     "timestamp.column.name":"UPDATE_TS",
     "mode": "timestamp+incrementing",
     "table.types": "SYNONYM",
     "incrementing.column.name": "ID",
     "topic.prefix":"oracle-",
     "errors.log.enable": "true",
     "errors.log.include.messages": "true"
}
[2024-11-22 10:37:32,586] ERROR [oracle-source|task-0] WorkerSourceTask{id=oracle-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:237)
org.apache.kafka.common.config.ConfigException: Incrementing column: ID does not exist.
       at io.confluent.connect.jdbc.source.JdbcSourceTask.validateColumnsExist(JdbcSourceTask.java:333)
       at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:201)
       at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:283)
       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:227)
       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
       at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
       at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
       at java.base/java.lang.Thread.run(Thread.java:840)

I can confirm this is not happening when TABLE is used.
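
If it helps narrow things down: as far as I remember, the Oracle thin driver's DatabaseMetaData.getColumns returns no rows when you pass a synonym name unless the includeSynonyms connection property is enabled, which would explain exactly this TABLE-vs-SYNONYM difference, assuming the check goes through driver metadata. Rough sketch against the repro above (the property name is from memory, please verify):

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.Properties;

public class SynonymColumns {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("user", "C##MYUSER");
        props.put("password", "mypassword");
        // Reportedly required for getColumns to resolve synonyms with the
        // Oracle thin driver; treat this property name as an assumption.
        props.put("includeSynonyms", "true");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@oracle:1521/ORCLCDB", props)) {
            DatabaseMetaData md = conn.getMetaData();
            // Compare what the driver reports for the base table vs. the synonym.
            for (String table : new String[] {"CUSTOMERS", "CUSTOMERS2"}) {
                try (ResultSet rs = md.getColumns(null, "C##MYUSER", table, "%")) {
                    int n = 0;
                    while (rs.next()) n++;
                    System.out.println(table + ": " + n + " columns reported");
                }
            }
        }
    }
}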
