S3 source connector #317

Draft · wants to merge 93 commits into base: main
c78d1aa
fix conflicts
muralibasani Oct 7, 2024
093a013
rename props file
muralibasani Sep 9, 2024
0b6b75d
EC-243 Rename package
muralibasani Sep 10, 2024
0eaa7fb
Fix conflicts
muralibasani Oct 7, 2024
a67a559
fix conflicts
muralibasani Sep 17, 2024
6870569
Adding required files
muralibasani Sep 12, 2024
8142014
Adding s3 iterator
muralibasani Sep 13, 2024
4d62790
Updating test
muralibasani Sep 13, 2024
f9b5041
Update tests
muralibasani Sep 13, 2024
495f5cc
Adding integration test
muralibasani Sep 14, 2024
a378fb4
Deleted unused package
muralibasani Sep 22, 2024
e408c19
byte array reader
muralibasani Sep 23, 2024
fd65b38
update offset topic, pattern for topic
muralibasani Sep 23, 2024
b7c6ef5
Fix offsets, topic details from key
muralibasani Sep 23, 2024
179f26f
Fix spotbugs
muralibasani Sep 23, 2024
31fb337
Update properties
muralibasani Sep 23, 2024
d77c3a4
offset storage fix
muralibasani Sep 24, 2024
3c827e7
Adding avro support, test
muralibasani Sep 26, 2024
8c2b87a
Update avro format
muralibasani Sep 27, 2024
f8e2ac3
Adding json support
muralibasani Sep 28, 2024
0102aa5
Refactor source task
muralibasani Sep 30, 2024
0687bec
Refactor classes
muralibasani Oct 1, 2024
914c2cd
Adding parquet
muralibasani Oct 3, 2024
828d0ea
Refactor with new output writer classes
muralibasani Oct 5, 2024
4dabbf6
Topic derive if not found
muralibasani Oct 5, 2024
f9aed28
Add enum for output formats
muralibasani Oct 6, 2024
97551a5
Update from review
muralibasani Oct 7, 2024
58bcc39
update deprecated code
muralibasani Oct 7, 2024
0568832
s3-source-connector/build.gradle.kts
muralibasani Oct 8, 2024
daf9bdb
From review, refactor, improve
muralibasani Oct 8, 2024
00ca293
Refactor based on review
muralibasani Oct 8, 2024
25e5208
Adding unit tests for sourcetask
muralibasani Oct 9, 2024
6d307ff
Add update unit tests
muralibasani Oct 9, 2024
7c6743b
Add tests for FileReader, OffsetMgr
muralibasani Oct 9, 2024
e1e15bf
Adding tests
muralibasani Oct 9, 2024
8bcfa72
Adding parquet unit tests
muralibasani Oct 9, 2024
148ae64
Skip failed objects
muralibasani Oct 9, 2024
db7187f
Refactor writer classes
muralibasani Oct 10, 2024
e668c5f
fix offset storage
muralibasani Oct 11, 2024
6953c75
refactor offset calls
muralibasani Oct 12, 2024
1f6056f
Fix offsets for unique object keys
muralibasani Oct 13, 2024
4b23dcc
fix object key in map
muralibasani Oct 14, 2024
8e618ee
fix for invalid object names
muralibasani Oct 14, 2024
626e5ee
update pattern for uniqueness
muralibasani Oct 14, 2024
816a393
fix offset maps on final source recs
muralibasani Oct 14, 2024
b398804
Fix iterator
muralibasani Oct 15, 2024
fee503e
Introduce max message bytes config for bytes format
muralibasani Oct 15, 2024
1017d35
Add test for bytearray with chunks based on max messsage bytes
muralibasani Oct 15, 2024
af7ff5d
Fixes from review
muralibasani Oct 16, 2024
7ad5af7
Changed output dir to input, added new test for max message bytes
muralibasani Oct 17, 2024
9bc403e
Remove logic for processed recs
muralibasani Oct 17, 2024
fe6a180
Removed intermediate ConsumerRecord
muralibasani Oct 17, 2024
440c18d
Removed commented code
muralibasani Oct 17, 2024
bce849f
Updated from review
muralibasani Oct 18, 2024
6e12b3d
Rename writers
muralibasani Oct 18, 2024
9dda155
Lazy file reader iterator, review changes
muralibasani Oct 23, 2024
6332682
chore: enabling GH actions for PRs into S3 source feature branch
AnatolyPopov Nov 5, 2024
b4f91f9
Improve logging and some exception handling for clarity
muralibasani Nov 6, 2024
f7b097c
tests: migration to EmbeddedConnectCluster for integration tests
AnatolyPopov Nov 7, 2024
68dee0e
FileReader improvements
AnatolyPopov Nov 8, 2024
274fc34
Updating with tasks test
muralibasani Nov 8, 2024
0079f28
From review
muralibasani Nov 12, 2024
387ba6c
Inline invoke
muralibasani Nov 12, 2024
ced53a2
Integration tests clean up
AnatolyPopov Nov 12, 2024
eba30cf
feat: Use karapace schema registry for testing
RyanSkraba Nov 8, 2024
4940cbc
Use recent version of Karapace docker
RyanSkraba Nov 14, 2024
49ec94e
Flaky integration tests fix
AnatolyPopov Nov 13, 2024
8801bba
Multi tasks distribution of s3 objs (#327)
muralibasani Nov 21, 2024
a31b0ca
[KCON35] : Improvement : Read files with stream instead of loading it…
muralibasani Nov 22, 2024
4afb061
Use AssertJ for tests
RyanSkraba Nov 25, 2024
517b8d0
Use AssertJ for tests (#357)
muralibasani Nov 25, 2024
413b1fe
Integration tests improvements (#354)
AnatolyPopov Nov 26, 2024
d826306
Read large avro files [KCON-64] (#359)
muralibasani Nov 28, 2024
d2b8e55
Move source configuration and transformers into common config (#360)
aindriu-aiven Dec 5, 2024
17d69bb
Update to add errors.tolerance configuration
aindriu-aiven Dec 5, 2024
076e424
Split api layer from file reader (#365)
aindriu-aiven Dec 6, 2024
a67f27b
Correct check on validator
aindriu-aiven Dec 6, 2024
3a1853a
Skip records if already processed [KCON-36] (#356)
muralibasani Dec 10, 2024
31dd0ab
Update to add errors.tolerance configuration (#369)
RyanSkraba Dec 11, 2024
00064c6
Changes to fix Transformer streaming
Dec 13, 2024
eb519c3
removed unneeded test
Dec 16, 2024
24df6ab
Configure s3 api to use AWS Prefix (#370)
aindriu-aiven Dec 16, 2024
b99218d
Transformer streaming fix (#372)
muralibasani Dec 16, 2024
7ef4e31
Remove converters instantiation [KCON-25] (#368)
muralibasani Dec 17, 2024
9d78391
Add Service Loader for quick start up (#375)
aindriu-aiven Dec 19, 2024
e0184bb
Adding readme for s3 source [KCON-9] (#376)
muralibasani Dec 24, 2024
b4475c9
AWS SDK 2.X migration for source connector [KCON-84] (#374)
aindriu-aiven Dec 30, 2024
f6d3087
Improve S3 Source Integration tests (#382)
aindriu-aiven Jan 3, 2025
c4604f4
Polling efficiency (#378)
Claudenw Jan 9, 2025
6b967d3
Tasks assignment strategy - commons integration - [KCON-63] (#384)
muralibasani Jan 14, 2025
343f23c
Update the S3 source connector to use a later version of Kafka (#383)
aindriu-aiven Jan 20, 2025
f68b0a4
Adding the context to the source connectors (#388)
aindriu-aiven Jan 20, 2025
1ec8ed7
Attempt to fix backoff testing issue (#389)
Claudenw Jan 20, 2025
8 changes: 6 additions & 2 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
@@ -13,10 +13,14 @@ name: "CodeQL"
 
 on:
   push:
-    branches: [main]
+    branches:
+      - main
+      - s3-source-release
   pull_request:
     # The branches below must be a subset of the branches above
-    branches: [main]
+    branches:
+      - main
+      - s3-source-release
   schedule:
     - cron: "42 20 * * 6"

10 changes: 7 additions & 3 deletions .github/workflows/main_push_workflow.yml
@@ -2,9 +2,13 @@
 name: Main and pull request checks
 on:
   push:
-    branches: [ main ]
+    branches:
+      - main
+      - s3-source-release
   pull_request:
-    branches: [ main ]
+    branches:
+      - main
+      - s3-source-release
 jobs:
   build:
     strategy:
@@ -30,4 +34,4 @@ jobs:
         run: ./gradlew build test
       - name: Build in Linux
         if: runner.os == 'Linux'
-        run: ./gradlew build check test integrationTest
+        run: ./gradlew build check test integrationTest -i
1 change: 1 addition & 0 deletions README.md
@@ -6,6 +6,7 @@
 - [Aiven GCS Sink Connector](./gcs-sink-connector/README.md)
 - [Aiven S3 Sink Connector](./s3-sink-connector/README.md)
 - [Aiven Azure Blob Sink Connector](./azure-sink-connector/README.md)
+- [Aiven S3 Source Connector](./s3-source-connector/README.md)

# Development

6 changes: 4 additions & 2 deletions commons/build.gradle.kts
@@ -27,7 +27,7 @@ dependencies {
implementation(confluent.kafka.connect.avro.data) {
exclude(group = "org.apache.kafka", module = "kafka-clients")
}

implementation("commons-io:commons-io:2.18.0")
implementation(tools.spotbugs.annotations)
implementation(compressionlibs.snappy)
implementation(compressionlibs.zstd.jni)
@@ -41,6 +41,7 @@ dependencies {
exclude(group = "org.slf4j", module = "slf4j-api")
exclude(group = "org.apache.avro", module = "avro")
}

implementation(apache.hadoop.common) {
exclude(group = "org.apache.hadoop.thirdparty", module = "hadoop-shaded-protobuf_3_7")
exclude(group = "com.google.guava", module = "guava")
@@ -86,11 +87,12 @@ dependencies {
testImplementation(jackson.databind)
testImplementation(testinglibs.mockito.core)
testImplementation(testinglibs.assertj.core)
testImplementation(testinglibs.awaitility)
testImplementation(testFixtures(project(":commons")))

testImplementation(testinglibs.woodstox.stax2.api)
testImplementation(apache.hadoop.mapreduce.client.core)
testImplementation(confluent.kafka.connect.avro.converter)
testImplementation("org.mockito:mockito-junit-jupiter:5.14.2")

testRuntimeOnly(testinglibs.junit.jupiter.engine)
testRuntimeOnly(logginglibs.logback.classic)
@@ -27,6 +27,8 @@
public class CommonConfig extends AbstractConfig {
protected static final String GROUP_COMPRESSION = "File Compression";
protected static final String GROUP_FORMAT = "Format";
public static final String TASK_ID = "task.id";
public static final String MAX_TASKS = "tasks.max";

/**
* @deprecated No longer needed.
@@ -58,4 +60,25 @@ public Long getKafkaRetryBackoffMs() {
return new BackoffPolicyConfig(this).getKafkaRetryBackoffMs();
}

    /**
     * Get the maximum number of tasks that should be run by this connector configuration. Max tasks is set
     * within the Kafka Connect framework, so it is retrieved slightly differently in ConnectorConfig.java.
     *
     * @return the maximum number of tasks that should be run by this connector configuration
     */
    public int getMaxTasks() {
        // TODO when the Connect framework is upgraded it will be possible to retrieve this information from the
        // configDef as tasksMax
        return Integer.parseInt(this.originalsStrings().get(MAX_TASKS));
    }

    /**
     * Get the task id for this configuration.
     *
     * @return the task id for this configuration
     */
    public int getTaskId() {
        return Integer.parseInt(this.originalsStrings().get(TASK_ID));
    }

}
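The `tasks.max` / `task.id` accessors added above can be sketched outside the Connect runtime with plain collections. This is a minimal stand-in for illustration only (`TaskIdSketch` is a hypothetical name, not part of the PR): it mimics how the values are parsed from the `originalsStrings()` map, where `tasks.max` comes from the Connect framework and `task.id` is injected per task by the connector.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in illustrating the parsing done by getMaxTasks()/getTaskId():
// both keys are read back from the raw string-valued config map.
public class TaskIdSketch {
    static final String TASK_ID = "task.id";
    static final String MAX_TASKS = "tasks.max";

    private final Map<String, String> originals;

    public TaskIdSketch(final Map<String, String> originals) {
        this.originals = new HashMap<>(originals);
    }

    public int getMaxTasks() {
        // In the real config this map is this.originalsStrings()
        return Integer.parseInt(originals.get(MAX_TASKS));
    }

    public int getTaskId() {
        return Integer.parseInt(originals.get(TASK_ID));
    }

    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put(MAX_TASKS, "4");
        props.put(TASK_ID, "2");
        final TaskIdSketch cfg = new TaskIdSketch(props);
        System.out.println(cfg.getMaxTasks() + "/" + cfg.getTaskId()); // prints 4/2
    }
}
```

Note the parsing will throw `NumberFormatException` if either key is absent, which is why the TODO above points at retrieving `tasksMax` from the configDef once the framework supports it.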
@@ -43,9 +43,12 @@ public final class FileNameFragment extends ConfigFragment {
     static final String FILE_MAX_RECORDS = "file.max.records";
     static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
     static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
-    static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
+    public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
     static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}";
 
+    public static final String FILE_PATH_PREFIX_TEMPLATE_CONFIG = "file.prefix.template";
+    static final String DEFAULT_FILE_PATH_PREFIX_TEMPLATE = "topics/{{topic}}/partition={{partition}}/";
+
     public FileNameFragment(final AbstractConfig cfg) {
         super(cfg);
     }
@@ -109,9 +112,18 @@ public void ensureValid(final String name, final Object value) {
         configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(),
                 new TimestampSourceValidator(), ConfigDef.Importance.LOW,
                 "Specifies the timestamp variable source. Default is wall-clock.", GROUP_FILE, fileGroupCounter++, // NOPMD
                 // UnusedAssignment
                 ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);
 
+        configDef.define(FILE_PATH_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PATH_PREFIX_TEMPLATE,
+                new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
+                "The template for the file prefix on S3. "
+                        + "Supports `{{ variable }}` placeholders for substituting variables. "
+                        + "Currently supported variables are `topic` and `partition`, "
+                        + "both of which are required in the directory structure. "
+                        + "Example prefix: topics/{{topic}}/partition={{partition}}/",
+                GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
+                ConfigDef.Width.LONG, FILE_PATH_PREFIX_TEMPLATE_CONFIG);

return configDef;
}

@@ -185,4 +197,8 @@ public int getMaxRecordsPerFile() {
return cfg.getInt(FILE_MAX_RECORDS);
}

public String getFilePathPrefixTemplateConfig() {
return cfg.getString(FILE_PATH_PREFIX_TEMPLATE_CONFIG);
}

}
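The prefix-template mechanics introduced above (`file.prefix.template`, default `topics/{{topic}}/partition={{partition}}/`) can be sketched as a simple placeholder substitution. This is not the connector's actual templating engine, just an illustration of what the default template expands to (`PrefixTemplateSketch` and `render` are hypothetical names):

```java
// Illustrative expansion of the default file.prefix.template value:
// each {{variable}} placeholder is replaced by its concrete value.
public class PrefixTemplateSketch {
    static final String DEFAULT_FILE_PATH_PREFIX_TEMPLATE = "topics/{{topic}}/partition={{partition}}/";

    public static String render(final String template, final String topic, final int partition) {
        return template
                .replace("{{topic}}", topic)
                .replace("{{partition}}", Integer.toString(partition));
    }

    public static void main(final String[] args) {
        // prints: topics/logs/partition=3/
        System.out.println(render(DEFAULT_FILE_PATH_PREFIX_TEMPLATE, "logs", 3));
    }
}
```

This is why both `topic` and `partition` are mandatory in the template: they are what makes the resulting S3 prefix unique per topic-partition.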
@@ -0,0 +1,76 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.config;

import java.util.Locale;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.source.input.InputFormat;

public final class SchemaRegistryFragment extends ConfigFragment {
private static final String SCHEMAREGISTRY_GROUP = "Schema registry group";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String VALUE_CONVERTER_SCHEMA_REGISTRY_URL = "value.converter.schema.registry.url";
public static final String AVRO_VALUE_SERIALIZER = "value.serializer";
public static final String INPUT_FORMAT_KEY = "input.format";
public static final String SCHEMAS_ENABLE = "schemas.enable";

/**
 * Construct the ConfigFragment.
*
* @param cfg
* the configuration that this fragment is associated with.
*/
public SchemaRegistryFragment(final AbstractConfig cfg) {
super(cfg);
}

public static ConfigDef update(final ConfigDef configDef) {
int srCounter = 0;
        configDef.define(SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
                ConfigDef.Importance.MEDIUM, "Schema Registry URL", SCHEMAREGISTRY_GROUP, srCounter++,
                ConfigDef.Width.NONE, SCHEMA_REGISTRY_URL);
        configDef.define(VALUE_CONVERTER_SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null,
                new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "Value converter Schema Registry URL",
                SCHEMAREGISTRY_GROUP, srCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
configDef.define(INPUT_FORMAT_KEY, ConfigDef.Type.STRING, InputFormat.BYTES.getValue(),
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
"Input format of messages read from source avro/json/parquet/bytes", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
ConfigDef.Width.NONE, INPUT_FORMAT_KEY);

configDef.define(AVRO_VALUE_SERIALIZER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM,
"Avro value serializer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.NONE, AVRO_VALUE_SERIALIZER);
return configDef;
}

public InputFormat getInputFormat() {
return InputFormat.valueOf(cfg.getString(INPUT_FORMAT_KEY).toUpperCase(Locale.ROOT));
}

public String getSchemaRegistryUrl() {
return cfg.getString(SCHEMA_REGISTRY_URL);
}

public Class<?> getAvroValueSerializer() {
return cfg.getClass(AVRO_VALUE_SERIALIZER);
}

}
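The `getInputFormat()` accessor above resolves the configured string via an upper-cased enum lookup. The sketch below shows that resolution in isolation; the nested `InputFormat` here is a stand-in mirroring the four formats named in the config description, not the real enum from `io.aiven.kafka.connect.common.source.input`:

```java
import java.util.Locale;

// Stand-in showing the case-insensitive enum lookup used by getInputFormat().
public class InputFormatSketch {
    enum InputFormat { AVRO, JSON, PARQUET, BYTES }

    public static InputFormat parse(final String configured) {
        // Locale.ROOT avoids surprises in locales with unusual casing rules
        return InputFormat.valueOf(configured.toUpperCase(Locale.ROOT));
    }

    public static void main(final String[] args) {
        System.out.println(parse("parquet")); // prints PARQUET
    }
}
```

An unrecognized value (anything outside avro/json/parquet/bytes) makes `valueOf` throw `IllegalArgumentException`, so misconfigured input formats fail fast rather than silently defaulting.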
@@ -20,8 +20,66 @@

import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.input.TransformerFactory;
import io.aiven.kafka.connect.common.source.task.DistributionType;

public class SourceCommonConfig extends CommonConfig {

private final SchemaRegistryFragment schemaRegistryFragment;
private final SourceConfigFragment sourceConfigFragment;
private final FileNameFragment fileNameFragment;
private final OutputFormatFragment outputFormatFragment;

public SourceCommonConfig(ConfigDef definition, Map<?, ?> originals) {// NOPMD
super(definition, originals);
// Construct Fragments
schemaRegistryFragment = new SchemaRegistryFragment(this);
sourceConfigFragment = new SourceConfigFragment(this);
fileNameFragment = new FileNameFragment(this);
outputFormatFragment = new OutputFormatFragment(this);

validate(); // NOPMD ConstructorCallsOverridableMethod
}

private void validate() {
schemaRegistryFragment.validate();
sourceConfigFragment.validate();
fileNameFragment.validate();
outputFormatFragment.validate();
}

public InputFormat getInputFormat() {
return schemaRegistryFragment.getInputFormat();
}

public String getSchemaRegistryUrl() {
return schemaRegistryFragment.getSchemaRegistryUrl();
}

public String getTargetTopics() {
return sourceConfigFragment.getTargetTopics();
    }

    public String getTargetTopicPartitions() {
return sourceConfigFragment.getTargetTopicPartitions();
}

public ErrorsTolerance getErrorsTolerance() {
return sourceConfigFragment.getErrorsTolerance();
}

public DistributionType getDistributionType() {
return sourceConfigFragment.getDistributionType();
}

public int getMaxPollRecords() {
return sourceConfigFragment.getMaxPollRecords();
}

public Transformer getTransformer() {
return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat());
}

}
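`SourceCommonConfig` above follows a fragment-delegation pattern: the config object constructs several `ConfigFragment`s over itself, validates them eagerly, and forwards each getter to the fragment that owns the key. A minimal sketch of that pattern, using only the standard library (all names here are illustrative stand-ins, not the real classes):

```java
import java.util.Map;

// Stand-in for the fragment-delegation pattern: construct fragments,
// validate at construction time, delegate getters.
public class FragmentDelegationSketch {
    interface Fragment {
        void validate();
    }

    static final class SourceFragment implements Fragment {
        private final Map<String, String> cfg;

        SourceFragment(final Map<String, String> cfg) {
            this.cfg = cfg;
        }

        @Override
        public void validate() {
            if (!cfg.containsKey("topics")) {
                throw new IllegalArgumentException("topics is required");
            }
        }

        String getTargetTopics() {
            return cfg.get("topics");
        }
    }

    private final SourceFragment sourceFragment;

    public FragmentDelegationSketch(final Map<String, String> originals) {
        // Construct fragments, then validate them all before the config is usable
        sourceFragment = new SourceFragment(originals);
        sourceFragment.validate();
    }

    public String getTargetTopics() {
        return sourceFragment.getTargetTopics(); // pure delegation
    }

    public static void main(final String[] args) {
        System.out.println(new FragmentDelegationSketch(Map.of("topics", "t1")).getTargetTopics());
    }
}
```

The benefit mirrored here is the one the PR exploits: each fragment owns its keys and validation, so `SourceCommonConfig` stays a thin facade and the same fragments can be reused by other connectors' configs.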