Commit

Updated for latest s3-source-release
Claude committed Dec 11, 2024
1 parent bda8854 commit 6d33432
Showing 9 changed files with 867 additions and 319 deletions.
@@ -18,10 +18,8 @@

import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

import io.aiven.kafka.connect.s3.source.config.S3ClientFactory;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
@@ -30,6 +28,7 @@
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.commons.collections4.IteratorUtils;
import org.codehaus.plexus.util.StringUtils;

/**
@@ -53,11 +52,7 @@ public class AWSV2SourceClient {
all objectKeys which have already been tried but could not be processed.
*/
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys) {
this.s3SourceConfig = s3SourceConfig;
final S3ClientFactory s3ClientFactory = new S3ClientFactory();
this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig);
this.bucketName = s3SourceConfig.getAwsS3BucketName();
this.failedObjectKeys = new HashSet<>(failedObjectKeys);
this(new S3ClientFactory().createAmazonS3Client(s3SourceConfig), s3SourceConfig, failedObjectKeys);
}

/**
@@ -76,58 +71,86 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String>
this.s3Client = s3Client;
this.bucketName = s3SourceConfig.getAwsS3BucketName();
this.failedObjectKeys = new HashSet<>(failedObjectKeys);
this.filterPredicate = filterPredicate.and(new FailedObjectFilter());
}

public Iterator<String> getListOfObjectKeys(final String startToken) {
/**
* Gets an iterator of S3Objects. Performs the filtering based on the filters provided. Always filters out objects
* with a size of 0 (zero) and objects that have been added to the failed objects list.
*
* @param startToken
* the token (key) to start from.
* @return an iterator of S3Objects.
*/
public Iterator<S3Object> getObjectIterator(final String startToken) {
final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
.withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR);

if (StringUtils.isNotBlank(startToken)) {
request.withStartAfter(startToken);
}
// perform the filtering of S3ObjectSummaries
final Iterator<S3ObjectSummary> s3ObjectSummaryIterator = IteratorUtils
.filteredIterator(new S3ObjectSummaryIterator(s3Client, request), filterPredicate::test);
// transform S3ObjectSummary to S3Object
return IteratorUtils.transformedIterator(s3ObjectSummaryIterator,
objectSummary -> getObject(objectSummary.getKey()));
}

final Stream<String> s3ObjectKeyStream = Stream
.iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
// This is called every time next() is called on the iterator.
if (response.isTruncated()) {
return s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName)
.withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR)
.withContinuationToken(response.getNextContinuationToken()));
} else {
return null;
}

})
.flatMap(response -> response.getObjectSummaries()
.stream()
.filter(filterPredicate)
.filter(objectSummary -> assignObjectToTask(objectSummary.getKey()))
.filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey())))
.map(S3ObjectSummary::getKey);
return s3ObjectKeyStream.iterator();
/**
* Adds a filter by "AND"ing it to the existing filters.
*
* @param other
* the filter to add.
*/
public void andFilter(final Predicate<S3ObjectSummary> other) {
filterPredicate = filterPredicate.and(other);
}

/**
* Adds a filter by "OR"ing it with the existing filters.
*
* @param other
* the filter to add.
*/
public void orFilter(final Predicate<S3ObjectSummary> other) {
filterPredicate = filterPredicate.or(other);
}
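A minimal sketch of composing caller-side filters through these two methods, assuming an existing client instance named sourceClient (the key prefix and size threshold are illustrative values, not part of this change):

final AWSV2SourceClient sourceClient = ...; // obtained elsewhere
// keep only keys under an expected prefix; ANDed with the built-in zero-size and failed-key filters
sourceClient.andFilter(summary -> summary.getKey().startsWith("topics/"));
// additionally let small marker objects through even when the composed filter would reject them
sourceClient.orFilter(summary -> summary.getSize() < 1024L);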

/**
* Get the S3Object from the source.
*
* @param objectKey
* the object key to retrieve.
* @return the S3Object.
*/
public S3Object getObject(final String objectKey) {
return s3Client.getObject(bucketName, objectKey);
}

/**
* Add an object key to the list of failed keys. These will be ignored during re-reads of the data stream.
*
* @param objectKey
* the key to ignore
*/
public void addFailedObjectKeys(final String objectKey) {
this.failedObjectKeys.add(objectKey);
}

public void setFilterPredicate(final Predicate<S3ObjectSummary> predicate) {
filterPredicate = predicate;
}

private boolean assignObjectToTask(final String objectKey) {
final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString());
final int taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks);
return taskAssignment == taskId;
}
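Math.floorMod keeps the assignment non-negative even when hashCode() is negative, so each key maps deterministically to exactly one task. A quick illustrative check, with assumed key names and tasks.max value:

final int maxTasks = 3;
for (final String key : new String[] {"topics/a/0.json", "topics/b/1.json", "topics/c/2.json"}) {
    // same formula as assignObjectToTask above
    System.out.println(key + " -> task " + Math.floorMod(key.hashCode(), maxTasks));
}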

/**
* Shuts down the system
*/
public void shutdown() {
s3Client.shutdown();
}

/**
* Filter to remove objects that are in the failed object keys list.
*/
class FailedObjectFilter implements Predicate<S3ObjectSummary> {
@Override
public boolean test(final S3ObjectSummary objectSummary) {
return !failedObjectKeys.contains(objectSummary.getKey());
}
}
}
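Taken together, a minimal usage sketch for this class, assuming an existing S3SourceConfig and leaving record handling as a placeholder (the variable names and the processing step are illustrative):

final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());
final Iterator<S3Object> objects = sourceClient.getObjectIterator(null); // a blank token starts from the beginning
while (objects.hasNext()) {
    final S3Object s3Object = objects.next();
    try {
        // read s3Object.getObjectContent() and emit records here (placeholder)
    } catch (final RuntimeException e) {
        // remember the key so that re-reads skip it (see FailedObjectFilter above)
        sourceClient.addFailedObjectKeys(s3Object.getKey());
    }
}
sourceClient.shutdown();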
@@ -21,7 +21,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.connect.data.SchemaAndValue;
@@ -40,6 +40,9 @@ public class S3ObjectSummaryIterator implements Iterator<S3ObjectSummary> {
/** the ListObjectsV2Request used initially to start the iteration and later to retrieve more records */
private final ListObjectsV2Request request;

/** The last key seen by this process. This allows us to restart when a new file is dropped in the directory. */
private String lastObjectSummaryKey;

/**
* Constructs the S3ObjectSummaryIterator based on the Amazon S3 client.
*
@@ -58,17 +61,22 @@ public S3ObjectSummaryIterator(final AmazonS3 s3Client, final ListObjectsV2Reque
public boolean hasNext() {
// delay creating objectListing until we need it.
if (objectListing == null) {
this.objectListing = s3Client.listObjectsV2(request);
this.innerIterator = objectListing.getObjectSummaries().iterator();
objectListing = s3Client.listObjectsV2(request);
innerIterator = objectListing.getObjectSummaries().iterator();
}
if (!this.innerIterator.hasNext()) {
if (!objectListing.isTruncated()) {
// there is no more data
return false;
if (objectListing.isTruncated()) {
// get the next set of data and create an iterator on it.
request.withContinuationToken(objectListing.getContinuationToken());
objectListing = s3Client.listObjectsV2(request);
} else {
// there is no more data -- reread the bucket
request.withContinuationToken(null);
if (lastObjectSummaryKey != null) {
request.withStartAfter(lastObjectSummaryKey);
}
objectListing = s3Client.listObjectsV2(request);
}
// get the next set of data and create an iterator on it.
request.withContinuationToken(objectListing.getContinuationToken());
objectListing = s3Client.listObjectsV2(request);
innerIterator = objectListing.getObjectSummaries().iterator();
}
// innerIterator is configured. Does it have more?
@@ -80,6 +88,8 @@ public S3ObjectSummary next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return innerIterator.next();
final S3ObjectSummary result = innerIterator.next();
lastObjectSummaryKey = result.getKey();
return result;
}
}
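With this change hasNext() no longer goes permanently false once a listing is exhausted: it clears the continuation token and re-lists the bucket starting after the last key it returned, so files dropped in later are picked up. A minimal polling sketch built on that behavior, where s3Client, the bucket name, the stopped flag and the process handler are assumptions for illustration:

final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName("my-bucket").withMaxKeys(1000);
final S3ObjectSummaryIterator summaries = new S3ObjectSummaryIterator(s3Client, request);
while (!stopped.get()) {                    // e.g. an AtomicBoolean flipped on shutdown
    if (summaries.hasNext()) {              // an exhausted page triggers a re-list after the last seen key
        process(summaries.next().getKey()); // hypothetical handler for the discovered key
    }
    // else: nothing new yet; a real caller would back off briefly before polling again
}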