-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
s3 object summary iterator improvements #340
Closed
Claudenw
wants to merge
8
commits into
Aiven-Open:s3-source-release
from
Claudenw:KCON-51_S3ObjectSummaryIterator_improvements
Closed
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
f21887c
Changes to use S3ObjectSummaryIterator
ee11e19
fixed spotless error, fixed FileReader return error
121b8b8
migrated failed object management outside of FileReader
879e1c4
fixed PMD issues
e0c72cc
fixed Spotbugs issues
bda8854
fixed spotless issues
6d33432
Updated for latest s3-source-release
1f50b1c
Merge remote-tracking branch 'origin/s3-source-release' into KCON-51_…
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
95 changes: 95 additions & 0 deletions
95
...nnector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
* Copyright 2024 Aiven Oy | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.aiven.kafka.connect.s3.source.utils; | ||
|
||
import java.util.Iterator; | ||
import java.util.NoSuchElementException; | ||
|
||
import com.amazonaws.services.s3.AmazonS3; | ||
import com.amazonaws.services.s3.model.ListObjectsV2Request; | ||
import com.amazonaws.services.s3.model.ListObjectsV2Result; | ||
import com.amazonaws.services.s3.model.S3ObjectSummary; | ||
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; | ||
|
||
/** | ||
* Implements a ObjectSummaryIterator on an S3 bucket. Implementation reads summaries in blocks and iterates over each | ||
* block. When block is empty a new block is retrieved and processed until no more data is available. | ||
*/ | ||
public class S3ObjectSummaryIterator implements Iterator<S3ObjectSummary> { | ||
/** The client we are using */ | ||
private final AmazonS3 s3Client; | ||
/** The object listing from the last call to the client */ | ||
private ListObjectsV2Result objectListing; | ||
/** The inner iterator on the object summaries. When it is empty a new one is read from object listing. */ | ||
private Iterator<S3ObjectSummary> innerIterator; | ||
|
||
/** the ObjectRequest initially to start the iteration from later to retrieve more records */ | ||
private final ListObjectsV2Request request; | ||
|
||
/** The last key seen by this process. This allows us to restart when a new file is dropped in the direcotry */ | ||
private String lastObjectSummaryKey; | ||
|
||
/** | ||
* Constructs the s3ObjectSummaryIterator based on the Amazon se client. | ||
* | ||
* @param s3Client | ||
* the Amazon client to read use for access. | ||
* @param request | ||
* the request object that defines the starting position for the object summary retrieval. | ||
*/ | ||
@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "stores mutable AmazeonS3 and ListObjectsV2Request objects") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: Amazon misspelt |
||
public S3ObjectSummaryIterator(final AmazonS3 s3Client, final ListObjectsV2Request request) { | ||
this.s3Client = s3Client; | ||
this.request = request; | ||
} | ||
|
||
@Override | ||
public boolean hasNext() { | ||
// delay creating objectListing until we need it. | ||
if (objectListing == null) { | ||
objectListing = s3Client.listObjectsV2(request); | ||
innerIterator = objectListing.getObjectSummaries().iterator(); | ||
} | ||
if (!this.innerIterator.hasNext()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a nice efficiency brought to the iterator so we only call when we run out of object summaries 👍 |
||
if (objectListing.isTruncated()) { | ||
// get the next set of data and create an iterator on it. | ||
request.withContinuationToken(objectListing.getContinuationToken()); | ||
objectListing = s3Client.listObjectsV2(request); | ||
} else { | ||
// there is no more data -- reread the bucket | ||
request.withContinuationToken(null); | ||
if (lastObjectSummaryKey != null) { | ||
request.withStartAfter(lastObjectSummaryKey); | ||
} | ||
objectListing = s3Client.listObjectsV2(request); | ||
} | ||
innerIterator = objectListing.getObjectSummaries().iterator(); | ||
} | ||
// innerIterator is configured. Does it have more? | ||
return innerIterator.hasNext(); | ||
} | ||
|
||
@Override | ||
public S3ObjectSummary next() { | ||
if (!hasNext()) { | ||
throw new NoSuchElementException(); | ||
} | ||
final S3ObjectSummary result = innerIterator.next(); | ||
lastObjectSummaryKey = result.getKey(); | ||
return result; | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say not necessarily when a new file is dropped, more like just let us query S3 from after the last known processed file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe a nit, I just think wording is misleading.