Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify code: Fix typos, refactor logic, and remove unused parameters. #329

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ record -> String.format("%s+%s+%s", record.topic(), record.kafkaPartition(), rec

private final Function<SinkRecord, String> docIdGenerator;

private DocumentIDStrategy(final String name, final String description,
DocumentIDStrategy(final String name, final String description,
final Function<SinkRecord, String> docIdGenerator) {
this.name = name.toLowerCase(Locale.ROOT);
this.description = description;
Expand Down Expand Up @@ -74,7 +74,7 @@ public static String describe() {
}

public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() {
private final String[] names = Arrays.stream(values()).map(v -> v.toString()).toArray(String[]::new);
private final String[] names = Arrays.stream(values()).map(Object::toString).toArray(String[]::new);
private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(names);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,10 @@ public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder h
.build();

final Collection<OpensearchClientConfigurator> configurators = ClientsConfiguratorProvider
.forOpensearch(config);
.forOpensearch();
configurators.forEach(configurator -> {
if (configurator.apply(config, httpClientBuilder)) {
LOGGER.debug("Successfuly applied " + configurator.getClass().getName()
LOGGER.debug("successfully applied " + configurator.getClass().getName()
+ " configurator to OpensearchClient");
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(final int maxTasks) {
final List<Map<String, String>> taskConfigs = new ArrayList<>();
final Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(configProperties);
final Map<String, String> taskProps = new HashMap<>(configProperties);
for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(taskProps);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static <T, E extends Exception> T callWithRetry(final String callName, fi
final long sleepTimeMs = computeRandomRetryWaitTimeInMillis(retryAttempts, retryBackoffMs);
final var msg = String.format("Failed to %s with attempt %s/%s, will attempt retry after %s ms. ",
callName, attempts, maxAttempts, sleepTimeMs);
LOGGER.warn(msg + "Failure reason: {}", e);
LOGGER.warn(msg + "Failure reason: ", e);
time.sleep(sleepTimeMs);
} else {
final var msg = String.format("Failed to %s after total of %s attempt(s)", callName, attempts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.ServiceLoader;

import io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig;

public final class ClientsConfiguratorProvider {
private ClientsConfiguratorProvider() {
Expand All @@ -30,19 +28,15 @@ private ClientsConfiguratorProvider() {
* Use {@link ServiceLoader} mechanism to discover available configurators for Opensearch (and possibly others)
* clients which are applicable to the provided configuration.
*
* @param config
* provided configuration
* @return the list of discovered {@link OpensearchClientConfigurator} configurators which are applicable to the
* provided configuration.
*/
public static Collection<OpensearchClientConfigurator> forOpensearch(final OpensearchSinkConnectorConfig config) {
public static Collection<OpensearchClientConfigurator> forOpensearch() {
final Collection<OpensearchClientConfigurator> configurators = new ArrayList<>();
final ServiceLoader<OpensearchClientConfigurator> loaders = ServiceLoader
.load(OpensearchClientConfigurator.class, ClientsConfiguratorProvider.class.getClassLoader());

final Iterator<OpensearchClientConfigurator> iterator = loaders.iterator();
while (iterator.hasNext()) {
final OpensearchClientConfigurator configurator = iterator.next();
for (OpensearchClientConfigurator configurator : loaders) {
configurators.add(configurator);
}

Expand Down