S3 source connector #317

Draft
wants to merge 89 commits into
base: main

muralibasani
Contributor

No description provided.

@muralibasani muralibasani marked this pull request as ready for review October 31, 2024 11:36
@muralibasani muralibasani requested review from a team as code owners October 31, 2024 11:36
}

@Test
void multiPartUploadBytesTest(final TestInfo testInfo) throws ExecutionException, InterruptedException {
Contributor

I don't think this test is required, as it is testing the multipart upload rather than the source connector.
The multipart upload writes the file on closing of the stream, so it appears as any old file to the source connector.

* AivenKafkaConnectS3SourceConnector is a Kafka Connect Connector implementation that watches a S3 bucket and generates
* tasks to ingest contents.
*/
public class AivenKafkaConnectS3SourceConnector extends SourceConnector {
Contributor

I'd personally still really like to rename this to S3SourceConnector.

Contributor Author

This was named to align with the sink connector (AivenKafkaConnectS3SinkConnector).
Happy to change it if we get a second opinion.

Contributor

Yeah, this is a personal preference; I feel we should remove the unnecessary use of Aiven in class names.

@muralibasani
Contributor Author

@aindriu-aiven thanks for the review. PR #329 is open to address the review comments.


private void waitForObjects() throws InterruptedException {
while (!sourceRecordIterator.hasNext() && !connectorStopped.get()) {
LOGGER.debug("Blocking until new S3 files are available.");
Contributor

Looking at this, should we break out of this loop after a certain number of retries, return null, and then wait for the next poll? It is much of a muchness, but I figure we shouldn't have a sleep that, in the event of a configuration mistake, could hold a thread and sleep indefinitely.

Contributor Author

Agreed. We have a ticket for poll/synchronized that also mentions this sleep.
I believe the retries will be addressed in the same ticket.
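
The bounded wait suggested in this thread could look roughly like the sketch below. It is an illustration only, not the PR's code: `BoundedWait`, `maxRetries` and `sleepMillis` are hypothetical names, and a `BooleanSupplier` stands in for `sourceRecordIterator.hasNext()`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: waits for data, but gives up after a fixed number of checks. */
final class BoundedWait {
    private BoundedWait() {
    }

    /**
     * Returns true as soon as data is available. Returns false when the connector is
     * stopped, the thread is interrupted, or maxRetries checks found no data.
     */
    static boolean waitForObjects(final BooleanSupplier hasNext, final AtomicBoolean connectorStopped,
            final int maxRetries, final long sleepMillis) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            if (connectorStopped.get()) {
                return false;
            }
            if (hasNext.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can react to it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // Retries exhausted: hand control back instead of sleeping indefinitely.
        return false;
    }
}
```

When this returns false, `poll()` can return null, which Kafka Connect treats as "no data right now" and follows with another `poll()` call later.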

public interface IntegrationBase {

String DOCKER_IMAGE_KAFKA = "confluentinc/cp-kafka:7.7.0";
String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
Contributor

Suggested change
- String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
+ String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";

This is actually from a point @AnatolyPopov made


String DOCKER_IMAGE_KAFKA = "confluentinc/cp-kafka:7.7.0";
String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
String S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";
Contributor

Suggested change
- String S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";
+ String S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";


long currentOffset;

if (offsetManager.getOffsets().containsKey(partitionMap)) {
Contributor

Would it make sense to move this if-else into the offset manager? We could create an interface and reuse this functionality in all source connectors going forward.

Contributor Author

Yes, it would be better that way.
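
One way the lookup could move behind an interface is sketched below. This is a sketch under assumptions: `OffsetLookup`, `InMemoryOffsetLookup`, `offsetFor` and `record` are hypothetical names, and the PR's actual offset manager may be shaped differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical interface that other source connectors could share. */
interface OffsetLookup {
    /** Returns the stored offset for the partition, or defaultOffset when none is recorded. */
    long offsetFor(Map<String, Object> partitionMap, long defaultOffset);
}

/** Hypothetical in-memory implementation, used here only to illustrate the idea. */
final class InMemoryOffsetLookup implements OffsetLookup {
    private final Map<Map<String, Object>, Long> offsets = new HashMap<>();

    void record(final Map<String, Object> partitionMap, final long offset) {
        offsets.put(partitionMap, offset);
    }

    @Override
    public long offsetFor(final Map<String, Object> partitionMap, final long defaultOffset) {
        // The containsKey/else branch lives here, so callers no longer repeat it.
        return offsets.getOrDefault(partitionMap, defaultOffset);
    }
}
```

Callers then ask for `offsetFor(partitionMap, 0L)` and no longer need to know how offsets are stored.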

@muralibasani muralibasani marked this pull request as draft November 7, 2024 09:55
muralibasani and others added 9 commits November 25, 2024 11:12
Partially fixes https://aiven.atlassian.net/browse/KCON-2

Currently, when tasks.max is set above 1, each task processes all objects in the bucket, which should not be the case.

This PR does the following (bug fix for distributed mode):

* Assigns objects to tasks based on the hash of the object key
* Updates integration tests to run with max tasks > 1
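
The hash-based assignment described above can be sketched as follows. `TaskAssignment`, `taskFor` and `belongsToTask` are hypothetical names, and the PR may compute the hash differently.

```java
/** Hypothetical helper showing hash-based assignment of object keys to tasks. */
final class TaskAssignment {
    private TaskAssignment() {
    }

    /** Maps an object key to a task index in the range [0, maxTasks). */
    static int taskFor(final String objectKey, final int maxTasks) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(objectKey.hashCode(), maxTasks);
    }

    /** True when the given task is the one responsible for this object key. */
    static boolean belongsToTask(final String objectKey, final int taskId, final int maxTasks) {
        return taskFor(objectKey, maxTasks) == taskId;
    }
}
```

Each task filters the bucket listing with its own task id. Because `String.hashCode()` is specified by the Java language, every worker computes the same index for a given key, so each object is claimed by exactly one task.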
… all (#351)

Currently the transformers load the files and get a list of records. This could cause performance issues for large files.

* With Stream/StreamSupport, a record is transformed only when next() is called on the iterator.
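
A minimal sketch of that lazy behaviour, using only the JDK: `LazyRecords` and `transformLazily` are hypothetical names, and a plain `Function` stands in for the connector's transformer.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical helper: wraps a raw iterator so records are transformed on demand. */
final class LazyRecords {
    private LazyRecords() {
    }

    static <T, R> Iterator<R> transformLazily(final Iterator<T> raw,
            final Function<? super T, ? extends R> transform) {
        // Building the stream does not consume the source iterator.
        final Stream<T> stream = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(raw, Spliterator.ORDERED), false);
        // map() is an intermediate operation; it runs as elements are pulled.
        return stream.<R>map(transform).iterator();
    }
}
```

No record is transformed until the returned iterator is advanced, so a large file never has to be materialised as a full list of records.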
RyanSkraba and others added 20 commits November 25, 2024 16:03
* Migrating tests to Awaitility instead of plain Thread.sleep
* Some refactoring to unify message consumption logic in tests where
possible.
The current implementation cannot handle large Avro files, due to the
initialisation of the stream in try-with-resources within the transformer.

- In this custom spliterator, the tryAdvance method reads one record at a
time and processes it.
- Updated the integration test with a large number of Avro records in one
object
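
The one-record-at-a-time idea can be sketched with a JDK-only spliterator. This is an assumption-laden illustration: `LineSpliterator` is a hypothetical class, and text lines stand in for Avro records so that the example needs no Avro dependency.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical spliterator that reads one line (standing in for one record) per call. */
final class LineSpliterator extends Spliterators.AbstractSpliterator<String> {
    private final BufferedReader reader;
    private boolean finished;

    LineSpliterator(final BufferedReader reader) {
        super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL);
        this.reader = reader;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super String> action) {
        if (finished) {
            return false;
        }
        try {
            final String line = reader.readLine();
            if (line == null) {
                // End of input: close the reader here rather than in a try-with-resources
                // block that would end before the stream is consumed.
                finished = true;
                reader.close();
                return false;
            }
            action.accept(line);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Stream<String> lines(final BufferedReader reader) {
        return StreamSupport.stream(new LineSpliterator(reader), false);
    }
}
```

The reader stays open for as long as the stream is being consumed and is closed when the input is exhausted, which is the lifetime problem the commit message attributes to try-with-resources.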
Move all the source common config so it can be re-used by other source
connectors.
* Adds two config fragments, which are logical groupings of source
connector configuration
* Parquet is moved with library changes to use the same parquet version
as the sink connector.

---------

Signed-off-by: Aindriu Lavelle <[email protected]>
This update changes the SourceRecordIterator to remove the
requirement for an S3Client and specific S3 knowledge from the iterator.

The iterator will now also call for more files after the initial set of
files has been processed.


The only remaining work is to remove the construction of the
S3Object into an iterator from the SourceRecordIterator in a follow-up
PR, which will allow it to be completely reusable.

---------

Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: Aindriu Lavelle <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Aindriu Lavelle <[email protected]>
KCON-36

If a certain number of records in a file were already processed during
DR, do not reprocess those records.
The number of processed records is already stored in offset storage. Retrieve
it and skip that many records in the stream.
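
Skipping already-processed records in a stream can be sketched as below. `ResumeFromOffset` and `skipProcessed` are hypothetical names; in the connector the count would come from offset storage.

```java
import java.util.stream.Stream;

/** Hypothetical helper: resumes a record stream after the records already processed. */
final class ResumeFromOffset {
    private ResumeFromOffset() {
    }

    static <T> Stream<T> skipProcessed(final Stream<T> records, final long processedCount) {
        // Stream.skip rejects negative values, so treat them as "nothing processed yet".
        return records.skip(Math.max(0L, processedCount));
    }
}
```

For example, with four records in a file and a stored count of 2, only the third and fourth records are emitted.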
Add Errors tolerance configuration.

1) Allows configuration of the Connect framework feature and how it
should handle source records which are malformed or unable to be added
to a Kafka topic.
2) Also checks in RecordProcessor whether a failed or malformed record
should be ignored or cause the failure of the connector.

---------

Signed-off-by: Aindriu Lavelle <[email protected]>
* This update means we can now use the PREFIX in the AWS API allowing
users to configure it to be more specific about what they want processed
by the connector.

---------

Signed-off-by: Aindriu Lavelle <[email protected]>
Fix for KCON-82

This PR fixes the bugs and simplifies the creation of a proper
transformer by relieving the developer of having to think about the
stream and just think about how to create the item.

Streaming Tests are included for all current Transformer
implementations.

Transformer is converted to an abstract class.

An abstract Transformer.StreamSpliterator class is created to handle the
common checking for end of stream and closing the input file(s).
KCON-25

Addresses
#316 (comment)

- Removed the converters instantiation
- For avro using AvroData utils to create SchemaAndValue
- For json, as there are no utils, relying on json converter
- Deleted the transformation of data (serialization, toConnectData) in
transformers

With this change, redundant transformation is removed, making it
flexible for consumers
Add Service Loader for quick start up

Signed-off-by: Aindriu Lavelle <[email protected]>
KCON-9 README to configure the AWS S3 source connector
The AWS SDK 1.x is in maintenance mode and will be out of support by
December 2025.

Key differences are
* Use of the builder pattern when creating objects
* The get and set prefixes are removed from getters and setters, e.g. getKey(),
setKey(newKey) -> key(), key(newKey)
* S3Client is immutable
* Different package names
* Additional built-in functionality, which removes some of the work from the
connector implementation and lets the existing library handle it.

SDK 1.x is still in use by the sink connector, which will also need to be
updated in the future. Until then, the s3-commons code has
both the 1.x and 2.x jars.

---------

Signed-off-by: Aindriu Lavelle <[email protected]>
Increase performance of the S3 integration tests by allowing the polling
time to depend on the test and to take into account whether there are
more records left to be retrieved before waiting to collect the next
batch of records.

This allows messages to be consumed faster if additional
messages are already waiting to be retrieved.
It also gives individual tests finer control over the time
to wait before timing out (previously all tests would wait the maximum
5 minutes even if only waiting for 5 messages from Kafka).

This should save 2-3 minutes per integration test run, and
12-15 minutes on each failing test run.

---------

Signed-off-by: Aindriu Lavelle <[email protected]>
Fixes for KCON-26 - Backoff when no data available.
Fixes for KCON-28 - Improve poll method

Creates an AbstractSourceTask in commons to handle response to poll and
backoff calculations as well as start, stop. Implementations need to
implement an Iterator that poll will call to retrieve data.

Private classes Timer and Backoff are created in AbstractSourceTask and
may be moved out at a later date if needed elsewhere.
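
A backoff calculation of the kind mentioned above might look like the sketch below. It is not the PR's `Backoff` class: `SimpleBackoff` is a hypothetical name, and capped doubling is an assumed strategy chosen for illustration.

```java
/** Hypothetical capped exponential backoff; the PR's Backoff class may differ. */
final class SimpleBackoff {
    private final long initialDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    SimpleBackoff(final long initialDelayMs, final long maxDelayMs) {
        if (initialDelayMs <= 0 || maxDelayMs < initialDelayMs) {
            throw new IllegalArgumentException("require 0 < initialDelayMs <= maxDelayMs");
        }
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.currentDelayMs = initialDelayMs;
    }

    /** Returns the delay to use now, then doubles the next delay up to the cap. */
    long nextDelayMs() {
        final long delay = currentDelayMs;
        // Written as a comparison rather than a multiplication so it cannot overflow.
        currentDelayMs = currentDelayMs > maxDelayMs - currentDelayMs ? maxDelayMs : currentDelayMs * 2;
        return delay;
    }

    /** Call when data arrives so the next empty poll starts from the initial delay. */
    void reset() {
        currentDelayMs = initialDelayMs;
    }
}
```

A task would call `nextDelayMs()` each time its iterator has no data and `reset()` as soon as records are returned.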

Changes made to configurations to support configuration extraction in
AbstractSourceTask.

Modifications to S3SourceTask to operate under AbstractSourceTask.

Additional tests added

---------

Co-authored-by: Claude <[email protected]>
Co-authored-by: Jarkko Jaakola <[email protected]>
Co-authored-by: Murali Basani <[email protected]>
5 participants