-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
S3 source connector #317
base: main
Are you sure you want to change the base?
S3 source connector #317
Conversation
ee006b8
to
399c883
Compare
...ce-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java
Outdated
Show resolved
Hide resolved
...ce-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java
Outdated
Show resolved
Hide resolved
...ce-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Test | ||
void multiPartUploadBytesTest(final TestInfo testInfo) throws ExecutionException, InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this test is required, as it is testing the multipart upload rather then the source connector.
The multipart upload writes the file on closing of the stream, so it appears as any old file to the source connector.
* AivenKafkaConnectS3SourceConnector is a Kafka Connect Connector implementation that watches a S3 bucket and generates | ||
* tasks to ingest contents. | ||
*/ | ||
public class AivenKafkaConnectS3SourceConnector extends SourceConnector { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd personally still really like to rename this S3SourceConnector
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was defined aligning with Sink Connector (AivenKafkaConnectS3SinkConnector).
Happy to change if we have a 2nd opinion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this is a personal preference, I feel like we should remove erroneous use of Aiven in class names.
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/AvroTransformer.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/JsonTransformer.java
Outdated
Show resolved
Hide resolved
...urce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AivenS3SourceRecord.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java
Outdated
Show resolved
Hide resolved
@aindriu-aiven thanks for the review. PR #329 open to fix the review. |
|
||
private void waitForObjects() throws InterruptedException { | ||
while (!sourceRecordIterator.hasNext() && !connectorStopped.get()) { | ||
LOGGER.debug("Blocking until new S3 files are available."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at this, should we break out of this loop after a certain amount of retries and return null, and then wait for the next polling thread again? It is much for muchness but I figure we shouldn't have a sleep that could in the event of some mistake in config potentially use a thread and sleep indefinitely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, we have a ticket for poll/synchronized which also mentioned about this sleep.
I believe this retries will be addressed in the same.
public interface IntegrationBase { | ||
|
||
String DOCKER_IMAGE_KAFKA = "confluentinc/cp-kafka:7.7.0"; | ||
String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/"; | |
String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/"; |
This is actually from a point @AnatolyPopov made
|
||
String DOCKER_IMAGE_KAFKA = "confluentinc/cp-kafka:7.7.0"; | ||
String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/"; | ||
String S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-"; | |
String S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-"; |
|
||
long currentOffset; | ||
|
||
if (offsetManager.getOffsets().containsKey(partitionMap)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to move this if-else into the offset manager, we can create an interface, and re-use this functionality in all source connectors going forward?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it would be better that way.
Partially fixes https://aiven.atlassian.net/browse/KCON-2 Currently when max.tasks is set to above 1, then each of those tasks are processing all objects in the bucket, which should not be the case. This pr does the below (bug fix for distributed mode) * based on the hash of the object key, assigns the objects to tasks * Updated integration tests with max tasks > 1
… all (#351) Currently the transformers load the files and get a list of records. This could cause performance issues for large files. * With Stream/StreamSupport, only when next() is called from iterator, a record is transformed.
fcad7e7
to
a31b0ca
Compare
* Migrating tests to Awaitility instead of plain Thread.sleep * Some refactoring to unify message consumption logic in tests where possible.
Current implementation cannot handle large Avro files, due to the initialisation of stream in try resources within transformer. - In this custom splitter - The tryAdvance method reads one record at a time and processes it. - Updated integration test with large number of avro records in one object
Move all the source common config so it can be re-used by other source connectors. * Adds two config fragments which are logically groupings for source connector configuration * Parquet is moved with library changes to use the same parquet version as the sink connector. --------- Signed-off-by: Aindriu Lavelle <[email protected]>
Signed-off-by: Aindriu Lavelle <[email protected]>
The update makes updates to the SourceRecordIterator to remove the requirement for a S3Client and specific S3 knowledge from the iterator. The iterator will now also call for more files after the initial set of files has been processed. The only remaining work to be done is to remove the construction of the S3Object into an iterator from the SourceRecordIterator in a follow up PR which will allow it to be completely re-useable. --------- Signed-off-by: dependabot[bot] <[email protected]> Signed-off-by: Aindriu Lavelle <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Aindriu Lavelle <[email protected]>
KCON-36 If a certain number of records are already processed in a file during dr, do not re process those records. Number of processed recs are already stored in offset storage. Retrieve that and skip in the stream.
Add Errors tolerance configuration. 1) Allows configuration of the connect framework feature and how it should handle source records which are malformed or unable to be added to a kafka topic. 2) Also checks RecordProcessor and if a failed or malformed record should be ignored or cause the failure of the Connector. --------- Signed-off-by: Aindriu Lavelle <[email protected]>
* This update means we can now use the PREFIX in the AWS API allowing users to configure it to be more specific about what they want processed by the connector. --------- Signed-off-by: Aindriu Lavelle <[email protected]>
Fix for KCON-82 This PR fixes the bugs and simplifies the creation of a proper transformer by relieving the developer of having to think about the stream and just think about how to create the item. Streaming Tests are included for all current Transformer implementations. Transformer is converted to an abstract class. An abstract Transformer.StreamSpliterator class is created to handle the common checking for end of stream and closing the input file(s).
KCON-25 Addresses #316 (comment) - Removed the converters instantiation - For avro using AvroData utils to create SchemaAndValue - For json, as there are no utils, relying on json converter - Deleted the transformation of data (serialization, toConnectData) in transformers With this change, redundant transformation is removed, making it flexible for consumers
Add Service Loader for quick start up Signed-off-by: Aindriu Lavelle <[email protected]>
KCON-9 Read me to configure aws s3 source connector
The AWS 1.X sdk is in maintenance mode and will be out of support by December 2025. Key differences are * Use of the builder pattern when creating objects * get and set removed from getters and setters e.g. getKey(), setKey(newKey) -> key(), key(newKey) * S3Client is immutable * different package names * Additional built in functionality removing some of the work from the connector implementation and having the existing library handle it. SDK 1.X still in use by sink connector but that will be required to be updated as well in the future, but this means the s3-commons code has both the 1.x and 2.x jars. --------- Signed-off-by: Aindriu Lavelle <[email protected]>
Increase performance of the S3 Integration tests by allowing the polling time to be dependent on the test and to take into account if their are more records left to be retrieved before waiting to collect the next batch of records. This allows the consuming of messages to occur faster if additional messages are already waiting to be retrieved. This allows different tests to allow a more subtle control of the time to wait before timing out. (previously all tests would wait the maximum 5 minutes even if only waiting for 5 messages from kafka) This should save 2-3 minutes per Integration test run and allow on a failure run to save between 12-15 minutes on each test run. --------- Signed-off-by: Aindriu Lavelle <[email protected]>
Fixes for KCON-26 - Backoff when no data available. Fixes for KCON-28 - Improve poll method Creates an AbstractSourceTask in commons to handle response to poll and backoff calculations as well as start, stop. Implementations need to implement an Iterator that poll will call to retrieve data. Private classes Timer and Backoff are created in AbstractSourceTask and may be moved out at a later date if needed elsewhere. Changes made to configurations to support configuration extraction in AbstractSourceTask. Modifications to S3SourceTask to operate under AbstractSourceTask. Additional tests added --------- Co-authored-by: ¨Claude <¨[email protected]¨> Co-authored-by: Jarkko Jaakola <[email protected]> Co-authored-by: Murali Basani <[email protected]>
No description provided.