diff --git a/src/main/java/com/aws/greengrass/clientdevices/auth/connectivity/CISShadowMonitor.java b/src/main/java/com/aws/greengrass/clientdevices/auth/connectivity/CISShadowMonitor.java index 7163c7e2c..0eadf9ad2 100644 --- a/src/main/java/com/aws/greengrass/clientdevices/auth/connectivity/CISShadowMonitor.java +++ b/src/main/java/com/aws/greengrass/clientdevices/auth/connectivity/CISShadowMonitor.java @@ -15,13 +15,11 @@ import com.aws.greengrass.mqttclient.WrapperMqttClientConnection; import com.aws.greengrass.util.Coerce; import com.aws.greengrass.util.RetryUtils; -import lombok.AccessLevel; -import lombok.Getter; -import lombok.Setter; import software.amazon.awssdk.crt.mqtt.MqttClientConnection; import software.amazon.awssdk.crt.mqtt.MqttException; import software.amazon.awssdk.crt.mqtt.QualityOfService; import software.amazon.awssdk.iot.iotshadow.IotShadowClient; +import software.amazon.awssdk.iot.iotshadow.model.ErrorResponse; import software.amazon.awssdk.iot.iotshadow.model.GetShadowRequest; import software.amazon.awssdk.iot.iotshadow.model.GetShadowResponse; import software.amazon.awssdk.iot.iotshadow.model.GetShadowSubscriptionRequest; @@ -79,14 +77,18 @@ public class CISShadowMonitor implements Consumer onShadowDeltaUpdated = this::processCISShadow; + private final Consumer onGetShadowAccepted = resp -> { + reportShadowReceived(); + processCISShadow(resp); + }; + private final Consumer onGetShadowRejected = err -> reportShadowReceived(); private final Object getShadowLock = new Object(); private Future getShadowTask; AtomicReference> shadowGetResponseReceived = new AtomicReference<>(); + private MqttClient mqttClient; private MqttClientConnection connection; private IotShadowClient iotShadowClient; private String lastVersion; @@ -109,6 +111,7 @@ public CISShadowMonitor(MqttClient mqttClient, ExecutorService executorService, DeviceConfiguration deviceConfiguration, ConnectivityInformation connectivityInformation) { this(null, null, executorService, Coerce.toString(deviceConfiguration.getThingName()) + CIS_SHADOW_SUFFIX, connectivityInformation); + this.mqttClient = mqttClient; this.connection = new WrapperMqttClientConnection(mqttClient); this.iotShadowClient = new IotShadowClient(this.connection); } @@ -180,10 +183,10 @@ private void subscribeToShadowTopics() throws InterruptedException { iotShadowClient.SubscribeToShadowDeltaUpdatedEvents( shadowDeltaUpdatedSubscriptionRequest, QualityOfService.AT_LEAST_ONCE, - this::processCISShadow, + onShadowDeltaUpdated, (e) -> LOGGER.atError() .log("Error processing shadowDeltaUpdatedSubscription Response", e)) - .get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS); + .get(); LOGGER.info("Subscribed to shadow update delta topic"); GetShadowSubscriptionRequest getShadowSubscriptionRequest = new GetShadowSubscriptionRequest(); @@ -191,12 +194,9 @@ private void subscribeToShadowTopics() throws InterruptedException { iotShadowClient.SubscribeToGetShadowAccepted( getShadowSubscriptionRequest, QualityOfService.AT_LEAST_ONCE, - resp -> { - reportShadowReceived(); - processCISShadow(resp); - }, + onGetShadowAccepted, (e) -> LOGGER.atError().log("Error processing getShadowSubscription Response", e)) - .get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS); + .get(); LOGGER.info("Subscribed to shadow get accepted topic"); GetShadowSubscriptionRequest getShadowRejectedSubscriptionRequest = new GetShadowSubscriptionRequest(); @@ -204,9 +204,9 @@ private void subscribeToShadowTopics() throws InterruptedException { iotShadowClient.SubscribeToGetShadowRejected( getShadowRejectedSubscriptionRequest, QualityOfService.AT_LEAST_ONCE, - err -> reportShadowReceived(), + onGetShadowRejected, (e) -> LOGGER.atError().log("Error processing get shadow rejected response", e)) - .get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS); + .get(); LOGGER.info("Subscribed to shadow get rejected topic"); return; @@ -221,8 +221,6 @@ private void subscribeToShadowTopics() throws InterruptedException { LOGGER.atError().setCause(e) .log("Caught exception while subscribing to shadow topics, will retry shortly"); } - } catch (TimeoutException e) { - LOGGER.atWarn().setCause(e).log("Subscribe to shadow topics timed out, will retry shortly"); } // Wait for sometime and then try to subscribe again Thread.sleep(WAIT_TIME_TO_SUBSCRIBE_AGAIN_IN_MS + JITTER.nextInt(10_000)); @@ -360,6 +358,9 @@ private void reportShadowReceived() { @SuppressWarnings("PMD.AvoidCatchingGenericException") private void fetchCISShadowWithRetriesAsync() { + if (!mqttClient.getMqttOnline().get()) { + return; + } synchronized (getShadowLock) { if (getShadowTask != null && !getShadowTask.isDone()) { // operation already in progress @@ -370,11 +371,12 @@ private void fetchCISShadowWithRetriesAsync() { RetryUtils.runWithRetry( GET_CIS_SHADOW_RETRY_CONFIG, () -> { - CompletableFuture shadowReceived = this.shadowGetResponseReceived.updateAndGet( - ignore -> new CompletableFuture<>()); - publishToGetCISShadowTopic().get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS); + CompletableFuture shadowGetResponseReceived = new CompletableFuture<>(); + this.shadowGetResponseReceived.set(shadowGetResponseReceived); + publishToGetCISShadowTopic().get(); // await shadow get accepted, rejected - shadowReceived.get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS); + shadowGetResponseReceived.get( + mqttClient.getMqttOperationTimeoutMillis() + 5L, TimeUnit.MILLISECONDS); return null; }, "get-cis-shadow", @@ -400,7 +402,7 @@ private CompletableFuture publishToGetCISShadowTopic() { LOGGER.atDebug().log("Publishing to get shadow topic"); GetShadowRequest getShadowRequest = new GetShadowRequest(); getShadowRequest.thingName = shadowName; - return iotShadowClient.PublishGetShadow(getShadowRequest, QualityOfService.AT_LEAST_ONCE) + return iotShadowClient.PublishGetShadow(getShadowRequest, QualityOfService.AT_MOST_ONCE) .exceptionally(e -> { LOGGER.atWarn().cause(e).log("Unable to retrieve CIS shadow"); return null; diff --git a/src/test/java/com/aws/greengrass/clientdevices/auth/connectivity/CISShadowMonitorTest.java b/src/test/java/com/aws/greengrass/clientdevices/auth/connectivity/CISShadowMonitorTest.java index c35d15734..79cfc52dd 100644 --- a/src/test/java/com/aws/greengrass/clientdevices/auth/connectivity/CISShadowMonitorTest.java +++ b/src/test/java/com/aws/greengrass/clientdevices/auth/connectivity/CISShadowMonitorTest.java @@ -101,8 +101,6 @@ void setup() { SHADOW_NAME, connectivityInfoProvider ); - // avoid unnecessary waiting - cisShadowMonitor.setMqttOperationTimeoutSeconds(1L); } @AfterEach