Skip to content

Commit

Permalink
improved cloudwatch exporter retry functionality
Browse files Browse the repository at this point in the history
the retry will now time out before any configured Lambda timeout period
  • Loading branch information
cwensel committed Dec 19, 2023
1 parent 72fe2b2 commit 4de6ed3
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 6 deletions.
3 changes: 1 addition & 2 deletions clusterless-substrate-aws-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ dependencies {
implementation("com.fasterxml.jackson.datatype:jackson-datatype-joda")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")

implementation("io.github.resilience4j:resilience4j-retry")

api("io.github.resilience4j:resilience4j-retry")
api("com.jayway.jsonpath:json-path")

implementation("software.amazon.awssdk:s3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.AwsClient;
Expand All @@ -24,11 +25,45 @@ public class ClientRetry<C extends AwsClient> {
private final RetryConfig config;
private final String client;

/**
 * Convenience factory for an exponential backoff {@link IntervalFunction}.
 * <p>
 * Delegates directly to {@link IntervalFunction#ofExponentialBackoff(Duration, double, Duration)}.
 *
 * @param initialInterval the interval handed to the backoff function as its starting value
 * @param multiplier      the growth factor handed to the backoff function
 * @param maxInterval     the upper bound handed to the backoff function
 * @return the interval function created by resilience4j
 */
@NotNull
public static IntervalFunction exponentialBackoff(Duration initialInterval, double multiplier, Duration maxInterval) {
    IntervalFunction backoff = IntervalFunction.ofExponentialBackoff(initialInterval, multiplier, maxInterval);

    return backoff;
}

/**
 * Creates a retry that uses the same fixed interval for every attempt.
 * <p>
 * The given duration is wrapped with {@code IntervalFunction.of(fixed)} and handed to the
 * constructor accepting an {@link IntervalFunction}.
 *
 * @param client      name identifying the client this retry is used for
 * @param maxAttempts maximum number of attempts, forwarded unchanged
 * @param fixed       the fixed interval between attempts
 * @param predicate   result predicate, forwarded unchanged
 */
public ClientRetry(String client, int maxAttempts, Duration fixed, Predicate<ClientBase<C>.Response> predicate) {
this(client, maxAttempts, IntervalFunction.of(fixed), predicate);
}

/**
 * Creates a retry with the default exponential backoff.
 * <p>
 * The default backoff is built with an initial interval of 30 seconds, a multiplier of 2.0
 * and a maximum interval of 5 minutes.
 *
 * @param client      name identifying the client this retry is used for
 * @param maxAttempts maximum number of attempts, forwarded unchanged
 * @param predicate   result predicate, forwarded unchanged
 */
public ClientRetry(String client, int maxAttempts, Predicate<ClientBase<C>.Response> predicate) {
this(client, maxAttempts, exponentialBackoff(Duration.ofSeconds(30), 2.0, Duration.ofMinutes(5)), predicate);
}

/**
 * Creates a retry bounded by a time budget instead of an explicit attempt count.
 * <p>
 * The attempt count is computed by {@code maxAttempts(maxDuration, function)} and then
 * handed, together with the same interval function, to the constructor accepting an
 * explicit number of attempts.
 *
 * @param client      name identifying the client this retry is used for
 * @param maxDuration the time budget used to derive the maximum number of attempts
 * @param function    the interval function, used both to derive the attempt count and for the retry itself
 * @param predicate   result predicate, forwarded unchanged
 */
public ClientRetry(String client, Duration maxDuration, IntervalFunction function, Predicate<ClientBase<C>.Response> predicate) {
this(client, maxAttempts(maxDuration, function), function, predicate);
}

/**
 * Calculates how many attempts fit into the given time budget.
 * <p>
 * Intervals are requested from {@code function} for attempt 1, 2, 3, ... and summed until the
 * total reaches {@code maxDuration} (compared at millisecond precision). The number of
 * intervals consumed is returned.
 * <p>
 * The counter is capped at {@link Integer#MAX_VALUE}. Without the cap, an interval function
 * that never exhausts the budget (for example one returning zero-length intervals) would wrap
 * the counter to a negative value and the loop would not terminate normally.
 *
 * @param maxDuration the time budget; anything shorter than one millisecond yields a result of 1
 * @param function    supplies the interval, in milliseconds, for a given 1-based attempt number
 * @return the calculated number of attempts, always at least 1
 * @throws ArithmeticException if the budget or the summed intervals overflow a {@code long} of milliseconds
 */
protected static int maxAttempts(Duration maxDuration, IntervalFunction function) {
    // the budget never changes, so convert it once instead of on every iteration
    long budgetMillis = maxDuration.toMillis();
    long elapsedMillis = 0L;

    int count = 0;
    // stop at Integer.MAX_VALUE so the counter can never wrap around to a negative attempt number
    while (elapsedMillis < budgetMillis && count < Integer.MAX_VALUE) {
        count++;
        elapsedMillis = Math.addExact(elapsedMillis, function.apply(count));
    }

    if (count == 0) {
        // the loop never ran: the budget is below one millisecond, but at least one attempt must be made
        LOG.warn("calculated max attempts are zero, for maxDuration: {}, returning value of 1 max attempts", maxDuration);
        return 1;
    }

    return count;
}

public ClientRetry(String client, int maxAttempts, IntervalFunction function, Predicate<ClientBase<C>.Response> predicate) {
this.client = client;
this.config = RetryConfig.<ClientBase<C>.Response>custom()
.maxAttempts(maxAttempts)
.intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(30), 2.0, Duration.ofMinutes(5)))
.intervalFunction(function)
.consumeResultBeforeRetryAttempt((attempt, response) -> LOG.warn("got: {}, for retry attempt: {} of {}", response.errorMessage(), attempt, maxAttempts))
.retryOnResult(predicate)
.failAfterMaxAttempts(true)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2023 Chris K Wensel <[email protected]>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package clusterless.cls.substrate.aws.sdk;

import io.github.resilience4j.core.IntervalFunction;
import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Verifies how {@code ClientRetry.maxAttempts} turns a time budget and an interval
 * function into a maximum number of attempts.
 */
public class ClientRetryTest {
    @Test
    void max() {
        Duration oneMinute = Duration.ofSeconds(60);

        // fixed 30s intervals: expected to reach the 60s budget with the second interval
        IntervalFunction fixedThirtySeconds = IntervalFunction.of(Duration.ofSeconds(30));
        assertEquals(2, ClientRetry.maxAttempts(oneMinute, fixedThirtySeconds));

        // backoff starting at 30s and doubling: expected to pass the 60s budget with the second interval
        IntervalFunction backoffFromThirtySeconds = IntervalFunction.ofExponentialBackoff(30000, 2, 600000);
        assertEquals(2, ClientRetry.maxAttempts(oneMinute, backoffFromThirtySeconds));

        // backoff starting at 15s and doubling: expected to need a third interval to pass 60s
        IntervalFunction backoffFromFifteenSeconds = IntervalFunction.ofExponentialBackoff(15000, 2, 600000);
        assertEquals(3, ClientRetry.maxAttempts(oneMinute, backoffFromFifteenSeconds));

        // backoff starting at 3s, doubling, capped at one minute, against a 15 minute budget
        IntervalFunction cappedBackoff = IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(3), 2, Duration.ofMinutes(1));
        Duration fifteenMinutes = Duration.ofMinutes(15);
        assertEquals(19, ClientRetry.maxAttempts(fifteenMinutes, cappedBackoff));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public CloudWatchExportActivityConstruct(@NotNull ManagedComponentContext contex
.withLogGroupName(model.logGroupName())
.withLogStreamPrefix(model.logStreamPrefix())
.withInterval(model.interval())
.withTimeoutMin(model().runtimeProps().timeoutMin())
.build();

Map<String, String> environment = Env.toEnv(activityProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class CloudWatchExportActivityProps implements Struct {
String logStreamPrefix;
@JsonRequiredProperty
URI pathURI;
int timeoutMin;

public static Builder builder() {
return Builder.builder();
Expand All @@ -42,11 +43,16 @@ public URI pathURI() {
return pathURI;
}

/**
 * Returns the timeout configured for this activity.
 *
 * @return the timeout value, in minutes
 */
public int timeoutMin() {
    return this.timeoutMin;
}

public static final class Builder {
String interval;
String logGroupName;
String logStreamPrefix;
URI pathURI;
int timeoutMin;

private Builder() {
}
Expand Down Expand Up @@ -75,11 +81,17 @@ public Builder withPathURI(URI pathURI) {
return this;
}

/**
 * Sets the timeout to be copied onto the built props.
 *
 * @param timeoutMin the timeout value, in minutes
 * @return this builder, to allow call chaining
 */
public Builder withTimeoutMin(int timeoutMin) {
this.timeoutMin = timeoutMin;
return this;
}

/**
 * Creates a new {@code CloudWatchExportActivityProps} and copies every value held by this
 * builder onto it.
 *
 * @return the populated props instance
 */
public CloudWatchExportActivityProps build() {
    CloudWatchExportActivityProps props = new CloudWatchExportActivityProps();

    props.interval = this.interval;
    props.logGroupName = this.logGroupName;
    props.logStreamPrefix = this.logStreamPrefix;
    props.pathURI = this.pathURI;
    props.timeoutMin = this.timeoutMin;

    return props;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ protected CloudWatchExportActivityProps getProps() {
.withLogGroupName("test-log-group")
.withPathURI(URIs.create("s3", bucketName(), "/test-prefix/"))
.withInterval(IntervalUnit.TWELFTHS.name())
.withTimeoutMin(15)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ public class CloudWatchExportActivityHandler extends EventHandler<AWSEvent, Clou

protected final CloudWatchLogs cloudWatchLogs = new CloudWatchLogs();

protected final ClientRetry<CloudWatchLogsClient> retryClient = new ClientRetry<>("cloudwatchlogs", 5, r -> r.exception() instanceof LimitExceededException);
// Retry wrapper for CloudWatch Logs calls.
// - time budget: the configured timeout (in minutes) less 5 seconds
// - wait between attempts: exponential backoff built from 3 seconds, multiplier 2.0, maximum 3 minutes
// - a response is retried only when its exception is a LimitExceededException
protected final ClientRetry<CloudWatchLogsClient> retryClient = new ClientRetry<>(
"cloudwatchlogs",
// attempt to match the lambda timeout period, minus a small fudge factor
Duration.ofMinutes(activityProps.timeoutMin()).minusSeconds(5),
ClientRetry.exponentialBackoff(Duration.ofSeconds(3), 2.0, Duration.ofMinutes(3)),
r -> r.exception() instanceof LimitExceededException
);

protected final IntervalBuilder intervalBuilder = new IntervalBuilder(activityProps.interval);

Expand Down

0 comments on commit 4de6ed3

Please sign in to comment.