-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add ElasticSearch 8 compatibility #94
Conversation
1008ddd
to
cf41acb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jjaakola-aiven thanks! It looks good overall; though I think we should comment and/or create an issue to update the elasticsearch client to the Java API: https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/introduction.html
I've added a few mostly cosmetic comments, as I agree the usage of the deprecated rest api is a quicker workaround to get v8.x users onboard.
src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java
Outdated
Show resolved
Hide resolved
@@ -15,21 +15,18 @@ | |||
* limitations under the License. | |||
*/ | |||
|
|||
package io.aiven.connect.elasticsearch.jest; | |||
package io.aiven.connect.elasticsearch.clientwrapper; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe move it to i.a.c.elasticsearch.bulk
instead?
src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java
Outdated
Show resolved
Hide resolved
src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java
Outdated
Show resolved
Hide resolved
src/main/java/io/aiven/connect/elasticsearch/clientwrapper/BulkRequestImpl.java
Outdated
Show resolved
Hide resolved
src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java
Outdated
Show resolved
Hide resolved
src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took a look, looks understandable and reasonable as structure. I feel I can debug/continue working on this if required
src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java
Outdated
Show resolved
Hide resolved
src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java
Outdated
Show resolved
Hide resolved
@@ -67,7 +67,7 @@ public static JsonObject getMapping(final ElasticsearchClient client, final Stri | |||
* | |||
* @param schema The schema used to infer mapping. | |||
*/ | |||
public static JsonNode inferMapping(final ElasticsearchClient client, final Schema schema) { | |||
public static JsonNode inferMapping(final ElasticsearchClient.Version version, final Schema schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not blocking, I think this should be unit tested and we should compare the JsonNode coming out from before and after the update (to both help us keeping the backcompatibility and also to figure out faster which data do we send to elasticsearch with a certain input)
src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java
Outdated
Show resolved
Hide resolved
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
public class AivenElasticsearchClientWrapperTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note for me later: looked part of the tests, reason about all the cases and if those are covered here. Ask yourself, which interactions do change between version 7 and 8, are those covered here?
a7d9b51
to
9dc3dfc
Compare
kafkaVersion = "2.2.0" | ||
slf4jVersion = "2.0.12" | ||
log4jVersion = "2.23.0" | ||
elasticSearchVersion = "7.4.0" | ||
elasticClientVersion = "7.17.0" | ||
elasticJavaClientVersion = "7.17.18" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand this correctly
client supports communicating with greater or equal minor versions of Elasticsearch without breaking
this PR makes connector plugin compatible with ES 7.x and requires elasticJavaClientVersion = "8.x.y"
to make it compatible to ES 8.x?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the 8.x
version of Java client is not required for 8.x
compatibility. The 7.17.18
is enough.
There is two test configurations running, the default which starts 8.x
in container and another defined in the Gradle build file that runs same tests with 7.x
container.
9dc3dfc
to
3492a6a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice improvements and well tested. Left a couple of requests, nothing really blocking. Let me know if you think are reasonable or not
} else if (esVersion.startsWith("8.")) { | ||
return Version.ES_V8; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a warning here? Like: The es version {esVersion} isn't explicitly supported, using the default version {defaultVersion}
. This will help us understanding if we are going to have compatibility problems due to that during RCA analysis for a customer in production
} | ||
|
||
private co.elastic.clients.elasticsearch.ElasticsearchClient getElasticsearchClient() { | ||
Objects.requireNonNull(this.elasticTransport); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couldn't we do this in the constructor?
case '7': | ||
return ".*\"message\": \"started.*"; | ||
default: | ||
// Default to major version 8 log message | ||
return ".*\"message\":\"started.*"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if here we explicitly manage only from version 7
going on, previously, in the match statement for the versions shouldn't we raise an exception to say: "this connector its supposed to work only with versions >=7
?
I think its wrong to leave running the version of the connector towards a cluster version that isn't going to work and we know deterministically that its going to always fail, much better to raise an explicit error at the beginning and avoid leaving the debug to the user
src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java
Show resolved
Hide resolved
3492a6a
to
af53041
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perfect! Thanks a ton
The client to connect to Elasticsearch is changed from Jest to Elasticsearch Java Client API (version 7.17.18). This removes the compatibility with ES 6.x. The compatibility after is for Elasticsearch 7.x and 8.x.