Skip to content

Commit

Permalink
fix: retry get shadow accepted operation
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 committed Oct 26, 2023
1 parent f71adfe commit d022870
Showing 1 changed file with 151 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.inject.Inject;

Expand All @@ -60,7 +62,6 @@ public class CISShadowMonitor implements Consumer<NetworkStateProvider.Connectio
private static final long WAIT_TIME_TO_SUBSCRIBE_AGAIN_IN_MS = Duration.ofMinutes(2).toMillis();
private static final Random JITTER = new Random();
static final String SHADOW_UPDATE_DELTA_TOPIC = "$aws/things/%s/shadow/update/delta";
static final String SHADOW_GET_ACCEPTED_TOPIC = "$aws/things/%s/shadow/get/accepted";

private static final RetryUtils.RetryConfig GET_CONNECTIVITY_RETRY_CONFIG =
RetryUtils.RetryConfig.builder().initialRetryInterval(Duration.ofMinutes(1L))
Expand All @@ -70,6 +71,7 @@ public class CISShadowMonitor implements Consumer<NetworkStateProvider.Connectio

private MqttClientConnection connection;
private IotShadowClient iotShadowClient;
private RefreshConnectivityInfoTask refreshConnectivityInfoTask;
private String lastVersion;
private Future<?> subscribeTaskFuture;
private final List<CertificateGenerator> monitoredCertificateGenerators = new CopyOnWriteArrayList<>();
Expand All @@ -92,15 +94,20 @@ public CISShadowMonitor(MqttClient mqttClient, ExecutorService executorService,
connectivityInformation);
this.connection = new WrapperMqttClientConnection(mqttClient);
this.iotShadowClient = new IotShadowClient(this.connection);
this.refreshConnectivityInfoTask = new RefreshConnectivityInfoTask(
shadowName, this::processCISShadow, iotShadowClient, executorService);
}

// TODO cleanup constructors
CISShadowMonitor(MqttClientConnection connection, IotShadowClient iotShadowClient, ExecutorService executorService,
String shadowName, ConnectivityInformation connectivityInformation) {
this.connection = connection;
this.iotShadowClient = iotShadowClient;
this.executorService = executorService;
this.shadowName = shadowName;
this.connectivityInformation = connectivityInformation;
this.refreshConnectivityInfoTask = new RefreshConnectivityInfoTask(
shadowName, this::processCISShadow, iotShadowClient, executorService);
}

/**
Expand All @@ -113,12 +120,12 @@ public void startMonitor() {
subscribeTaskFuture = executorService.submit(() -> {
try {
subscribeToShadowTopics();
publishToGetCISShadowTopic();
} catch (InterruptedException e) {
LOGGER.atWarn().cause(e).log("Interrupted while subscribing to CIS shadow topics");
Thread.currentThread().interrupt();
}
});
refreshConnectivityInfoTask.run();
}

/**
Expand All @@ -128,6 +135,7 @@ public void stopMonitor() {
if (subscribeTaskFuture != null) {
subscribeTaskFuture.cancel(true);
}
refreshConnectivityInfoTask.stop();
unsubscribeFromShadowTopics();
}

Expand Down Expand Up @@ -165,14 +173,6 @@ private void subscribeToShadowTopics() throws InterruptedException {
.log("Error processing shadowDeltaUpdatedSubscription Response", e))
.get(TIMEOUT_FOR_SUBSCRIBING_TO_TOPICS_SECONDS, TimeUnit.SECONDS);
LOGGER.info("Subscribed to shadow update delta topic");

GetShadowSubscriptionRequest getShadowSubscriptionRequest = new GetShadowSubscriptionRequest();
getShadowSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToGetShadowAccepted(getShadowSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE, this::processCISShadow,
(e) -> LOGGER.atError().log("Error processing getShadowSubscription Response", e))
.get(TIMEOUT_FOR_SUBSCRIBING_TO_TOPICS_SECONDS, TimeUnit.SECONDS);
LOGGER.info("Subscribed to shadow get accepted topic");
return;

} catch (ExecutionException e) {
Expand Down Expand Up @@ -315,31 +315,158 @@ private void updateCISShadowReportedState(Map<String, Object> reportedState) {
});
}

private void publishToGetCISShadowTopic() {
LOGGER.atDebug().log("Publishing to get shadow topic");
GetShadowRequest getShadowRequest = new GetShadowRequest();
getShadowRequest.thingName = shadowName;
iotShadowClient.PublishGetShadow(getShadowRequest, QualityOfService.AT_LEAST_ONCE).exceptionally(e -> {
LOGGER.atWarn().cause(e).log("Unable to retrieve CIS shadow");
return null;
});
}

private void unsubscribeFromShadowTopics() {
if (connection != null) {
LOGGER.atDebug().log("Unsubscribing from CIS shadow topics");
String topic = String.format(SHADOW_UPDATE_DELTA_TOPIC, shadowName);
connection.unsubscribe(topic);

topic = String.format(SHADOW_GET_ACCEPTED_TOPIC, shadowName);
connection.unsubscribe(topic);
}
}

@Override
public void accept(NetworkStateProvider.ConnectionState state) {
if (state == NetworkStateProvider.ConnectionState.NETWORK_UP) {
publishToGetCISShadowTopic();
refreshConnectivityInfoTask.run();
}
}

private static class RefreshConnectivityInfoTask {

private CompletableFuture<Integer> subscribeToShadowTopic;
private CompletableFuture<Integer> publishToShadowTopic;
private CompletableFuture<?> task;

private final AtomicReference<CountDownLatch> shadowReceived = new AtomicReference<>();

private final String shadowName;
private final Consumer<GetShadowResponse> handler;
private final IotShadowClient client;
private final ExecutorService executor;

RefreshConnectivityInfoTask(String shadowName,
Consumer<GetShadowResponse> handler,
IotShadowClient client,
ExecutorService executor) {
this.shadowName = shadowName;
this.handler = handler;
this.client = client;
this.executor = executor;
}

synchronized CompletableFuture<?> run() {
if (task == null || task.isDone()) {
task = subscribeToGetShadowTopic()
.thenCompose(ignore -> {
resetShadowReceived();
return getCISShadow();
});
}
return task;
}

synchronized void stop() {
if (task != null) {
task.cancel(false);
}
}

// TODO refactor pattern into utility class
private synchronized CompletableFuture<Integer> subscribeToGetShadowTopic() {
if (subscribeToShadowTopic == null) {
subscribeToShadowTopic = doSubscribeToGetShadowTopic();
}
return subscribeToShadowTopic;
}

private CompletableFuture<Integer> doSubscribeToGetShadowTopic() {
GetShadowSubscriptionRequest request = new GetShadowSubscriptionRequest();
request.thingName = shadowName;
return client.SubscribeToGetShadowAccepted(
request,
QualityOfService.AT_LEAST_ONCE,
this::onShadow,
e -> LOGGER.atError().cause(e)
.log("Unexpected error while processing get shadow accepted response"))
.handleAsync((res, e) -> {
if (e == null) {
LOGGER.atDebug().kv("thingName", shadowName)
.log("subscribed to get shadow accepted topic");
return CompletableFuture.completedFuture(res);
} else {
// TODO backoff
return doSubscribeToGetShadowTopic();
}
}, executor)
.thenCompose(x -> x);
}

private void onShadow(GetShadowResponse response) {
reportShadowReceived();
handler.accept(response);
}

private void reportShadowReceived() {
CountDownLatch shadowReceived = this.shadowReceived.get();
if (shadowReceived != null) {
shadowReceived.countDown();
}
}

private void resetShadowReceived() {
shadowReceived.set(new CountDownLatch(1));
}

private CompletableFuture<Void> getCISShadow() {
// getting the CIS shadow is a two part operation:
// 1) request the shadow over QOS1 MQTT connection
// 2) receive shadow over QOS1 MQTT connection
//
// Since these are separate MQTT operations, we need to track success of both,
// hence waiting for a latch.
//
// Retrying is important here, as we get CIS shadow on startup, and we may be offline,
// or there may be race conditions where nucleus MQTT client resets
// due to a configuration change.
//
// The consequence of not updating certs on startup is that TLS handshakes involving the certs
// will always fail until the next CIS update, which may never happen.
return publishToGetShadowTopic()
.handleAsync((ignore, e) -> {
try {
if (shadowReceived.get().await(10L, TimeUnit.SECONDS)) {
return CompletableFuture.<Void>completedFuture(null);
}
} catch (InterruptedException e2) {
CompletableFuture<Void> failed = new CompletableFuture<>();
failed.completeExceptionally(e2);
return failed;
}
// TODO backoff
return getCISShadow();
}, executor)
.thenCompose(x -> x);
}

private synchronized CompletableFuture<Integer> publishToGetShadowTopic() {
if (publishToShadowTopic == null || publishToShadowTopic.isDone()) {
publishToShadowTopic = doPublishToGetShadowTopic();
}
return publishToShadowTopic;
}

private CompletableFuture<Integer> doPublishToGetShadowTopic() {
GetShadowRequest getShadowRequest = new GetShadowRequest();
getShadowRequest.thingName = shadowName;
return client.PublishGetShadow(getShadowRequest, QualityOfService.AT_LEAST_ONCE)
.handle((res, e) -> {
if (e == null) {
return CompletableFuture.completedFuture(0);
} else {
// TODO exponential backoff, not on crt thread
return doPublishToGetShadowTopic();
}
})
.thenCompose(x -> x);
}
}
}

0 comments on commit d022870

Please sign in to comment.