Skip to content

Commit

Permalink
fix: retry get shadow operation
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 committed Oct 27, 2023
1 parent f71adfe commit 608e040
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import com.aws.greengrass.mqttclient.WrapperMqttClientConnection;
import com.aws.greengrass.util.Coerce;
import com.aws.greengrass.util.RetryUtils;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt.MqttException;
import software.amazon.awssdk.crt.mqtt.QualityOfService;
Expand All @@ -33,6 +36,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -48,6 +52,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.inject.Inject;

Expand All @@ -56,7 +61,6 @@ public class CISShadowMonitor implements Consumer<NetworkStateProvider.Connectio
private static final Logger LOGGER = LogManager.getLogger(CISShadowMonitor.class);
private static final String CIS_SHADOW_SUFFIX = "-gci";
private static final String VERSION = "version";
private static final long TIMEOUT_FOR_SUBSCRIBING_TO_TOPICS_SECONDS = Duration.ofMinutes(1).getSeconds();
private static final long WAIT_TIME_TO_SUBSCRIBE_AGAIN_IN_MS = Duration.ofMinutes(2).toMillis();
private static final Random JITTER = new Random();
static final String SHADOW_UPDATE_DELTA_TOPIC = "$aws/things/%s/shadow/update/delta";
Expand All @@ -67,6 +71,22 @@ public class CISShadowMonitor implements Consumer<NetworkStateProvider.Connectio
.maxRetryInterval(Duration.ofMinutes(30L)).maxAttempt(Integer.MAX_VALUE)
.retryableExceptions(Arrays.asList(ThrottlingException.class, InternalServerException.class))
.build();
private static final RetryUtils.RetryConfig GET_CIS_SHADOW_RETRY_CONFIG =
RetryUtils.RetryConfig.builder()
.initialRetryInterval(Duration.ofSeconds(1L))
.maxRetryInterval(Duration.ofMinutes(30L))
.maxAttempt(Integer.MAX_VALUE)
.retryableExceptions(Collections.singletonList(Exception.class))
.build();

@Getter(AccessLevel.PACKAGE)
@Setter(AccessLevel.PACKAGE)
private long mqttOperationTimeoutSeconds = 10L;

private final Object getShadowLock = new Object();
private Future<?> getShadowTask;
AtomicReference<CompletableFuture<?>> shadowReceived = new AtomicReference<>();


private MqttClientConnection connection;
private IotShadowClient iotShadowClient;
Expand Down Expand Up @@ -113,7 +133,7 @@ public void startMonitor() {
subscribeTaskFuture = executorService.submit(() -> {
try {
subscribeToShadowTopics();
publishToGetCISShadowTopic();
fetchCISShadowWithRetriesAsync();
} catch (InterruptedException e) {
LOGGER.atWarn().cause(e).log("Interrupted while subscribing to CIS shadow topics");
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -158,21 +178,40 @@ private void subscribeToShadowTopics() throws InterruptedException {
ShadowDeltaUpdatedSubscriptionRequest shadowDeltaUpdatedSubscriptionRequest =
new ShadowDeltaUpdatedSubscriptionRequest();
shadowDeltaUpdatedSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToShadowDeltaUpdatedEvents(shadowDeltaUpdatedSubscriptionRequest,
iotShadowClient.SubscribeToShadowDeltaUpdatedEvents(
shadowDeltaUpdatedSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
this::processCISShadow,
resp -> {
reportShadowReceived();
processCISShadow(resp);
},
(e) -> LOGGER.atError()
.log("Error processing shadowDeltaUpdatedSubscription Response", e))
.get(TIMEOUT_FOR_SUBSCRIBING_TO_TOPICS_SECONDS, TimeUnit.SECONDS);
.get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS);
LOGGER.info("Subscribed to shadow update delta topic");

GetShadowSubscriptionRequest getShadowSubscriptionRequest = new GetShadowSubscriptionRequest();
getShadowSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToGetShadowAccepted(getShadowSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE, this::processCISShadow,
iotShadowClient.SubscribeToGetShadowAccepted(
getShadowSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
resp -> {
reportShadowReceived();
processCISShadow(resp);
},
(e) -> LOGGER.atError().log("Error processing getShadowSubscription Response", e))
.get(TIMEOUT_FOR_SUBSCRIBING_TO_TOPICS_SECONDS, TimeUnit.SECONDS);
.get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS);
LOGGER.info("Subscribed to shadow get accepted topic");

GetShadowSubscriptionRequest getShadowRejectedSubscriptionRequest = new GetShadowSubscriptionRequest();
getShadowRejectedSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToGetShadowRejected(
getShadowRejectedSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
err -> reportShadowReceived(),
(e) -> LOGGER.atError().log("Error processing get shadow rejected response", e))
.get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS);
LOGGER.info("Subscribed to shadow get rejected topic");
return;

} catch (ExecutionException e) {
Expand Down Expand Up @@ -281,6 +320,7 @@ private synchronized void processCISShadow(String version, Map<String, Object> d
for (CertificateGenerator cg : monitoredCertificateGenerators) {
cg.generateCertificate(() -> new ArrayList<>(cachedHostAddresses), "connectivity info was updated");
}
LOGGER.atDebug().log("certificates rotated");
} catch (CertificateGenerationException e) {
LOGGER.atError().kv(VERSION, version).cause(e).log("Failed to generate new certificates");
return;
Expand Down Expand Up @@ -315,14 +355,60 @@ private void updateCISShadowReportedState(Map<String, Object> reportedState) {
});
}

private void publishToGetCISShadowTopic() {
private void reportShadowReceived() {
CompletableFuture<?> shadowReceived = this.shadowReceived.get();
if (shadowReceived != null) {
shadowReceived.complete(null);
}
}

@SuppressWarnings("PMD.AvoidCatchingGenericException")
private void fetchCISShadowWithRetriesAsync() {
synchronized (getShadowLock) {
if (getShadowTask != null && !getShadowTask.isDone()) {
// operation already in progress
return;
}
getShadowTask = executorService.submit(() -> {
try {
RetryUtils.runWithRetry(
GET_CIS_SHADOW_RETRY_CONFIG,
() -> {
CompletableFuture<?> shadowReceived =
this.shadowReceived.updateAndGet(ignore -> new CompletableFuture<>());
publishToGetCISShadowTopic().get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS);
// await shadow get accepted, rejected, or update delta
shadowReceived.get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS);
return null;
},
"get-cis-shadow",
LOGGER);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LOGGER.atError().cause(e).log("unable to get CIS shadow");
}
});
}
}

private void cancelGetCISShadow() {
synchronized (getShadowLock) {
if (getShadowTask != null) {
getShadowTask.cancel(true);
}
}
}

private CompletableFuture<Integer> publishToGetCISShadowTopic() {
LOGGER.atDebug().log("Publishing to get shadow topic");
GetShadowRequest getShadowRequest = new GetShadowRequest();
getShadowRequest.thingName = shadowName;
iotShadowClient.PublishGetShadow(getShadowRequest, QualityOfService.AT_LEAST_ONCE).exceptionally(e -> {
LOGGER.atWarn().cause(e).log("Unable to retrieve CIS shadow");
return null;
});
return iotShadowClient.PublishGetShadow(getShadowRequest, QualityOfService.AT_LEAST_ONCE)
.exceptionally(e -> {
LOGGER.atWarn().cause(e).log("Unable to retrieve CIS shadow");
return null;
});
}

private void unsubscribeFromShadowTopics() {
Expand All @@ -339,7 +425,9 @@ private void unsubscribeFromShadowTopics() {
@Override
public void accept(NetworkStateProvider.ConnectionState state) {
if (state == NetworkStateProvider.ConnectionState.NETWORK_UP) {
publishToGetCISShadowTopic();
fetchCISShadowWithRetriesAsync();
} else if (state == NetworkStateProvider.ConnectionState.NETWORK_DOWN) {
cancelGetCISShadow();
}
}
}
Loading

0 comments on commit 608e040

Please sign in to comment.