Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s3 object summary iterator improvements #340

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions commons/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {
implementation(logginglibs.slf4j)

implementation(apache.commons.text)
implementation(apache.commons.collection4)

implementation(apache.parquet.avro) {
exclude(group = "org.xerial.snappy", module = "snappy-java")
Expand Down
1 change: 1 addition & 0 deletions s3-source-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ dependencies {
compileOnly(apache.kafka.connect.api)
compileOnly(apache.kafka.connect.runtime)

implementation(apache.commons.collection4)
implementation(project(":commons"))
implementation(project(":s3-commons"))
implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

import io.aiven.kafka.connect.s3.source.config.S3ClientFactory;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
Expand All @@ -30,6 +28,7 @@
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.commons.collections4.IteratorUtils;
import org.codehaus.plexus.util.StringUtils;

/**
Expand All @@ -53,11 +52,7 @@ public class AWSV2SourceClient {
* all objectKeys which have already been tried but have been unable to process.
*/
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys) {
this.s3SourceConfig = s3SourceConfig;
final S3ClientFactory s3ClientFactory = new S3ClientFactory();
this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig);
this.bucketName = s3SourceConfig.getAwsS3BucketName();
this.failedObjectKeys = new HashSet<>(failedObjectKeys);
this(new S3ClientFactory().createAmazonS3Client(s3SourceConfig), s3SourceConfig, failedObjectKeys);
}

/**
Expand All @@ -76,58 +71,86 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String>
this.s3Client = s3Client;
this.bucketName = s3SourceConfig.getAwsS3BucketName();
this.failedObjectKeys = new HashSet<>(failedObjectKeys);
this.filterPredicate = filterPredicate.and(new FailedObjectFilter());
}

public Iterator<String> getListOfObjectKeys(final String startToken) {
/**
* Gets an iterator of S3Objects. Performs the filtering based on the filters provided. Always filters Objects with
* a size of 0 (zero), and objects that have been added to the failed objects list.
*
* @param startToken
* the token (key) to start from.
* @return an iterator of S3Objects.
*/
public Iterator<S3Object> getObjectIterator(final String startToken) {
final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
.withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR);

if (StringUtils.isNotBlank(startToken)) {
request.withStartAfter(startToken);
}
// perform the filtering of S3ObjectSummaries
final Iterator<S3ObjectSummary> s3ObjectSummaryIterator = IteratorUtils
.filteredIterator(new S3ObjectSummaryIterator(s3Client, request), filterPredicate::test);
// transform S3ObjectSummary to S3Object
return IteratorUtils.transformedIterator(s3ObjectSummaryIterator,
objectSummary -> getObject(objectSummary.getKey()));
}

final Stream<String> s3ObjectKeyStream = Stream
.iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
// This is called every time next() is called on the iterator.
if (response.isTruncated()) {
return s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName)
.withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR)
.withContinuationToken(response.getNextContinuationToken()));
} else {
return null;
}

})
.flatMap(response -> response.getObjectSummaries()
.stream()
.filter(filterPredicate)
.filter(objectSummary -> assignObjectToTask(objectSummary.getKey()))
.filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey())))
.map(S3ObjectSummary::getKey);
return s3ObjectKeyStream.iterator();
/**
* Adds a filter by "AND"ing it to the existing filters.
*
* @param other
* the filter to add.
*/
public void andFilter(final Predicate<S3ObjectSummary> other) {
filterPredicate = filterPredicate.and(other);
}

/**
* Adds a filter by "OR"ing it with the existing filters.
*
* @param other
* the filter to add.
*/
public void orFilter(final Predicate<S3ObjectSummary> other) {
filterPredicate = filterPredicate.or(other);
}

/**
* Get the S3Object from the source.
*
* @param objectKey
* the object key to retrieve.
* @return the S3Object.
*/
public S3Object getObject(final String objectKey) {
return s3Client.getObject(bucketName, objectKey);
}

/**
* Add an object key to the list of failed keys. These will be ignored during re-reads of the data stream.
*
* @param objectKey
* the key to ignore
*/
public void addFailedObjectKeys(final String objectKey) {
this.failedObjectKeys.add(objectKey);
}

public void setFilterPredicate(final Predicate<S3ObjectSummary> predicate) {
filterPredicate = predicate;
}

private boolean assignObjectToTask(final String objectKey) {
final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString());
final int taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks);
return taskAssignment == taskId;
}

/**
* Shuts down the system
*/
public void shutdown() {
s3Client.shutdown();
}

/**
* Filter to remove objects that are in the failed object keys list.
*/
class FailedObjectFilter implements Predicate<S3ObjectSummary> {
@Override
public boolean test(final S3ObjectSummary objectSummary) {
return !failedObjectKeys.contains(objectSummary.getKey());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.s3.source.utils;

import java.util.Iterator;
import java.util.NoSuchElementException;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
* Implements a ObjectSummaryIterator on an S3 bucket. Implementation reads summaries in blocks and iterates over each
* block. When block is empty a new block is retrieved and processed until no more data is available.
*/
public class S3ObjectSummaryIterator implements Iterator<S3ObjectSummary> {
/** The client we are using */
private final AmazonS3 s3Client;
/** The object listing from the last call to the client */
private ListObjectsV2Result objectListing;
/** The inner iterator on the object summaries. When it is empty a new one is read from object listing. */
private Iterator<S3ObjectSummary> innerIterator;

/** the ObjectRequest initially to start the iteration from later to retrieve more records */
private final ListObjectsV2Request request;

/** The last key seen by this process. This allows us to restart when a new file is dropped in the direcotry */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say not necessarily when a new file is dropped, more like just let us query S3 from after the last known processed file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe a nit, I just think wording is misleading.

private String lastObjectSummaryKey;

/**
* Constructs the s3ObjectSummaryIterator based on the Amazon se client.
*
* @param s3Client
* the Amazon client to read use for access.
* @param request
* the request object that defines the starting position for the object summary retrieval.
*/
@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "stores mutable AmazeonS3 and ListObjectsV2Request objects")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Amazon misspelt

public S3ObjectSummaryIterator(final AmazonS3 s3Client, final ListObjectsV2Request request) {
this.s3Client = s3Client;
this.request = request;
}

@Override
public boolean hasNext() {
// delay creating objectListing until we need it.
if (objectListing == null) {
objectListing = s3Client.listObjectsV2(request);
innerIterator = objectListing.getObjectSummaries().iterator();
}
if (!this.innerIterator.hasNext()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice efficiency brought to the iterator so we only call when we run out of object summaries 👍

if (objectListing.isTruncated()) {
// get the next set of data and create an iterator on it.
request.withContinuationToken(objectListing.getContinuationToken());
objectListing = s3Client.listObjectsV2(request);
} else {
// there is no more data -- reread the bucket
request.withContinuationToken(null);
if (lastObjectSummaryKey != null) {
request.withStartAfter(lastObjectSummaryKey);
}
objectListing = s3Client.listObjectsV2(request);
}
innerIterator = objectListing.getObjectSummaries().iterator();
}
// innerIterator is configured. Does it have more?
return innerIterator.hasNext();
}

@Override
public S3ObjectSummary next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
final S3ObjectSummary result = innerIterator.next();
lastObjectSummaryKey = result.getKey();
return result;
}
}
Loading
Loading