Skip to content

Commit

Permalink
chore: support shadow rejected in CISShadowMonitorTest (#407)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 authored Oct 26, 2023
1 parent beae012 commit f71adfe
Showing 1 changed file with 87 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt.MqttMessage;
import software.amazon.awssdk.crt.mqtt.QualityOfService;
import software.amazon.awssdk.iot.Timestamp;
import software.amazon.awssdk.iot.iotshadow.IotShadowClient;
import software.amazon.awssdk.iot.iotshadow.model.ErrorResponse;
import software.amazon.awssdk.iot.iotshadow.model.GetShadowResponse;
import software.amazon.awssdk.iot.iotshadow.model.ShadowDeltaUpdatedEvent;
import software.amazon.awssdk.iot.iotshadow.model.ShadowStateWithDelta;
Expand All @@ -42,6 +44,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -56,6 +59,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -77,6 +81,7 @@ class CISShadowMonitorTest {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String SHADOW_NAME = "testThing-gci";
private static final String UPDATE_SHADOW_TOPIC = String.format("$aws/things/%s/shadow/update", SHADOW_NAME);
private static final String GET_SHADOW_TOPIC = String.format("$aws/things/%s/shadow/get", SHADOW_NAME);
private final FakeIotShadowClient shadowClient = spy(new FakeIotShadowClient());
private final MqttClientConnection shadowClientConnection = shadowClient.getConnection();
private final ExecutorService executor = Executors.newCachedThreadPool();
Expand Down Expand Up @@ -127,6 +132,12 @@ static class Scenario {
*/
int numShadowUpdatePublishFailures;

/**
* Amount of times to fail monitor's attempts
* to get the CIS shadow.
*/
int numGetRequestFailures;

/**
* If true, simulate monitor receiving duplicate
* shadow delta update messages from IoT Core.
Expand All @@ -150,6 +161,26 @@ public static Stream<Arguments> cisShadowMonitorScenarios() {
Arguments.of(Scenario.builder()
.receiveDuplicateShadowDeltaUpdates(true)
.build()),
// when monitor can't get shadow on startup,
// it'll recover on subsequent shadow updates
Arguments.of(Scenario.builder()
.numGetRequestFailures(1)
.serialShadowUpdates(true)
.build()),
// if shadow is never updated,
// monitor still works because it fetches shadow on startup
Arguments.of(Scenario.builder()
.numShadowUpdates(0)
.build()),
// TODO add support in CISShadowMonitor
// if shadow is never updated,
// monitor still works because it fetches shadow on startup.
// if shadow fetching fails, it will be retried
// Arguments.of(Scenario.builder()
// .numShadowUpdates(0)
// .numGetRequestFailures(1)
// .serialShadowUpdates(true)
// .build()),
Arguments.of(Scenario.builder()
.numShadowUpdatePublishFailures(1)
.serialShadowUpdates(true)
Expand Down Expand Up @@ -190,17 +221,21 @@ void GIVEN_monitor_WHEN_cis_shadow_changes_THEN_monitor_updates_certificates(Sce
// since the monitor is not listening at this point
// TODO handle case where shadow doesn't exist on startup
updateShadowDesiredState(
Utils.immutableMap("version", "INITIAL_STATE"),
Utils.immutableMap("version", "-1"),
scenario.isReceiveDuplicateShadowDeltaUpdates()
);

shadowClient.failOnGet(GET_SHADOW_TOPIC, scenario.getNumGetRequestFailures());

cisShadowMonitor.addToMonitor(certificateGenerator);
cisShadowMonitor.startMonitor();

// on startup, the monitor directly requests a shadow and processes it.
// optionally wait for the monitor to process the get shadow response.
if (scenario.isSerialShadowUpdates()) {
boolean monitorExpectedToUpdateReportedState = scenario.getConnectivityProviderMode() != FakeConnectivityInformation.Mode.FAIL_ONCE;
boolean monitorExpectedToUpdateReportedState =
scenario.getConnectivityProviderMode() != FakeConnectivityInformation.Mode.FAIL_ONCE
&& scenario.getNumGetRequestFailures() == 0;
waitForMonitorToProcessUpdate(updateProcessedByMonitor, monitorExpectedToUpdateReportedState);
}

Expand Down Expand Up @@ -317,6 +352,7 @@ static class FakeIotShadowClient extends IotShadowClient {
private final Map<String, Shadow> shadowsByThingName = new ConcurrentHashMap<>();
private final Map<String, Consumer<MqttMessage>> subscriptions = new ConcurrentHashMap<>();
private final AtomicReference<String> failOnPublish = new AtomicReference<>();
private final AtomicReference<Pair<String, AtomicInteger>> failOnGet = new AtomicReference<>();
private final List<Consumer<MqttMessage>> onPublish = new ArrayList<>();

@Getter(AccessLevel.PACKAGE)
Expand Down Expand Up @@ -362,20 +398,44 @@ private FakeIotShadowClient(MqttClientConnection connection) {

private void handleShadowGetRequest(MqttMessage message) {
String thingName = extractThingName(message.getTopic());
String acceptedTopic = acceptedTopic(thingName);
Consumer<MqttMessage> subscription = subscriptions.get(acceptedTopic);
Shadow shadow = shadowsByThingName.get(thingName);
if (subscription != null && shadow != null) {
GetShadowResponse response = new GetShadowResponse();
response.version = shadow.version;
response.state = new ShadowStateWithDelta();
response.state.desired = shadow.getDesired();
response.state.reported = shadow.getReported();
response.state.delta = shadow.getDelta();
subscription.accept(asMessage(acceptedTopic, response));
if (shadow == null) {
return;
}

String respTopic;
MqttMessage respMessage;

if (failGetOperation(message.getTopic())) {
ErrorResponse resp = new ErrorResponse();
resp.message = "get shadow failed";
resp.timestamp = new Timestamp(new Date());
respTopic = rejectedTopic(thingName);
respMessage = asMessage(respTopic, resp);
} else {
GetShadowResponse resp = new GetShadowResponse();
resp.version = shadow.version;
resp.state = new ShadowStateWithDelta();
resp.state.desired = shadow.getDesired();
resp.state.reported = shadow.getReported();
resp.state.delta = shadow.getDelta();
respTopic = acceptedTopic(thingName);
respMessage = asMessage(respTopic, resp);
}

Consumer<MqttMessage> subscription = subscriptions.get(respTopic);
if (subscription != null) {
subscription.accept(respMessage);
}
}

private boolean failGetOperation(String topic) {
Pair<String, AtomicInteger> failOnGet = this.failOnGet.get();
return failOnGet != null
&& Objects.equals(topic, failOnGet.getLeft())
&& failOnGet.getRight().getAndDecrement() > 0;
}

private void handleShadowUpdateRequest(MqttMessage message) {
UpdateShadowRequest request = readValue(message, UpdateShadowRequest.class);
updateShadow(
Expand Down Expand Up @@ -424,6 +484,17 @@ void failOnPublish(String topic) {
failOnPublish.set(topic);
}

/**
* When a get request is made to the provided topic, fail the operation.
* Shadow get result will be sent to rejected topic instead of accepted topic.
*
* @param topic topic
* @param times number of times to fail the get request
*/
void failOnGet(String topic, int times) {
failOnGet.set(new Pair<>(topic, new AtomicInteger(times)));
}

void onPublish(Consumer<MqttMessage> callback) {
onPublish.add(callback);
}
Expand All @@ -446,6 +517,10 @@ private static String acceptedTopic(String thingName) {
return String.format("$aws/things/%s/shadow/get/accepted", thingName);
}

private static String rejectedTopic(String thingName) {
return String.format("$aws/things/%s/shadow/get/rejected", thingName);
}

private static String updateDeltaTopic(String thingName) {
return String.format("$aws/things/%s/shadow/update/delta", thingName);
}
Expand Down

0 comments on commit f71adfe

Please sign in to comment.