Skip to content

Commit

Permalink
Fix premature execution of subscription onCompleted
Browse files Browse the repository at this point in the history
  • Loading branch information
minbi committed Jul 19, 2018
1 parent a8d139d commit 426a0e0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 25 deletions.
1 change: 0 additions & 1 deletion .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

* Adds support for AWS AppSync Defined Scalars such as `AWSTimestamp`.

### Bug Fixes

* Fix premature execution of `onCompleted` method of `AppSyncSubscriptionCall.Callback`.

## [Release 2.6.21](https://github.com/awslabs/aws-mobile-appsync-sdk-android/releases/tag/release_v2.6.21)

### Enhancements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,14 @@ public class MqttSubscriptionClient implements SubscriptionClient {
*/
public final Map<String, Set<SubscriptionObject>> subsMap;
MessageListener msgListener;
ClientConnectionListener clientConnectionListener;

public MqttSubscriptionClient(Context applicationContext, String wssURL, String clientId) {
mMqttAndroidClient = new MqttAndroidClient(applicationContext, wssURL, clientId, new MemoryPersistence());
subsMap = new HashMap<>();
msgListener = new MessageListener();
msgListener.client = this;
msgListener.setTransmitting(false);
}

@Override
Expand All @@ -64,23 +68,8 @@ public void connect(final SubscriptionClientCallback callback) {
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setKeepAliveInterval(5);
mMqttAndroidClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
Log.d(TAG, "connection lost");
callback.onError(new SubscriptionDisconnectedException("Client disconnected", cause));
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d(TAG, "message arrived");
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.d(TAG, "delivery complete");
}
});
clientConnectionListener = new ClientConnectionListener(callback);
mMqttAndroidClient.setCallback(clientConnectionListener);
Log.d(TAG, "Calling MQTT Connect with actual endpoint");
mMqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
Expand All @@ -107,15 +96,10 @@ public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
public void subscribe(final String topic, int qos, final SubscriptionCallback callback) {
try {
Log.d(TAG, this + " Attempt to subscribe to topic " + topic);
if (msgListener == null) {
msgListener = new MessageListener();
msgListener.client = this;
msgListener.setTransmitting(false);
if (msgListener != null) {
msgListener.setCallback(callback);
mMqttAndroidClient.subscribe(topic, qos, msgListener);
} else {
mMqttAndroidClient.subscribe(topic, qos, msgListener);
}
mMqttAndroidClient.subscribe(topic, qos, msgListener);
} catch (MqttException e) {
callback.onError(topic, e);
}
Expand Down Expand Up @@ -145,6 +129,9 @@ public void setTransmitting(final boolean isTransmitting) {
if (msgListener != null) {
msgListener.setTransmitting(isTransmitting);
}
if (this.clientConnectionListener != null) {
this.clientConnectionListener.isTransmitting = isTransmitting;
}
}

@Override
Expand Down Expand Up @@ -180,4 +167,32 @@ public void messageArrived(String topic, MqttMessage message) throws Exception {
}
}
}

class ClientConnectionListener implements MqttCallback {
private boolean isTransmitting;
final SubscriptionClientCallback callback;

public ClientConnectionListener(final SubscriptionClientCallback callback) {
this.callback = callback;
isTransmitting = true;
}

@Override
public void connectionLost(Throwable cause) {
Log.d(TAG, "connection lost isTransmitting: " + isTransmitting);
if (isTransmitting) {
callback.onError(new SubscriptionDisconnectedException("Client disconnected", cause));
}
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d(TAG, "message arrived");
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.d(TAG, "delivery complete");
}
}
}

0 comments on commit 426a0e0

Please sign in to comment.