Skip to content

Commit

Permalink
chore: don't subscribe when offline
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 committed Oct 27, 2023
1 parent bfb99c9 commit 4064718
Showing 1 changed file with 107 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand All @@ -57,13 +58,11 @@ public class CISShadowMonitor implements Consumer<NetworkStateProvider.Connectio
private static final Logger LOGGER = LogManager.getLogger(CISShadowMonitor.class);
private static final String CIS_SHADOW_SUFFIX = "-gci";
private static final String VERSION = "version";
static final String SHADOW_UPDATE_DELTA_TOPIC = "$aws/things/%s/shadow/update/delta";
static final String SHADOW_GET_ACCEPTED_TOPIC = "$aws/things/%s/shadow/get/accepted";

private static final RetryUtils.RetryConfig SUBSCRIBE_TO_TOPICS_RETRY_CONFIG =
RetryUtils.RetryConfig.builder()
.initialRetryInterval(Duration.ofSeconds(1L))
.maxRetryInterval(Duration.ofMinutes(30L))
.maxRetryInterval(Duration.ofSeconds(30L))
.maxAttempt(Integer.MAX_VALUE)
.retryableExceptions(Collections.singletonList(Exception.class))
.build();
Expand All @@ -75,17 +74,33 @@ public class CISShadowMonitor implements Consumer<NetworkStateProvider.Connectio
private static final RetryUtils.RetryConfig GET_CIS_SHADOW_RETRY_CONFIG =
RetryUtils.RetryConfig.builder()
.initialRetryInterval(Duration.ofSeconds(1L))
.maxRetryInterval(Duration.ofMinutes(30L))
.maxRetryInterval(Duration.ofSeconds(30L))
.maxAttempt(Integer.MAX_VALUE)
.retryableExceptions(Collections.singletonList(Exception.class))
.build();

private final Consumer<ShadowDeltaUpdatedEvent> onShadowDeltaUpdated = this::processCISShadow;
// we don't need to unsubscribe, because mqtt client reconnects with clean session on startup.
// instead, we can cancel current operations and block new ones on shutdown.
private final AtomicBoolean stopped = new AtomicBoolean();
private final Consumer<ShadowDeltaUpdatedEvent> onShadowDeltaUpdated = event -> {
if (stopped.get()) {
return;
}
processCISShadow(event);
};
private final Consumer<GetShadowResponse> onGetShadowAccepted = resp -> {
if (stopped.get()) {
return;
}
reportShadowReceived();
processCISShadow(resp);
};
private final Consumer<ErrorResponse> onGetShadowRejected = err -> reportShadowReceived();
private final Consumer<ErrorResponse> onGetShadowRejected = err -> {
if (stopped.get()) {
return;
}
reportShadowReceived();
};

private final Object getShadowLock = new Object();
private Future<?> getShadowTask;
Expand All @@ -95,7 +110,7 @@ public class CISShadowMonitor implements Consumer<NetworkStateProvider.Connectio
private MqttClientConnection connection;
private IotShadowClient iotShadowClient;
private String lastVersion;
private Future<?> subscribeTaskFuture;
private final AtomicBoolean subscribed = new AtomicBoolean();
private final NetworkStateProvider networkStateProvider;
private final List<CertificateGenerator> monitoredCertificateGenerators = new CopyOnWriteArrayList<>();
private final ExecutorService executorService;
Expand Down Expand Up @@ -151,43 +166,16 @@ public CISShadowMonitor(MqttClient mqttClient,
*/
@SuppressWarnings("PMD.AvoidCatchingGenericException")
public void startMonitor() {
if (subscribeTaskFuture != null) {
subscribeTaskFuture.cancel(true);
}
subscribeTaskFuture = executorService.submit(() -> {
try {
RetryUtils.runWithRetry(
SUBSCRIBE_TO_TOPICS_RETRY_CONFIG,
() -> {
subscribeToShadowTopics();
return null;
},
"subscribe-to-cis-topics",
LOGGER);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
LOGGER.atError().cause(e).log("Unexpected failure when subscribing to shadow topics");
return;
}

try {
fetchCISShadowWithRetriesAsync();
} catch (Exception e) {
LOGGER.atError().cause(e).log("Unable to fetch shadow on monitor startup");
}
});
stopped.set(false);
fetchCISShadowAsync();
}

/**
* Stop shadow monitor.
*/
public void stopMonitor() {
if (subscribeTaskFuture != null) {
subscribeTaskFuture.cancel(true);
}
unsubscribeFromShadowTopics();
stopped.set(true);
cancelFetchCISShadow();
}

/**
Expand All @@ -208,40 +196,6 @@ public void removeFromMonitor(CertificateGenerator certificateGenerator) {
monitoredCertificateGenerators.remove(certificateGenerator);
}

private void subscribeToShadowTopics() throws InterruptedException, ExecutionException {
ShadowDeltaUpdatedSubscriptionRequest shadowDeltaUpdatedSubscriptionRequest =
new ShadowDeltaUpdatedSubscriptionRequest();
shadowDeltaUpdatedSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToShadowDeltaUpdatedEvents(
shadowDeltaUpdatedSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
onShadowDeltaUpdated,
(e) -> LOGGER.atError()
.log("Error processing shadowDeltaUpdatedSubscription Response", e))
.get();
LOGGER.atDebug().log("Subscribed to shadow update delta topic");

GetShadowSubscriptionRequest getShadowSubscriptionRequest = new GetShadowSubscriptionRequest();
getShadowSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToGetShadowAccepted(
getShadowSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
onGetShadowAccepted,
(e) -> LOGGER.atError().log("Error processing getShadowSubscription Response", e))
.get();
LOGGER.atDebug().log("Subscribed to shadow get accepted topic");

GetShadowSubscriptionRequest getShadowRejectedSubscriptionRequest = new GetShadowSubscriptionRequest();
getShadowRejectedSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToGetShadowRejected(
getShadowRejectedSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
onGetShadowRejected,
(e) -> LOGGER.atError().log("Error processing get shadow rejected response", e))
.get();
LOGGER.atDebug().log("Subscribed to shadow get rejected topic");
}

private void processCISShadow(GetShadowResponse response) {
String cisVersion = Coerce.toString(response.state.desired.get("version"));
processCISShadow(cisVersion, response.state.desired);
Expand All @@ -267,6 +221,7 @@ private synchronized void processCISShadow(String version, Map<String, Object> d

LOGGER.atInfo().log("New CIS version: {}", version);

// TODO make this cancellable
// NOTE: This method executes in an MQTT CRT thread. Since certificate generation is a compute intensive
// operation (particularly on low end devices) it is imperative that we process this event asynchronously
// to avoid blocking other MQTT subscribers in the Nucleus
Expand Down Expand Up @@ -348,31 +303,8 @@ private synchronized void processCISShadow(String version, Map<String, Object> d
});
}

/**
* Asynchronously update the CIS shadow's reported state for the given shadow version.
*
* @param reportedState CIS shadow reported state
*/
private void updateCISShadowReportedState(Map<String, Object> reportedState) {
UpdateShadowRequest updateShadowRequest = new UpdateShadowRequest();
updateShadowRequest.thingName = shadowName;
updateShadowRequest.state = new ShadowState();
updateShadowRequest.state.reported = reportedState == null ? null : new HashMap<>(reportedState);
iotShadowClient.PublishUpdateShadow(updateShadowRequest, QualityOfService.AT_LEAST_ONCE).exceptionally(e -> {
LOGGER.atWarn().cause(e).log("Unable to update CIS shadow reported state");
return null;
});
}

private void reportShadowReceived() {
CompletableFuture<?> shadowReceived = this.shadowGetResponseReceived.get();
if (shadowReceived != null) {
shadowReceived.complete(null);
}
}

@SuppressWarnings("PMD.AvoidCatchingGenericException")
private void fetchCISShadowWithRetriesAsync() {
private void fetchCISShadowAsync() {
if (networkStateProvider.getConnectionState() == NetworkStateProvider.ConnectionState.NETWORK_DOWN) {
// will be retried when online again
return;
Expand All @@ -383,6 +315,23 @@ private void fetchCISShadowWithRetriesAsync() {
return;
}
getShadowTask = executorService.submit(() -> {
try {
RetryUtils.runWithRetry(
SUBSCRIBE_TO_TOPICS_RETRY_CONFIG,
() -> {
subscribeToShadowTopics();
return null;
},
"subscribe-to-cis-topics",
LOGGER);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
LOGGER.atError().cause(e).log("Unexpected failure when subscribing to shadow topics");
return;
}

try {
RetryUtils.runWithRetry(
GET_CIS_SHADOW_RETRY_CONFIG,
Expand Down Expand Up @@ -417,6 +366,53 @@ private void cancelFetchCISShadow() {
}
}

private void subscribeToShadowTopics() throws InterruptedException, ExecutionException {
if (subscribed.get()) {
// mqtt client is in charge of resubscribing, we only need to subscribe once
return;
}

ShadowDeltaUpdatedSubscriptionRequest shadowDeltaUpdatedSubscriptionRequest =
new ShadowDeltaUpdatedSubscriptionRequest();
shadowDeltaUpdatedSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToShadowDeltaUpdatedEvents(
shadowDeltaUpdatedSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
onShadowDeltaUpdated,
(e) -> LOGGER.atError()
.log("Error processing shadowDeltaUpdatedSubscription Response", e))
.get();
LOGGER.atDebug().log("Subscribed to shadow update delta topic");

GetShadowSubscriptionRequest getShadowSubscriptionRequest = new GetShadowSubscriptionRequest();
getShadowSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToGetShadowAccepted(
getShadowSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
onGetShadowAccepted,
(e) -> LOGGER.atError().log("Error processing getShadowSubscription Response", e))
.get();
LOGGER.atDebug().log("Subscribed to shadow get accepted topic");

GetShadowSubscriptionRequest getShadowRejectedSubscriptionRequest = new GetShadowSubscriptionRequest();
getShadowRejectedSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToGetShadowRejected(
getShadowRejectedSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
onGetShadowRejected,
(e) -> LOGGER.atError().log("Error processing get shadow rejected response", e))
.get();
LOGGER.atDebug().log("Subscribed to shadow get rejected topic");
subscribed.set(true);
}

private void reportShadowReceived() {
CompletableFuture<?> shadowReceived = this.shadowGetResponseReceived.get();
if (shadowReceived != null) {
shadowReceived.complete(null);
}
}

private CompletableFuture<Integer> publishToGetCISShadowTopic() {
LOGGER.atDebug().log("Publishing to get shadow topic");
GetShadowRequest getShadowRequest = new GetShadowRequest();
Expand All @@ -428,21 +424,26 @@ private CompletableFuture<Integer> publishToGetCISShadowTopic() {
});
}

private void unsubscribeFromShadowTopics() {
if (connection != null) {
LOGGER.atDebug().log("Unsubscribing from CIS shadow topics");
String topic = String.format(SHADOW_UPDATE_DELTA_TOPIC, shadowName);
connection.unsubscribe(topic);

topic = String.format(SHADOW_GET_ACCEPTED_TOPIC, shadowName);
connection.unsubscribe(topic);
}
/**
* Asynchronously update the CIS shadow's reported state for the given shadow version.
*
* @param reportedState CIS shadow reported state
*/
private void updateCISShadowReportedState(Map<String, Object> reportedState) {
UpdateShadowRequest updateShadowRequest = new UpdateShadowRequest();
updateShadowRequest.thingName = shadowName;
updateShadowRequest.state = new ShadowState();
updateShadowRequest.state.reported = reportedState == null ? null : new HashMap<>(reportedState);
iotShadowClient.PublishUpdateShadow(updateShadowRequest, QualityOfService.AT_LEAST_ONCE).exceptionally(e -> {
LOGGER.atWarn().cause(e).log("Unable to update CIS shadow reported state");
return null;
});
}

@Override
public void accept(NetworkStateProvider.ConnectionState state) {
if (state == NetworkStateProvider.ConnectionState.NETWORK_UP) {
fetchCISShadowWithRetriesAsync();
fetchCISShadowAsync();
} else if (state == NetworkStateProvider.ConnectionState.NETWORK_DOWN) {
cancelFetchCISShadow();
}
Expand Down

0 comments on commit 4064718

Please sign in to comment.