Skip to content

Commit

Permalink
Update method signatures to match Kafka Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 27, 2025
1 parent 0a79c08 commit b37885f
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
jobs:
build-and-publish:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.43.0
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.56.0
with:
java-version: 17
secrets:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ on:
jobs:
java-gradle-release:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.43.0
uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.56.0
with:
java-version: 17
release-type: "${{ inputs.release-type }}"
Expand Down
8 changes: 4 additions & 4 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
plugins {
id("com.bakdata.release") version "1.4.0"
id("com.bakdata.sonar") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.1"
id("io.freefair.lombok") version "8.4"
id("com.bakdata.release") version "1.6.1"
id("com.bakdata.sonar") version "1.6.1"
id("com.bakdata.sonatype") version "1.6.1"
id("io.freefair.lombok") version "8.11"
}

allprojects {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2020 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -46,7 +46,9 @@
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class ErrorCapturingFlatKeyValueMapper<K, V, KR, VR>
implements KeyValueMapper<K, V, Iterable<KeyValue<KR, ProcessedKeyValue<K, V, VR>>>> {
private final @NonNull KeyValueMapper<? super K, ? super V, ? extends Iterable<KeyValue<KR, VR>>> wrapped;
private final @NonNull KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ?
extends VR>>>
wrapped;
private final @NonNull Predicate<Exception> errorFilter;

/**
Expand All @@ -63,7 +65,9 @@ public final class ErrorCapturingFlatKeyValueMapper<K, V, KR, VR>
* @see ErrorUtil#isRecoverable(Exception)
*/
public static <K, V, KR, VR> KeyValueMapper<K, V, Iterable<KeyValue<KR, ProcessedKeyValue<K, V, VR>>>>
captureErrors(final @NonNull KeyValueMapper<? super K, ? super V, ? extends Iterable<KeyValue<KR, VR>>> mapper) {
captureErrors(
final @NonNull KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ?
extends VR>>> mapper) {
return captureErrors(mapper, ErrorUtil::isRecoverable);
}

Expand All @@ -87,15 +91,18 @@ public final class ErrorCapturingFlatKeyValueMapper<K, V, KR, VR>
* @return {@code KeyValueMapper}
*/
public static <K, V, KR, VR> KeyValueMapper<K, V, Iterable<KeyValue<KR, ProcessedKeyValue<K, V, VR>>>>
captureErrors(final @NonNull KeyValueMapper<? super K, ? super V, ? extends Iterable<KeyValue<KR, VR>>> mapper,
captureErrors(
final @NonNull KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ?
extends VR>>> mapper,
final @NonNull Predicate<Exception> errorFilter) {
return new ErrorCapturingFlatKeyValueMapper<>(mapper, errorFilter);
}

@Override
public Iterable<KeyValue<KR, ProcessedKeyValue<K, V, VR>>> apply(final K key, final V value) {
try {
final Iterable<KeyValue<KR, VR>> newKeyValues = this.wrapped.apply(key, value);
final Iterable<? extends KeyValue<? extends KR, ? extends VR>> newKeyValues =
this.wrapped.apply(key, value);
return Seq.seq(newKeyValues)
.map(kv -> KeyValue.pair(kv.key, SuccessKeyValue.of(kv.value)));
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2020 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -43,7 +43,7 @@
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class ErrorCapturingFlatValueMapper<V, VR> implements ValueMapper<V, Iterable<ProcessedValue<V, VR>>> {
private final @NonNull ValueMapper<? super V, ? extends Iterable<VR>> wrapped;
private final @NonNull ValueMapper<? super V, ? extends Iterable<? extends VR>> wrapped;
private final @NonNull Predicate<Exception> errorFilter;

/**
Expand All @@ -58,7 +58,7 @@ public final class ErrorCapturingFlatValueMapper<V, VR> implements ValueMapper<V
* @see ErrorUtil#isRecoverable(Exception)
*/
public static <V, VR> ValueMapper<V, Iterable<ProcessedValue<V, VR>>> captureErrors(
final @NonNull ValueMapper<? super V, ? extends Iterable<VR>> mapper) {
final @NonNull ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper) {
return captureErrors(mapper, ErrorUtil::isRecoverable);
}

Expand All @@ -80,15 +80,15 @@ public static <V, VR> ValueMapper<V, Iterable<ProcessedValue<V, VR>>> captureErr
* @return {@code ValueMapper}
*/
public static <V, VR> ValueMapper<V, Iterable<ProcessedValue<V, VR>>> captureErrors(
final @NonNull ValueMapper<? super V, ? extends Iterable<VR>> mapper,
final @NonNull ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper,
final @NonNull Predicate<Exception> errorFilter) {
return new ErrorCapturingFlatValueMapper<>(mapper, errorFilter);
}

@Override
public Iterable<ProcessedValue<V, VR>> apply(final V value) {
try {
final Iterable<VR> newValues = this.wrapped.apply(value);
final Iterable<? extends VR> newValues = this.wrapped.apply(value);
return seq(newValues).map(SuccessValue::of);
} catch (final Exception e) {
if (this.errorFilter.test(e)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2020 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -44,7 +44,7 @@
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class ErrorCapturingFlatValueMapperWithKey<K, V, VR>
implements ValueMapperWithKey<K, V, Iterable<ProcessedValue<V, VR>>> {
private final @NonNull ValueMapperWithKey<? super K, ? super V, ? extends Iterable<VR>> wrapped;
private final @NonNull ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> wrapped;
private final @NonNull Predicate<Exception> errorFilter;

/**
Expand All @@ -60,7 +60,7 @@ public final class ErrorCapturingFlatValueMapperWithKey<K, V, VR>
* @see ErrorUtil#isRecoverable(Exception)
*/
public static <K, V, VR> ValueMapperWithKey<K, V, Iterable<ProcessedValue<V, VR>>> captureErrors(
final @NonNull ValueMapperWithKey<? super K, ? super V, ? extends Iterable<VR>> mapper) {
final @NonNull ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
return captureErrors(mapper, ErrorUtil::isRecoverable);
}

Expand All @@ -83,15 +83,15 @@ public static <K, V, VR> ValueMapperWithKey<K, V, Iterable<ProcessedValue<V, VR>
* @return {@code ValueMapperWithKey}
*/
public static <K, V, VR> ValueMapperWithKey<K, V, Iterable<ProcessedValue<V, VR>>> captureErrors(
final @NonNull ValueMapperWithKey<? super K, ? super V, ? extends Iterable<VR>> mapper,
final @NonNull ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
final @NonNull Predicate<Exception> errorFilter) {
return new ErrorCapturingFlatValueMapperWithKey<>(mapper, errorFilter);
}

@Override
public Iterable<ProcessedValue<V, VR>> apply(final K key, final V value) {
try {
final Iterable<VR> newValues = this.wrapped.apply(key, value);
final Iterable<? extends VR> newValues = this.wrapped.apply(key, value);
return Seq.seq(newValues).map(SuccessValue::of);
} catch (final Exception e) {
if (this.errorFilter.test(e)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2020 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -44,7 +44,7 @@
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class ErrorCapturingKeyValueMapper<K, V, KR, VR>
implements KeyValueMapper<K, V, KeyValue<KR, ProcessedKeyValue<K, V, VR>>> {
private final @NonNull KeyValueMapper<? super K, ? super V, ? extends KeyValue<KR, VR>> wrapped;
private final @NonNull KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> wrapped;
private final @NonNull Predicate<Exception> errorFilter;

/**
Expand All @@ -61,7 +61,7 @@ public final class ErrorCapturingKeyValueMapper<K, V, KR, VR>
* @see ErrorUtil#isRecoverable(Exception)
*/
public static <K, V, KR, VR> KeyValueMapper<K, V, KeyValue<KR, ProcessedKeyValue<K, V, VR>>> captureErrors(
final @NonNull KeyValueMapper<? super K, ? super V, ? extends KeyValue<KR, VR>> mapper) {
final @NonNull KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
return captureErrors(mapper, ErrorUtil::isRecoverable);
}

Expand All @@ -85,15 +85,15 @@ public static <K, V, KR, VR> KeyValueMapper<K, V, KeyValue<KR, ProcessedKeyValue
* @return {@code KeyValueMapper}
*/
public static <K, V, KR, VR> KeyValueMapper<K, V, KeyValue<KR, ProcessedKeyValue<K, V, VR>>> captureErrors(
final @NonNull KeyValueMapper<? super K, ? super V, ? extends KeyValue<KR, VR>> mapper,
final @NonNull KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
final @NonNull Predicate<Exception> errorFilter) {
return new ErrorCapturingKeyValueMapper<>(mapper, errorFilter);
}

@Override
public KeyValue<KR, ProcessedKeyValue<K, V, VR>> apply(final K key, final V value) {
try {
final KeyValue<KR, VR> newKeyValue = this.wrapped.apply(key, value);
final KeyValue<? extends KR, ? extends VR> newKeyValue = this.wrapped.apply(key, value);
final ProcessedKeyValue<K, V, VR> recordWithOldKey = SuccessKeyValue.of(newKeyValue.value);
return KeyValue.pair(newKeyValue.key, recordWithOldKey);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -44,6 +44,8 @@
* @param <VR> type of output values
* @see #captureErrors(Processor)
* @see #captureErrors(Processor, Predicate)
* @see #captureErrors(ProcessorSupplier)
* @see #captureErrors(ProcessorSupplier, Predicate)
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class ErrorCapturingProcessor<K, V, KR, VR>
Expand All @@ -66,7 +68,7 @@ public final class ErrorCapturingProcessor<K, V, KR, VR>
* @see ErrorUtil#isRecoverable(Exception)
*/
public static <K, V, KR, VR> Processor<K, V, KR, ProcessedKeyValue<K, V, VR>> captureErrors(
final @NonNull Processor<K, V, KR, VR> processor) {
final @NonNull Processor<? super K, ? super V, KR, VR> processor) {
return captureErrors(processor, ErrorUtil::isRecoverable);
}

Expand All @@ -89,9 +91,9 @@ public static <K, V, KR, VR> Processor<K, V, KR, ProcessedKeyValue<K, V, VR>> ca
* @return {@code Processor}
*/
public static <K, V, KR, VR> Processor<K, V, KR, ProcessedKeyValue<K, V, VR>> captureErrors(
final @NonNull Processor<K, V, KR, VR> processor,
final @NonNull Processor<? super K, ? super V, KR, VR> processor,
final @NonNull Predicate<Exception> errorFilter) {
return new ErrorCapturingProcessor<>(processor, errorFilter);
return new ErrorCapturingProcessor<>((Processor<K, V, KR, VR>) processor, errorFilter);
}

/**
Expand All @@ -108,7 +110,7 @@ public static <K, V, KR, VR> Processor<K, V, KR, ProcessedKeyValue<K, V, VR>> ca
* @see ErrorUtil#isRecoverable(Exception)
*/
public static <K, V, KR, VR> ProcessorSupplier<K, V, KR, ProcessedKeyValue<K, V, VR>> captureErrors(
final @NonNull ProcessorSupplier<K, V, KR, VR> supplier) {
final @NonNull ProcessorSupplier<? super K, ? super V, KR, VR> supplier) {
return captureErrors(supplier, ErrorUtil::isRecoverable);
}

Expand All @@ -132,7 +134,7 @@ public static <K, V, KR, VR> ProcessorSupplier<K, V, KR, ProcessedKeyValue<K, V,
* @return {@code ProcessorSupplier}
*/
public static <K, V, KR, VR> ProcessorSupplier<K, V, KR, ProcessedKeyValue<K, V, VR>> captureErrors(
final @NonNull ProcessorSupplier<K, V, KR, VR> supplier,
final @NonNull ProcessorSupplier<? super K, ? super V, KR, VR> supplier,
final @NonNull Predicate<Exception> errorFilter) {
return new ProcessorSupplier<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -43,6 +43,8 @@
* @param <VR> type of output values
* @see #captureErrors(FixedKeyProcessor)
* @see #captureErrors(FixedKeyProcessor, Predicate)
* @see #captureErrors(FixedKeyProcessorSupplier)
* @see #captureErrors(FixedKeyProcessorSupplier, Predicate)
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class ErrorCapturingValueProcessor<K, V, VR>
Expand All @@ -64,7 +66,7 @@ public final class ErrorCapturingValueProcessor<K, V, VR>
* @see ErrorUtil#isRecoverable(Exception)
*/
public static <K, V, VR> FixedKeyProcessor<K, V, ProcessedValue<V, VR>> captureErrors(
final @NonNull FixedKeyProcessor<K, V, VR> processor) {
final @NonNull FixedKeyProcessor<? super K, ? super V, VR> processor) {
return captureErrors(processor, ErrorUtil::isRecoverable);
}

Expand All @@ -86,9 +88,9 @@ public static <K, V, VR> FixedKeyProcessor<K, V, ProcessedValue<V, VR>> captureE
* @return {@code FixedKeyProcessor}
*/
public static <K, V, VR> FixedKeyProcessor<K, V, ProcessedValue<V, VR>> captureErrors(
final @NonNull FixedKeyProcessor<K, V, VR> processor,
final @NonNull FixedKeyProcessor<? super K, ? super V, VR> processor,
final @NonNull Predicate<Exception> errorFilter) {
return new ErrorCapturingValueProcessor<>(processor, errorFilter);
return new ErrorCapturingValueProcessor<>((FixedKeyProcessor<K, V, VR>) processor, errorFilter);
}

/**
Expand All @@ -104,7 +106,7 @@ public static <K, V, VR> FixedKeyProcessor<K, V, ProcessedValue<V, VR>> captureE
* @see ErrorUtil#isRecoverable(Exception)
*/
public static <K, V, VR> FixedKeyProcessorSupplier<K, V, ProcessedValue<V, VR>> captureErrors(
final @NonNull FixedKeyProcessorSupplier<K, V, VR> supplier) {
final @NonNull FixedKeyProcessorSupplier<? super K, ? super V, VR> supplier) {
return captureErrors(supplier, ErrorUtil::isRecoverable);
}

Expand All @@ -127,7 +129,7 @@ public static <K, V, VR> FixedKeyProcessorSupplier<K, V, ProcessedValue<V, VR>>
* @return {@code FixedKeyProcessorSupplier}
*/
public static <K, V, VR> FixedKeyProcessorSupplier<K, V, ProcessedValue<V, VR>> captureErrors(
final @NonNull FixedKeyProcessorSupplier<K, V, VR> supplier,
final @NonNull FixedKeyProcessorSupplier<? super K, ? super V, VR> supplier,
final @NonNull Predicate<Exception> errorFilter) {
return new FixedKeyProcessorSupplier<>() {
@Override
Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
3 changes: 2 additions & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.12.1-all.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Loading

0 comments on commit b37885f

Please sign in to comment.