s3 object summary iterator improvements #340
b6ccad4 to f3ac5a5
fcad7e7 to a31b0ca
# Conflicts:
#   s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
#   s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/FileReader.java
#   s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java
#   s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
#   s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java
6c4e125 to 6d33432
…S3ObjectSummaryIterator_improvements
 * @param request
 *            the request object that defines the starting position for the object summary retrieval.
 */
@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "stores mutable AmazeonS3 and ListObjectsV2Request objects")
NIT: Amazon misspelt
/** the ObjectRequest initially to start the iteration from later to retrieve more records */
private final ListObjectsV2Request request;

/** The last key seen by this process. This allows us to restart when a new file is dropped in the direcotry */
I would say not necessarily when a new file is dropped, more like just let us query S3 from after the last known processed file.
maybe a nit, I just think wording is misleading.
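To make the wording point concrete, here is a minimal self-contained sketch of "query from after the last known key". It does not use the AWS SDK: `ResumableListing` and its in-memory `TreeSet` are hypothetical stand-ins for a bucket listed in key order, and `String` ordering is only an approximation of S3's key ordering.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical stand-in for a bucket that is listed in sorted key order. */
final class ResumableListing {

    /** Simulated bucket contents, kept sorted like a key-ordered listing. */
    private final NavigableSet<String> bucket = new TreeSet<>();

    /** The last key handed out; null until the first successful poll. */
    private String lastKey;

    void put(final String key) {
        bucket.add(key);
    }

    /** Returns only the keys that sort strictly after the last key handed out. */
    List<String> poll() {
        final Collection<String> fresh = lastKey == null ? bucket : bucket.tailSet(lastKey, false);
        final List<String> result = new ArrayList<>(fresh);
        if (!result.isEmpty()) {
            lastKey = result.get(result.size() - 1);
        }
        return result;
    }
}
```

In this sketch a key added later is returned only if it sorts after `lastKey`; a new file whose key sorts earlier is never seen. That is why "query from after the last known processed file" describes the behaviour more accurately than "restart when a new file is dropped".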
objectListing = s3Client.listObjectsV2(request);
innerIterator = objectListing.getObjectSummaries().iterator();
}
if (!this.innerIterator.hasNext()) {
This is a nice efficiency brought to the iterator so we only call when we run out of object summaries 👍
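For readers without the full diff, the pattern being praised can be sketched roughly as follows. This is not the PR's code: `LazyPagingIterator`, `Page`, and the `fetchPage` function are hypothetical names, and the page source is a plain function rather than an S3 client so the example runs on its own.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical iterator that fetches the next page only when the current one is used up. */
final class LazyPagingIterator implements Iterator<String> {

    /** One page of keys plus the token for the following page (null on the last page). */
    static final class Page {
        final List<String> keys;
        final String nextToken;

        Page(final List<String> keys, final String nextToken) {
            this.keys = keys;
            this.nextToken = nextToken;
        }
    }

    /** Maps a continuation token (null for the first page) to a page; stands in for the remote call. */
    private final Function<String, Page> fetchPage;
    private Iterator<String> inner = Collections.emptyIterator();
    private String nextToken;
    private boolean lastPageSeen;
    private int fetchCount;

    LazyPagingIterator(final Function<String, Page> fetchPage) {
        this.fetchPage = fetchPage;
    }

    @Override
    public boolean hasNext() {
        // Fetch only when the current page is exhausted; the loop also skips empty pages.
        while (!inner.hasNext() && !lastPageSeen) {
            final Page page = fetchPage.apply(nextToken);
            fetchCount++;
            inner = page.keys.iterator();
            nextToken = page.nextToken;
            lastPageSeen = nextToken == null;
        }
        return inner.hasNext();
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return inner.next();
    }

    /** Number of page fetches so far, exposed only to make the laziness observable. */
    int fetchCount() {
        return fetchCount;
    }
}
```

Nothing is fetched at construction time, and repeated `hasNext()` calls within a page do not trigger further fetches, which is the efficiency the comment above refers to.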
private static final String PATTERN_TOPIC_KEY = "topicName";
private static final String PATTERN_PARTITION_KEY = "partitionId";

private static final Pattern FILE_DEFAULT_PATTERN = Pattern.compile("(?<topicName>[^/]+?)-"
I think we need to create an issue for how this is hardcoded and how we are handling it, not sure if @RyanSkraba has this handled in his work
A few nits but nothing major
This PR is outdated and subsumed into other changes.
Changed iterator inside FileReader to prepare to handle offsets and other reset operations.
Reduced the custom code overhead.
Added ability to use Predicates to filter S3ObjectSummary objects.
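As an illustration of the predicate filtering mentioned in the description, a minimal sketch of an iterator wrapper is shown below. It is generic and does not reflect the PR's actual class: `FilteringIterator` is a hypothetical name, and in the connector the element type would presumably be `S3ObjectSummary` rather than the plain strings used when trying it out.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical wrapper that yields only the elements accepted by a predicate. */
final class FilteringIterator<T> implements Iterator<T> {

    private final Iterator<T> source;
    private final Predicate<? super T> filter;

    /** The next accepted element, valid only while hasBuffered is true. */
    private T buffered;
    private boolean hasBuffered;

    FilteringIterator(final Iterator<T> source, final Predicate<? super T> filter) {
        this.source = source;
        this.filter = filter;
    }

    @Override
    public boolean hasNext() {
        // Advance the source until an element passes the filter or the source runs out.
        while (!hasBuffered && source.hasNext()) {
            final T candidate = source.next();
            if (filter.test(candidate)) {
                buffered = candidate;
                hasBuffered = true;
            }
        }
        return hasBuffered;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        final T result = buffered;
        buffered = null;
        hasBuffered = false;
        return result;
    }
}
```

Because predicates compose with `and`, `or`, and `negate`, callers can combine several filters without changing the iterator itself.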