Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Place offset manager in commons #373

Open
wants to merge 51 commits into
base: s3-source-release
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
29b360d
intial changes
Dec 27, 2024
22fa3a0
Merge remote-tracking branch 'origin/s3-source-release' into polling
Dec 27, 2024
b6f2d08
fixed test cases
Dec 27, 2024
38fe822
Merge branch 's3-source-release' into polling_efficiency
Dec 27, 2024
0a10606
Added more complete tests
Dec 30, 2024
d7b8236
Merge branch 's3-source-release' into polling_efficiency
Claudenw Dec 31, 2024
9ccb1a8
Migrate to new AWS client
Dec 31, 2024
d3ce578
added AWS Integration test
Dec 31, 2024
e744e32
attempts to fix polling
Jan 2, 2025
df11418
sped up integration test
Jan 2, 2025
be775b0
fixed tests
Jan 6, 2025
ebf0e8a
removed unused class
Jan 6, 2025
9c73cd4
removed unused methods
Jan 6, 2025
d7b2f3c
Changes to allow Backoff to abort the timer
Jan 6, 2025
cf32e79
updated javadoc
Jan 6, 2025
a7c2570
Merge remote-tracking branch 'origin/s3-source-release' into polling_…
Jan 6, 2025
4dd9ad8
fixed PMD errors
Jan 6, 2025
f401478
fixed testing errors
Jan 6, 2025
1d73e7f
added javadoc
Jan 6, 2025
2c0426a
fixed backoff test issue
Jan 7, 2025
332c3f5
fixed testPollWithSlowProducer test issue
Jan 7, 2025
06a12a3
added test for AWSV2SourceClient rehydration
Jan 7, 2025
0edf114
Fix for slow polling test
Jan 7, 2025
da17e1f
Changes as per review
Jan 7, 2025
dc35ee4
Update commons/src/main/java/io/aiven/kafka/connect/common/source/Abs…
Claudenw Jan 9, 2025
7aa64fc
Update commons/src/main/java/io/aiven/kafka/connect/common/source/Abs…
Claudenw Jan 9, 2025
c25a877
Update commons/src/main/java/io/aiven/kafka/connect/common/source/Abs…
Claudenw Jan 9, 2025
37260b3
Update s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/so…
Claudenw Jan 9, 2025
21f713a
Update commons/src/main/java/io/aiven/kafka/connect/common/source/Abs…
Claudenw Jan 9, 2025
eb2c051
Update s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/so…
Claudenw Jan 9, 2025
a9d9380
Update s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/so…
Claudenw Jan 9, 2025
4f3f896
changes as per review requests
Jan 9, 2025
45bd6fd
fix for javadoc
Jan 9, 2025
08c3917
Merge remote-tracking branch 'refs/remotes/upstream/polling_efficienc…
Jan 9, 2025
316a824
initial changes
Jan 9, 2025
c69bb65
merged files for KCON-57
Jan 9, 2025
2637f64
fixed merge issues
Jan 9, 2025
99ffebf
Merge remote-tracking branch 'origin/s3-source-release' into offsetmgr3
Jan 9, 2025
5510586
Added handling of processed Offsets
Jan 9, 2025
3e948ba
updates as per review
Jan 10, 2025
81d46a4
removed unused methods
Jan 10, 2025
96d1523
Update commons/src/main/java/io/aiven/kafka/connect/common/source/Off…
Claudenw Jan 14, 2025
e429d8a
Update commons/src/test/java/io/aiven/kafka/connect/common/source/Off…
Claudenw Jan 14, 2025
1d4b5e4
Update s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/so…
Claudenw Jan 14, 2025
e4187f5
Update s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/so…
Claudenw Jan 14, 2025
b8a64d3
Update commons/src/main/java/io/aiven/kafka/connect/common/source/Off…
Claudenw Jan 14, 2025
cd1f25f
fixed topicName and bucketName
Jan 14, 2025
cd2650a
Changes as per review
Jan 14, 2025
b444b44
fixed spotless errors
Jan 14, 2025
a90fdcb
Merge branch 's3-source-release' into KCON-57_place_OffsetManager_in_…
Jan 15, 2025
394f2d1
modified to solve data change errors
Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private boolean tryAdd(final List<SourceRecord> results, final Iterator<SourceRe
*
* @return {@code true} if the connector is not stopped and the timer has not expired.
*/
protected boolean stillPolling() {
protected final boolean stillPolling() {
final boolean result = !connectorStopped.get() && !timer.isExpired();
logger.debug("Still polling: {}", result);
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OffsetManager<E extends OffsetManager.OffsetManagerEntry<E>> {
/** The logger to write to */
private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class);

/**
* The local manager data.
*/
private final ConcurrentMap<Map<String, Object>, Map<String, Object>> offsets;

/**
* The context in which this is running.
*/
private final SourceTaskContext context;

/**
* Constructor
*
* @param context
* the context for this instance to use.
*/
public OffsetManager(final SourceTaskContext context) {
this(context, new ConcurrentHashMap<>());
}

/**
* Package private for testing.
*
* @param context
* the context for this instance to use.
* @param offsets
* the offsets
*/
protected OffsetManager(final SourceTaskContext context,
final ConcurrentMap<Map<String, Object>, Map<String, Object>> offsets) {
this.context = context;
this.offsets = offsets;
}

/**
* Get an entry from the offset manager. This method will return the local copy if it has been created otherwise
* will get the data from Kafka. If there is not a local copy and not one from Kafka then an empty Optional is
* returned
*
* @param key
* the key for the entry.
* @param creator
* a function to create the connector defined offset entry from a Map of string to object.
* @return the entry.
*/
public Optional<E> getEntry(final OffsetManagerKey key, final Function<Map<String, Object>, E> creator) {
LOGGER.debug("getEntry: {}", key.getPartitionMap());
final Map<String, Object> data = offsets.compute(key.getPartitionMap(), (k, v) -> {
if (v == null) {
final Map<String, Object> kafkaData = context.offsetStorageReader().offset(key.getPartitionMap());
LOGGER.debug("Context stored offset map {}", kafkaData);
return kafkaData == null || kafkaData.isEmpty() ? null : kafkaData;
} else {
LOGGER.debug("Previously stored offset map {}", v);
return v;
}
});
return data == null ? Optional.empty() : Optional.of(creator.apply(data));
}

/**
* Copies the entry into the offset manager data.
*
* @param entry
* the entry to update.
*/
public void updateCurrentOffsets(final E entry) {
LOGGER.debug("Updating current offsets: {}", entry.getManagerKey().getPartitionMap());
offsets.compute(entry.getManagerKey().getPartitionMap(), (k, v) -> {
if (v == null) {
return new HashMap<>(entry.getProperties());
} else {
v.putAll(entry.getProperties());
return v;
}
});
}

/**
* Removes the specified entry from the in memory table. Does not impact the records stored in the
* {@link SourceTaskContext}.
*
* @param key
* the key for the entry to remove.
*/
public void removeEntry(final OffsetManagerKey key) {
LOGGER.debug("Removing: {}", key.getPartitionMap());
offsets.remove(key.getPartitionMap());
}

/**
* Removes the specified entry from the in memory table. Does not impact the records stored in the
* {@link SourceTaskContext}.
*
* @param sourceRecord
* the SourceRecord that contains the key to be removed.
*/
public void removeEntry(final SourceRecord sourceRecord) {
LOGGER.debug("Removing: {}", sourceRecord.sourcePartition());
offsets.remove(sourceRecord.sourcePartition());
}

/**
* The definition of an entry in the OffsetManager.
*/
public interface OffsetManagerEntry<T extends OffsetManagerEntry<T>> extends Comparable<T> {

/**
* Creates a new OffsetManagerEntry by wrapping the properties with the current implementation. This method may
* throw a RuntimeException if requried properties are not defined in the map.
*
* @param properties
* the properties to wrap. May be {@code null}.
* @return an OffsetManagerProperty
*/
T fromProperties(Map<String, Object> properties);

/**
* Extracts the data from the entry in the correct format to return to Kafka.
*
* @return the properties in a format to return to Kafka.
*/
Map<String, Object> getProperties();

/**
* Gets the value of the named property. The value returned from a {@code null} key is implementation dependant.
*
* @param key
* the property to retrieve.
* @return the value associated with the property or @{code null} if not set.
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
Object getProperty(String key);

/**
* Sets a key/value pair. Will overwrite any existing value. Implementations of OffsetManagerEntry may declare
* specific keys as restricted. These are generally keys that are managed internally by the OffsetManagerEntry
* and may not be set except through provided setter methods or the constructor.
*
* @param key
* the key to set.
* @param value
* the value to set.
* @throws IllegalArgumentException
* if the key is restricted.
*/
void setProperty(String key, Object value);

/**
* Gets the value of the named property as an {@code int}.
*
* @param key
* the property to retrieve.
* @return the value associated with the property or @{code null} if not set.
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
default int getInt(final String key) {
return ((Number) getProperty(key)).intValue();
}

/**
* Gets the value of the named property as a {@code long}
*
* @param key
* the property to retrieve.
* @return the value associated with the property or @{code null} if not set.
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
default long getLong(final String key) {
return ((Number) getProperty(key)).longValue();
}

/**
* Gets the value of the named property as a String.
*
* @param key
* the property to retrieve.
* @return the value associated with the property or @{code null} if not set.
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
default String getString(final String key) {
return getProperty(key).toString();
}

/**
* ManagerKey getManagerKey
*
* @return The offset manager key for this entry.
*/
OffsetManagerKey getManagerKey();

/**
* Gets the Kafka topic for this entry.
*
* @return The Kafka topic for this entry.
*/
String getTopic();

/**
* Gets the Kafka partition for this entry.
*
* @return The Kafka partition for this entry.
*/
int getPartition();

/**
* Increments the record count.
*/
void incrementRecordCount();
}

/**
* The OffsetManager Key. Must override hashCode() and equals().
*/
@FunctionalInterface
public interface OffsetManagerKey {
/**
* gets the partition map used by Kafka to identify this Offset entry.
*
* @return The partition map used by Kafka to identify this Offset entry.
*/
Map<String, Object> getPartitionMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.aiven.kafka.connect.common.source.input.utils;

import java.util.Optional;
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -66,14 +67,15 @@ public static Optional<String> getTopic(final Pattern filePattern, final String
return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY));
}

public static Optional<Integer> getPartitionId(final Pattern filePattern, final String sourceName) {
return matchPattern(filePattern, sourceName).flatMap(matcher -> {
public static OptionalInt getPartitionId(final Pattern filePattern, final String sourceName) {
Optional<Integer> result = matchPattern(filePattern, sourceName).flatMap(matcher -> {
try {
return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
} catch (NumberFormatException e) {
return Optional.empty();
}
});
return result.isPresent() ? OptionalInt.of(result.get()) : OptionalInt.empty();
}

private static Optional<Matcher> matchPattern(final Pattern filePattern, final String sourceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.aiven.kafka.connect.common.source.task;

import java.util.Optional;
import java.util.OptionalInt;
import java.util.regex.Pattern;

import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
Expand Down Expand Up @@ -52,13 +53,13 @@ public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluat
LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
return false;
}
final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern,
final OptionalInt optionalPartitionId = FilePatternUtils.getPartitionId(filePattern,
sourceNameToBeEvaluated);

if (optionalPartitionId.isPresent()) {
return optionalPartitionId.get() < maxTasks
? taskMatchesPartition(taskId, optionalPartitionId.get())
: taskMatchesPartition(taskId, optionalPartitionId.get() % maxTasks);
return optionalPartitionId.getAsInt() < maxTasks
? taskMatchesPartition(taskId, optionalPartitionId.getAsInt())
: taskMatchesPartition(taskId, optionalPartitionId.getAsInt() % maxTasks);
}
LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ public AbstractSourceTask.AbortTrigger getAbortTrigger() {
assertThat(backoff.estimatedDelay()).isEqualTo(expected);
backoff.delay();
expected *= 2;
assertThat(abortTrigger).isFalse();
}
assertThat(backoff.estimatedDelay()).isEqualTo(maxDelay);
assertThat(abortTrigger).isFalse();
}
}
Loading
Loading