Kafka Connect | Inconsistent behaviour with REST API in Distributed Mode #50

Open
1riatsila1 opened this issue Aug 25, 2022 · 2 comments

Comments

@1riatsila1

1riatsila1 commented Aug 25, 2022

When running the following request against a Kafka Connect cluster in distributed mode (2 worker pods) on Kubernetes, we observe the following:

curl http://kafka-connect:8083/connectors/test -X DELETE
{"error_code":500,"message":"Error trying to forward REST request: Connector test not found"}

curl http://kafka-connect:8083/connectors/test -X DELETE
{"error_code":500,"message":"Error trying to forward REST request: Connector test not found"}

curl http://kafka-connect:8083/connectors/test -X DELETE
{"error_code":404,"message":"Connector test not found"}

Here, the connector named test does not exist.

The requests are sent in quick succession. As you can see, the server sometimes responds with a 500 and sometimes with a 404. Our working theory is this: when a follower worker receives the request, it forwards it to the leader, gets a 404 back, and then returns a 500. Is this the intended behaviour, or should the follower simply pass the leader's response through? I would expect a 404 in every case.
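Until the forwarding behaviour is clarified, a client that only needs DELETE to be idempotent can treat both response shapes shown above as "connector not found". Below is a minimal Java sketch. It assumes the status codes and message text observed in this issue. `ConnectResponses` and `isConnectorNotFound` are hypothetical names. Matching on message text is fragile, because that text is not a documented contract.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: classifies Kafka Connect REST error responses. */
final class ConnectResponses {
    // Matches the "Connector <name> not found" text seen in both bodies in this issue.
    private static final Pattern NOT_FOUND = Pattern.compile("Connector .+ not found");

    private ConnectResponses() {}

    /**
     * True if the response means the connector does not exist: either a direct 404,
     * or a 500 saying that forwarding failed because the connector was not found.
     */
    static boolean isConnectorNotFound(int status, String message) {
        if (status == 404) {
            return true;
        }
        return status == 500
                && message != null
                && message.startsWith("Error trying to forward REST request")
                && NOT_FOUND.matcher(message).find();
    }

    public static void main(String[] args) {
        System.out.println(isConnectorNotFound(404, "Connector test not found")); // true
        System.out.println(isConnectorNotFound(500,
                "Error trying to forward REST request: Connector test not found")); // true
        System.out.println(isConnectorNotFound(500, "Request timed out")); // false
    }
}
```

A caller deleting a connector would then treat a `true` result as "already gone" rather than as a failure.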

I am not sure whether this is related, but we first noticed the behaviour after upgrading to cp-kafka-connect 7.2.1.

@moisescastellano

Hi, we have a similar issue:

We have mostly worked around it for now by scaling the Kafka Connect replicas (pods) down from 3 to 1, which suggests a load-balancing problem. That is not acceptable for the production environment, though, and we are still looking for the root cause.
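Here is one hypothesis that fits the observation that a single replica helps. It has not been confirmed in this thread. In distributed mode, a worker that is not the leader forwards requests such as connector creation to the leader's advertised REST address, which comes from `rest.advertised.host.name` and `rest.advertised.port`. If that address cannot be reached from the other pods, the forwarded request would time out. The minimal sketch below checks TCP reachability and could be run from each pod. `WorkerProbe` is a hypothetical name, and the host names you pass to it depend on how your workers are advertised.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic: can this pod open a TCP connection to a worker's REST port? */
final class WorkerProbe {
    private WorkerProbe() {}

    /**
     * Tries to connect to host:port within timeoutMs.
     * Returns false on timeout, refused connection, or unresolvable host name.
     */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass each worker's advertised host name as an argument (the names are deployment-specific).
        for (String host : args) {
            System.out.printf("%s:8083 reachable=%b%n", host, isReachable(host, 8083, 5000));
        }
    }
}
```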

Our architecture needs a Kafka connector that publishes to a topic whenever there are changes in Cloudant. We also have to create a connector in real time when a user logs in for the first time.
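Connectors here are created on demand, so one option is the Kafka Connect endpoint `PUT /connectors/{name}/config`. It creates the connector if it does not exist and updates its configuration otherwise. A repeated request therefore does not fail with 409 Conflict the way a repeated `POST /connectors` for an existing name does.

The minimal sketch below uses `java.net.http` (Java 11+). Note that the request body is only the flat `config` object, without the outer `name` wrapper. The class name and both timeout values are assumptions.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

/** Hypothetical helper that builds an idempotent create-or-update request. */
final class ConnectorRequests {
    private ConnectorRequests() {}

    /**
     * Builds PUT {baseUrl}/connectors/{name}/config (baseUrl without a trailing slash).
     * configJson is the flat "config" object only, e.g. {"connector.class": "..."}.
     */
    static HttpRequest createOrUpdate(String baseUrl, String name, String configJson) {
        // URLEncoder targets form encoding, so turn '+' back into %20 for a path segment
        String encodedName = URLEncoder.encode(name, StandardCharsets.UTF_8).replace("+", "%20");
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + encodedName + "/config"))
                .timeout(Duration.ofSeconds(30)) // per-request timeout (assumed value)
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(configJson, StandardCharsets.UTF_8))
                .build();
    }

    /** Client with an explicit connect timeout (assumed value). */
    static HttpClient client() {
        return HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build();
    }
}
```

Send the request with `ConnectorRequests.client().send(request, HttpResponse.BodyHandlers.ofString())`. Kafka Connect answers 201 when it creates the connector and 200 when it updates an existing one.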

We can create the Kafka connectors via the Kafka Connect REST API (https://docs.confluent.io/platform/current/connect/references/restapi.html), but creation randomly fails with "java.net.SocketTimeoutException: Connect Timeout".

Even worse, after the first failure, subsequent POST (create) requests keep failing with the same SocketTimeoutException. This happens even though the Kafka Connect API is up and responds normally to other requests such as GET /connectors.

We have searched for this error ("SocketTimeoutException kafka connect api") and found a few people who hit it. However, no clear solution is available beyond the obvious ones, such as changing timeouts.
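Beyond changing timeouts, a client can retry on SocketTimeoutException with exponential backoff. This only mitigates the symptom and does not address the root cause. It is also only safe for idempotent requests such as `PUT /connectors/{name}/config`: a blind retry of `POST /connectors` can fail with 409 Conflict if the earlier attempt actually created the connector.

Below is a minimal sketch. `Retry.withBackoff` is a hypothetical helper, and the attempt count and delays are up to the caller.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

/** Hypothetical retry helper for transient REST timeouts. */
final class Retry {
    private Retry() {}

    /**
     * Calls action up to maxAttempts times, retrying only on SocketTimeoutException.
     * The delay doubles after each failure: initialDelayMs, 2x, 4x, ...
     */
    static <T> T withBackoff(Callable<T> action, int maxAttempts, long initialDelayMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (SocketTimeoutException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and rethrow the last timeout
                }
                Thread.sleep(delay);
                delay *= 2;
            }
        }
    }
}
```

Any other exception, such as an HTTP error that was already reported, propagates immediately without a retry.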

The body of the POST request we send to the Kafka Connect API:
{
"name": "xxxxxxxxxxx",
"config": {
"connector.class": "com.ibm.cloudant.kafka.connect.CloudantSourceConnector",
"cloudant.db.url": "https://xxxxxxxxxx.cloudantnosqldb.appdomain.cloud/xxxxxxxxx",
"cloudant.db.username": "xxxxxxxxxxxx",
"cloudant.db.password": "xxxxxxxxxxxx",
"topics": "mapis-dev-cloudant-test-provisioning-connector",
"connection.timeout.ms": 5000,
"read.timeout.ms": 5000
}
}
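When this body is assembled by string concatenation, as in the code that follows, values such as the password are inserted without escaping. A value containing a double quote or a backslash would therefore produce invalid JSON. A JSON library such as Jackson is the more robust option.

The dependency-free sketch below escapes a single string value following the JSON specification (RFC 8259). `Json.quote` is a hypothetical helper name.

```java
/** Hypothetical helper: turns a Java string into a quoted, escaped JSON string literal. */
final class Json {
    private Json() {}

    static String quote(String value) {
        StringBuilder sb = new StringBuilder(value.length() + 2).append('"');
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // remaining control characters need a backslash-u hex escape
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```

Used in the concatenation, this becomes `"\"cloudant.db.password\": " + Json.quote(password.get())`. The helper supplies the surrounding quotes itself.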

Our code simply builds and sends that request:

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    URL apiUrl = new URL(kafkaConnectApiUrl.get());
    // HttpURLConnection works for both http:// and https:// URLs
    HttpURLConnection http = (HttpURLConnection) apiUrl.openConnection();
    http.setRequestMethod("POST");
    http.setDoOutput(true);
    http.setRequestProperty("Accept", "application/json");
    http.setRequestProperty("Content-Type", "application/json");

    // Build the connector JSON; quotes inside the Java string must be escaped as \".
    // Note: values are inserted as-is, so a '"' or '\' in them would break the JSON.
    String data = "{\n  \"name\": \"" + dbName + "\""
            + ",\n  \"config\": {"
            + "\n    \"connector.class\": \"com.ibm.cloudant.kafka.connect.CloudantSourceConnector\""
            + ",\n    \"cloudant.db.url\": \"" + cloudantUrl.get() + "/" + dbName + "\""
            + ",\n    \"cloudant.db.username\": \"" + username.get() + "\""
            + ",\n    \"cloudant.db.password\": \"" + password.get() + "\""
            + ",\n    \"topics\": \"" + kafkaTopic.get() + "\""
            + ",\n    \"connection.timeout.ms\": 5000"
            + ",\n    \"read.timeout.ms\": 5000"
            + "\n  }"
            + "\n}";

    byte[] out = data.getBytes(StandardCharsets.UTF_8);

    // try-with-resources closes the request stream even if write() throws
    try (OutputStream stream = http.getOutputStream()) {
        stream.write(out);
    }
    int responseCode = http.getResponseCode();
    http.disconnect();
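The request code above reads only the status code. For error responses, Kafka Connect returns a JSON body with `error_code` and `message` fields, like the curl output at the top of this issue. HttpURLConnection exposes that body through `getErrorStream()`. Logging it would show whether a failure is a forwarding error or something else.

Below is a minimal sketch. `ResponseBodies.readBody` is a hypothetical helper, and it should be called before `disconnect()`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: reads the response body, including error bodies. */
final class ResponseBodies {
    private ResponseBodies() {}

    /** Returns the body for 2xx responses, the error body otherwise, or "" if there is none. */
    static String readBody(HttpURLConnection http) throws IOException {
        int code = http.getResponseCode();
        // getInputStream() throws for 4xx/5xx; getErrorStream() may return null
        InputStream in = (code >= 200 && code < 300) ? http.getInputStream() : http.getErrorStream();
        if (in == null) {
            return "";
        }
        try (InputStream body = in) {
            return new String(body.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}
```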

@joelmin93

Hi @moisescastellano, we have the same Connect Timeout issue. Have you found a solution?
