From 426a0e0a719167bb40137eec2c83bb776b840c77 Mon Sep 17 00:00:00 2001 From: Min Bi Date: Wed, 18 Jul 2018 16:55:03 -0700 Subject: [PATCH] Fix premature execution of subscription onCompleted --- .idea/modules.xml | 1 - CHANGELOG.md | 4 ++ .../mqtt/MqttSubscriptionClient.java | 63 ++++++++++++------- 3 files changed, 43 insertions(+), 25 deletions(-) diff --git a/.idea/modules.xml b/.idea/modules.xml index d30cdb49..15e23523 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -2,7 +2,6 @@ - diff --git a/CHANGELOG.md b/CHANGELOG.md index 057ad068..8a867c24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ * Adds support for AWS AppSync Defined Scalars such as `AWSTimestamp`. +### Bug Fixes + +* Fix premature execution of `onCompleted` method of `AppSyncSubscriptionCall.Callback`. + ## [Release 2.6.21](https://github.com/awslabs/aws-mobile-appsync-sdk-android/releases/tag/release_v2.6.21) ### Enhancements diff --git a/aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/subscription/mqtt/MqttSubscriptionClient.java b/aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/subscription/mqtt/MqttSubscriptionClient.java index ed2914a6..d21ac5a0 100644 --- a/aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/subscription/mqtt/MqttSubscriptionClient.java +++ b/aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/subscription/mqtt/MqttSubscriptionClient.java @@ -50,10 +50,14 @@ public class MqttSubscriptionClient implements SubscriptionClient { */ public final Map> subsMap; MessageListener msgListener; + ClientConnectionListener clientConnectionListener; public MqttSubscriptionClient(Context applicationContext, String wssURL, String clientId) { mMqttAndroidClient = new MqttAndroidClient(applicationContext, wssURL, clientId, new MemoryPersistence()); subsMap = new HashMap<>(); + msgListener = new MessageListener(); + msgListener.client = this; + msgListener.setTransmitting(false); } @Override @@ -64,23 +68,8 @@ public void connect(final SubscriptionClientCallback callback) { mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setKeepAliveInterval(5); - mMqttAndroidClient.setCallback(new MqttCallback() { - @Override - public void connectionLost(Throwable cause) { - Log.d(TAG, "connection lost"); - callback.onError(new SubscriptionDisconnectedException("Client disconnected", cause)); - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - Log.d(TAG, "message arrived"); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - Log.d(TAG, "delivery complete"); - } - }); + clientConnectionListener = new ClientConnectionListener(callback); + mMqttAndroidClient.setCallback(clientConnectionListener); Log.d(TAG, "Calling MQTT Connect with actual endpoint"); mMqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() { @Override @@ -107,15 +96,10 @@ public void onFailure(IMqttToken asyncActionToken, Throwable exception) { public void subscribe(final String topic, int qos, final SubscriptionCallback callback) { try { Log.d(TAG, this + " Attempt to subscribe to topic " + topic); - if (msgListener == null) { - msgListener = new MessageListener(); - msgListener.client = this; - msgListener.setTransmitting(false); + if (msgListener != null) { msgListener.setCallback(callback); - mMqttAndroidClient.subscribe(topic, qos, msgListener); - } else { - mMqttAndroidClient.subscribe(topic, qos, msgListener); } + mMqttAndroidClient.subscribe(topic, qos, msgListener); } catch (MqttException e) { callback.onError(topic, e); } @@ -145,6 +129,9 @@ public void setTransmitting(final boolean isTransmitting) { if (msgListener != null) { msgListener.setTransmitting(isTransmitting); } + if (this.clientConnectionListener != null) { + this.clientConnectionListener.isTransmitting = isTransmitting; + } } @Override @@ -180,4 +167,32 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { } } } + + class ClientConnectionListener implements MqttCallback { + private boolean isTransmitting; + final SubscriptionClientCallback callback; + + public ClientConnectionListener(final SubscriptionClientCallback callback) { + this.callback = callback; + isTransmitting = true; + } + + @Override + public void connectionLost(Throwable cause) { + Log.d(TAG, "connection lost isTransmitting: " + isTransmitting); + if (isTransmitting) { + callback.onError(new SubscriptionDisconnectedException("Client disconnected", cause)); + } + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + Log.d(TAG, "message arrived"); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + Log.d(TAG, "delivery complete"); + } + } }