Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support shadow rejected in CISShadowMonitorTest #407

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt.MqttMessage;
import software.amazon.awssdk.crt.mqtt.QualityOfService;
import software.amazon.awssdk.iot.Timestamp;
import software.amazon.awssdk.iot.iotshadow.IotShadowClient;
import software.amazon.awssdk.iot.iotshadow.model.ErrorResponse;
import software.amazon.awssdk.iot.iotshadow.model.GetShadowResponse;
import software.amazon.awssdk.iot.iotshadow.model.ShadowDeltaUpdatedEvent;
import software.amazon.awssdk.iot.iotshadow.model.ShadowStateWithDelta;
Expand All @@ -42,6 +44,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -56,6 +59,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -77,6 +81,7 @@ class CISShadowMonitorTest {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String SHADOW_NAME = "testThing-gci";
private static final String UPDATE_SHADOW_TOPIC = String.format("$aws/things/%s/shadow/update", SHADOW_NAME);
private static final String GET_SHADOW_TOPIC = String.format("$aws/things/%s/shadow/get", SHADOW_NAME);
private final FakeIotShadowClient shadowClient = spy(new FakeIotShadowClient());
private final MqttClientConnection shadowClientConnection = shadowClient.getConnection();
private final ExecutorService executor = Executors.newCachedThreadPool();
Expand Down Expand Up @@ -127,6 +132,12 @@ static class Scenario {
*/
int numShadowUpdatePublishFailures;

/**
* Amount of times to fail monitor's attempts
* to get the CIS shadow.
*/
int numGetRequestFailures;

/**
* If true, simulate monitor receiving duplicate
* shadow delta update messages from IoT Core.
Expand All @@ -150,6 +161,26 @@ public static Stream<Arguments> cisShadowMonitorScenarios() {
Arguments.of(Scenario.builder()
.receiveDuplicateShadowDeltaUpdates(true)
.build()),
// when monitor can't get shadow on startup,
// it'll recover on subsequent shadow updates
Arguments.of(Scenario.builder()
.numGetRequestFailures(1)
.serialShadowUpdates(true)
.build()),
// if shadow is never updated,
// monitor still works because it fetches shadow on startup
Arguments.of(Scenario.builder()
.numShadowUpdates(0)
.build()),
// TODO add support in CISShadowMonitor
// if shadow is never updated,
// monitor still works because it fetches shadow on startup.
// if shadow fetching fails, it will be retried
// Arguments.of(Scenario.builder()
// .numShadowUpdates(0)
// .numGetRequestFailures(1)
// .serialShadowUpdates(true)
// .build()),
Arguments.of(Scenario.builder()
.numShadowUpdatePublishFailures(1)
.serialShadowUpdates(true)
Expand Down Expand Up @@ -190,17 +221,21 @@ void GIVEN_monitor_WHEN_cis_shadow_changes_THEN_monitor_updates_certificates(Sce
// since the monitor is not listening at this point
// TODO handle case where shadow doesn't exist on startup
updateShadowDesiredState(
Utils.immutableMap("version", "INITIAL_STATE"),
Utils.immutableMap("version", "-1"),
scenario.isReceiveDuplicateShadowDeltaUpdates()
);

shadowClient.failOnGet(GET_SHADOW_TOPIC, scenario.getNumGetRequestFailures());

cisShadowMonitor.addToMonitor(certificateGenerator);
cisShadowMonitor.startMonitor();

// on startup, the monitor directly requests a shadow and processes it.
// optionally wait for the monitor to process the get shadow response.
if (scenario.isSerialShadowUpdates()) {
boolean monitorExpectedToUpdateReportedState = scenario.getConnectivityProviderMode() != FakeConnectivityInformation.Mode.FAIL_ONCE;
boolean monitorExpectedToUpdateReportedState =
scenario.getConnectivityProviderMode() != FakeConnectivityInformation.Mode.FAIL_ONCE
&& scenario.getNumGetRequestFailures() == 0;
waitForMonitorToProcessUpdate(updateProcessedByMonitor, monitorExpectedToUpdateReportedState);
}

Expand Down Expand Up @@ -317,6 +352,7 @@ static class FakeIotShadowClient extends IotShadowClient {
private final Map<String, Shadow> shadowsByThingName = new ConcurrentHashMap<>();
private final Map<String, Consumer<MqttMessage>> subscriptions = new ConcurrentHashMap<>();
private final AtomicReference<String> failOnPublish = new AtomicReference<>();
private final AtomicReference<Pair<String, AtomicInteger>> failOnGet = new AtomicReference<>();
private final List<Consumer<MqttMessage>> onPublish = new ArrayList<>();

@Getter(AccessLevel.PACKAGE)
Expand Down Expand Up @@ -362,20 +398,44 @@ private FakeIotShadowClient(MqttClientConnection connection) {

private void handleShadowGetRequest(MqttMessage message) {
String thingName = extractThingName(message.getTopic());
String acceptedTopic = acceptedTopic(thingName);
Consumer<MqttMessage> subscription = subscriptions.get(acceptedTopic);
Shadow shadow = shadowsByThingName.get(thingName);
if (subscription != null && shadow != null) {
GetShadowResponse response = new GetShadowResponse();
response.version = shadow.version;
response.state = new ShadowStateWithDelta();
response.state.desired = shadow.getDesired();
response.state.reported = shadow.getReported();
response.state.delta = shadow.getDelta();
subscription.accept(asMessage(acceptedTopic, response));
if (shadow == null) {
return;
}

String respTopic;
MqttMessage respMessage;

if (failGetOperation(message.getTopic())) {
ErrorResponse resp = new ErrorResponse();
resp.message = "get shadow failed";
resp.timestamp = new Timestamp(new Date());
respTopic = rejectedTopic(thingName);
respMessage = asMessage(respTopic, resp);
} else {
GetShadowResponse resp = new GetShadowResponse();
resp.version = shadow.version;
resp.state = new ShadowStateWithDelta();
resp.state.desired = shadow.getDesired();
resp.state.reported = shadow.getReported();
resp.state.delta = shadow.getDelta();
respTopic = acceptedTopic(thingName);
respMessage = asMessage(respTopic, resp);
}

Consumer<MqttMessage> subscription = subscriptions.get(respTopic);
if (subscription != null) {
subscription.accept(respMessage);
}
}

private boolean failGetOperation(String topic) {
Pair<String, AtomicInteger> failOnGet = this.failOnGet.get();
return failOnGet != null
&& Objects.equals(topic, failOnGet.getLeft())
&& failOnGet.getRight().getAndDecrement() > 0;
}

private void handleShadowUpdateRequest(MqttMessage message) {
UpdateShadowRequest request = readValue(message, UpdateShadowRequest.class);
updateShadow(
Expand Down Expand Up @@ -424,6 +484,17 @@ void failOnPublish(String topic) {
failOnPublish.set(topic);
}

/**
* When a get request is made to the provided topic, fail the operation.
* Shadow get result will be sent to rejected topic instead of accepted topic.
*
* @param topic topic
* @param times number of times to fail the get request
*/
void failOnGet(String topic, int times) {
failOnGet.set(new Pair<>(topic, new AtomicInteger(times)));
}

void onPublish(Consumer<MqttMessage> callback) {
onPublish.add(callback);
}
Expand All @@ -446,6 +517,10 @@ private static String acceptedTopic(String thingName) {
return String.format("$aws/things/%s/shadow/get/accepted", thingName);
}

private static String rejectedTopic(String thingName) {
return String.format("$aws/things/%s/shadow/get/rejected", thingName);
}

private static String updateDeltaTopic(String thingName) {
return String.format("$aws/things/%s/shadow/update/delta", thingName);
}
Expand Down