Skip to content

Commit

Permalink
fix(mqtt5): catch mqtt builder fails with mqtt exception (#1492)
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo authored Jun 20, 2023
1 parent 8fa0f36 commit 3b555bb
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@
<dependency>
<groupId>software.amazon.awssdk.iotdevicesdk</groupId>
<artifactId>aws-iot-device-sdk</artifactId>
<version>1.12.2-CLI-SNAPSHOT</version>
<version>1.13.2-CLI-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/com/aws/greengrass/deployment/IotJobsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ public void unsubscribeFromIotJobsTopics() {
* @throws InterruptedException if the thread gets interrupted
* @throws TimeoutException if the operation does not complete within the given time
*/
@SuppressWarnings("PMD.AvoidCatchingThrowable")
protected void subscribeToGetNextJobDescription(Consumer<DescribeJobExecutionResponse> consumerAccept,
Consumer<RejectedError> consumerReject)
throws InterruptedException {
Expand All @@ -523,10 +524,12 @@ protected void subscribeToGetNextJobDescription(Consumer<DescribeJobExecutionRes
describeJobExecutionSubscriptionRequest.jobId = NEXT_JOB_LITERAL;

while (true) {
CompletableFuture<Integer> subscribed = iotJobsClientWrapper
.SubscribeToDescribeJobExecutionAccepted(describeJobExecutionSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE, consumerAccept);
CompletableFuture<Integer> subscribed;
try {
subscribed = iotJobsClientWrapper
.SubscribeToDescribeJobExecutionAccepted(describeJobExecutionSubscriptionRequest,
QualityOfService.AT_LEAST_ONCE, consumerAccept);

subscribed.get(mqttClient.getMqttOperationTimeoutMillis(), TimeUnit.MILLISECONDS);
subscribed = iotJobsClientWrapper
.SubscribeToDescribeJobExecutionRejected(describeJobExecutionSubscriptionRequest,
Expand All @@ -544,11 +547,12 @@ protected void subscribeToGetNextJobDescription(Consumer<DescribeJobExecutionRes
logger.atWarn().log(SUBSCRIPTION_JOB_DESCRIPTION_INTERRUPTED);
break;
}
} catch (TimeoutException e) {
logger.atWarn().setCause(e).log(SUBSCRIPTION_JOB_DESCRIPTION_RETRY_MESSAGE);
} catch (InterruptedException e) {
logger.atWarn().log(SUBSCRIPTION_JOB_DESCRIPTION_INTERRUPTED);
throw e;
} catch (Throwable t) {
// Catch anything else that happens
logger.atWarn().setCause(t).log(SUBSCRIPTION_JOB_DESCRIPTION_RETRY_MESSAGE);
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public class ShadowDeploymentListener implements InjectionActions {
private static final String SHADOW_UPDATE_REJECTED_TOPIC = "$aws/things/{thingName}/shadow/name/{shadowName}"
+ "/update/rejected";
private static final String SHADOW_GET_TOPIC = "$aws/things/{thingName}/shadow/name/{shadowName}/get/accepted";
private static final String SUBSCRIBE_ERROR_RETRY_MESSAGE =
"Caught exception while subscribing to shadow topics, will retry shortly";

@Inject
private Kernel kernel;
Expand Down Expand Up @@ -222,6 +224,7 @@ private void setupShadowCommunications() {
Subscribe to "$aws/things/{thingName}/shadow/update/rejected" topic to get notified when an update is rejected
Subscribe to "$aws/things/{thingName}/shadow/get/accepted" topic to retrieve shadow by publishing to get topic
*/
@SuppressWarnings("PMD.AvoidCatchingThrowable")
private void subscribeToShadowTopics() {
logger.atDebug().log(SUBSCRIBING_TO_SHADOW_TOPICS_MESSAGE);
while (true) {
Expand Down Expand Up @@ -259,22 +262,25 @@ private void subscribeToShadowTopics() {
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof MqttException || cause instanceof TimeoutException) {
logger.atWarn().setCause(cause).log("Caught exception while subscribing to shadow topics, "
+ "will retry shortly");
logger.atWarn().setCause(cause).log(SUBSCRIBE_ERROR_RETRY_MESSAGE);
} else if (cause instanceof InterruptedException) {
logger.atWarn().log("Interrupted while subscribing to shadow topics");
return;
} else {
logger.atError().setCause(e)
.log("Caught exception while subscribing to shadow topics, will retry shortly");
logger.atError().setCause(e).log(SUBSCRIBE_ERROR_RETRY_MESSAGE);
}
} catch (TimeoutException e) {
logger.atWarn().setCause(e).log("Subscribe to shadow topics timed out, will retry shortly");
} catch (InterruptedException e) {
//Since this method can run as runnable cannot throw exception so handling exceptions here
logger.atWarn().log("Interrupted while subscribing to shadow topics");
return;
} catch (Throwable t) {
logger.atWarn().setCause(t).log(SUBSCRIBE_ERROR_RETRY_MESSAGE);
}



try {
// Wait for sometime and then try to subscribe again
Thread.sleep(WAIT_TIME_TO_SUBSCRIBE_AGAIN_IN_MS + JITTER.nextInt(10_000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.Getter;
import lombok.Setter;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.mqtt.MqttException;
import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;
import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions;
import software.amazon.awssdk.crt.mqtt5.OnAttemptingConnectReturn;
Expand Down Expand Up @@ -319,6 +320,9 @@ private synchronized void internalConnect() {
"sessionExpirySeconds")))
);
client = builder.build();
} catch (MqttException e) {
connectFuture.completeExceptionally(e);
return;
}
client.start();
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/aws/greengrass/mqttclient/MqttClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ protected int getNextClientIdNumber() {
return 0;
}

@SuppressWarnings("PMD.AvoidCatchingGenericException")
@SuppressWarnings({"PMD.AvoidCatchingGenericException", "PMD.AvoidRethrowingException"})
protected IndividualMqttClient getNewMqttClient() {
int clientIdNum = getNextClientIdNumber();
// Name client by thingName#<number> except for the first connection which will just be thingName
Expand All @@ -988,6 +988,8 @@ protected IndividualMqttClient getNewMqttClient() {
return new AwsIotMqtt5Client(() -> {
try {
return builderProvider.apply(clientBootstrap).toAwsIotMqtt5ClientBuilder();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void GIVEN_ping_timeout_gte_keep_alive_WHEN_mqtt_client_connects_THEN_throws_exc
mqttNamespace.lookup(MqttClient.MQTT_PING_TIMEOUT_KEY).withValue(pingTimeout);
MqttClient mqttClient = new MqttClient(deviceConfiguration, ses, executorService,
mock(SecurityService.class), kernel);
RuntimeException e = assertThrows(RuntimeException.class, () -> mqttClient.getNewMqttClient().connect().get());
ExecutionException e = assertThrows(ExecutionException.class, () -> mqttClient.getNewMqttClient().connect().get());
assertEquals(MqttException.class, e.getCause().getClass());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public void beforeEach(ExtensionContext context) throws Exception {
ignoreExceptionOfType(context, ClosedByInterruptException.class);
ignoreExceptionWithStackTraceContaining(context, IllegalAccessException.class,
ProvisioningPluginFactory.class.getName());
ignoreExceptionWithStackTraceContaining(context, NullPointerException.class,
"subscribeToGetNextJobDescription");
}

@Override
Expand Down

0 comments on commit 3b555bb

Please sign in to comment.