Skip to content

Commit

Permalink
chore: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 committed Oct 27, 2023
1 parent a2a005f commit dbf4e7c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
import com.aws.greengrass.mqttclient.WrapperMqttClientConnection;
import com.aws.greengrass.util.Coerce;
import com.aws.greengrass.util.RetryUtils;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt.MqttException;
import software.amazon.awssdk.crt.mqtt.QualityOfService;
import software.amazon.awssdk.iot.iotshadow.IotShadowClient;
import software.amazon.awssdk.iot.iotshadow.model.ErrorResponse;
import software.amazon.awssdk.iot.iotshadow.model.GetShadowRequest;
import software.amazon.awssdk.iot.iotshadow.model.GetShadowResponse;
import software.amazon.awssdk.iot.iotshadow.model.GetShadowSubscriptionRequest;
Expand Down Expand Up @@ -79,14 +77,18 @@ public class CISShadowMonitor implements Consumer<NetworkStateProvider.Connectio
.retryableExceptions(Collections.singletonList(Exception.class))
.build();

@Getter(AccessLevel.PACKAGE)
@Setter(AccessLevel.PACKAGE)
private long mqttOperationTimeoutSeconds = 10L;
private final Consumer<ShadowDeltaUpdatedEvent> onShadowDeltaUpdated = this::processCISShadow;
private final Consumer<GetShadowResponse> onGetShadowAccepted = resp -> {
reportShadowReceived();
processCISShadow(resp);
};
private final Consumer<ErrorResponse> onGetShadowRejected = err -> reportShadowReceived();

private final Object getShadowLock = new Object();
private Future<?> getShadowTask;
AtomicReference<CompletableFuture<?>> shadowGetResponseReceived = new AtomicReference<>();

private MqttClient mqttClient;
private MqttClientConnection connection;
private IotShadowClient iotShadowClient;
private String lastVersion;
Expand All @@ -109,6 +111,7 @@ public CISShadowMonitor(MqttClient mqttClient, ExecutorService executorService,
DeviceConfiguration deviceConfiguration, ConnectivityInformation connectivityInformation) {
this(null, null, executorService, Coerce.toString(deviceConfiguration.getThingName()) + CIS_SHADOW_SUFFIX,
connectivityInformation);
this.mqttClient = mqttClient;
this.connection = new WrapperMqttClientConnection(mqttClient);
this.iotShadowClient = new IotShadowClient(this.connection);
}
Expand Down Expand Up @@ -180,33 +183,30 @@ private void subscribeToShadowTopics() throws InterruptedException {
iotShadowClient.SubscribeToShadowDeltaUpdatedEvents(
shadowDeltaUpdatedSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
this::processCISShadow,
onShadowDeltaUpdated,
(e) -> LOGGER.atError()
.log("Error processing shadowDeltaUpdatedSubscription Response", e))
.get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS);
.get();
LOGGER.info("Subscribed to shadow update delta topic");

GetShadowSubscriptionRequest getShadowSubscriptionRequest = new GetShadowSubscriptionRequest();
getShadowSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToGetShadowAccepted(
getShadowSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
resp -> {
reportShadowReceived();
processCISShadow(resp);
},
onGetShadowAccepted,
(e) -> LOGGER.atError().log("Error processing getShadowSubscription Response", e))
.get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS);
.get();
LOGGER.info("Subscribed to shadow get accepted topic");

GetShadowSubscriptionRequest getShadowRejectedSubscriptionRequest = new GetShadowSubscriptionRequest();
getShadowRejectedSubscriptionRequest.thingName = shadowName;
iotShadowClient.SubscribeToGetShadowRejected(
getShadowRejectedSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE,
err -> reportShadowReceived(),
onGetShadowRejected,
(e) -> LOGGER.atError().log("Error processing get shadow rejected response", e))
.get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS);
.get();
LOGGER.info("Subscribed to shadow get rejected topic");
return;

Expand All @@ -221,8 +221,6 @@ private void subscribeToShadowTopics() throws InterruptedException {
LOGGER.atError().setCause(e)
.log("Caught exception while subscribing to shadow topics, will retry shortly");
}
} catch (TimeoutException e) {
LOGGER.atWarn().setCause(e).log("Subscribe to shadow topics timed out, will retry shortly");
}
// Wait for sometime and then try to subscribe again
Thread.sleep(WAIT_TIME_TO_SUBSCRIBE_AGAIN_IN_MS + JITTER.nextInt(10_000));
Expand Down Expand Up @@ -360,6 +358,9 @@ private void reportShadowReceived() {

@SuppressWarnings("PMD.AvoidCatchingGenericException")
private void fetchCISShadowWithRetriesAsync() {
if (!mqttClient.getMqttOnline().get()) {
return;
}
synchronized (getShadowLock) {
if (getShadowTask != null && !getShadowTask.isDone()) {
// operation already in progress
Expand All @@ -370,11 +371,12 @@ private void fetchCISShadowWithRetriesAsync() {
RetryUtils.runWithRetry(
GET_CIS_SHADOW_RETRY_CONFIG,
() -> {
CompletableFuture<?> shadowReceived = this.shadowGetResponseReceived.updateAndGet(
ignore -> new CompletableFuture<>());
publishToGetCISShadowTopic().get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS);
CompletableFuture<?> shadowGetResponseReceived = new CompletableFuture<>();
this.shadowGetResponseReceived.set(shadowGetResponseReceived);
publishToGetCISShadowTopic().get();
// await shadow get accepted, rejected
shadowReceived.get(mqttOperationTimeoutSeconds, TimeUnit.SECONDS);
shadowGetResponseReceived.get(
mqttClient.getMqttOperationTimeoutMillis() + 5L, TimeUnit.MILLISECONDS);
return null;
},
"get-cis-shadow",
Expand All @@ -400,7 +402,7 @@ private CompletableFuture<Integer> publishToGetCISShadowTopic() {
LOGGER.atDebug().log("Publishing to get shadow topic");
GetShadowRequest getShadowRequest = new GetShadowRequest();
getShadowRequest.thingName = shadowName;
return iotShadowClient.PublishGetShadow(getShadowRequest, QualityOfService.AT_LEAST_ONCE)
return iotShadowClient.PublishGetShadow(getShadowRequest, QualityOfService.AT_MOST_ONCE)
.exceptionally(e -> {
LOGGER.atWarn().cause(e).log("Unable to retrieve CIS shadow");
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ void setup() {
SHADOW_NAME,
connectivityInfoProvider
);
// avoid unnecessary waiting
cisShadowMonitor.setMqttOperationTimeoutSeconds(1L);
}

@AfterEach
Expand Down

0 comments on commit dbf4e7c

Please sign in to comment.