Skip to content

Commit

Permalink
Improvements to MqttMessageReceiver
Browse files Browse the repository at this point in the history
(cherry picked from commit 2ab1a68)
  • Loading branch information
ashvayka authored and mp-loki committed Mar 13, 2018
1 parent 90e4b65 commit 579a579
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@Slf4j
public class MqttMessageReceiver implements Runnable {

private static final int INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD = 10000;
private static final int INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD = 1000;

private PersistentFileService persistentFileService;
private BlockingQueue<MessageFuturePair> incomingQueue;
Expand Down Expand Up @@ -65,17 +65,17 @@ public void run() {
log.warn("Failed to send message [{}] due to [{}]", message, future.cause());
}
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
} catch (IOException e) {
log.warn(e.getMessage(), e);
break;
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}

private void checkIncomingQueueSize() {
if (incomingQueue.size() > INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD) {
log.warn("Incoming queue has [{}] messages which is more than thee specified threshold of [{}]", incomingQueue.size());
if (incomingQueue.size() > incomingQueueWarningThreshold) {
log.warn("Incoming queue has [{}] messages which is more than thee specified threshold of [{}]", incomingQueue.size(), incomingQueueWarningThreshold);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
Expand All @@ -46,14 +47,14 @@ public class MqttMessageSender implements Runnable {

private final TbConnectionConfiguration connection;

private Queue<MessageFuturePair> incomingQueue;
private BlockingQueue<MessageFuturePair> incomingQueue;
private Queue<Future<Void>> outgoingQueue;

public MqttMessageSender(TbPersistenceConfiguration persistence,
TbConnectionConfiguration connection,
MqttClient tbClient,
PersistentFileService persistentFileService,
Queue<MessageFuturePair> incomingQueue) {
BlockingQueue<MessageFuturePair> incomingQueue) {
this.persistence = persistence;
this.connection = connection;
this.tbClient = tbClient;
Expand Down Expand Up @@ -102,7 +103,7 @@ public void run() {

private Future<Void> publishMqttMessage(MqttPersistentMessage message) {
return tbClient.publish(message.getTopic(), Unpooled.wrappedBuffer(message.getPayload()), MqttQoS.AT_LEAST_ONCE).addListener(
future -> incomingQueue.add(new MessageFuturePair(future, message))
future -> incomingQueue.put(new MessageFuturePair(future, message))
);
}

Expand All @@ -128,7 +129,7 @@ private boolean checkOutgoingQueueIsEmpty() {

private boolean checkClientConnected() throws InterruptedException {
if (!tbClient.isConnected()) {
outgoingQueue.stream().forEach(future -> future.cancel(true));
outgoingQueue.forEach(future -> future.cancel(true));
outgoingQueue.clear();
log.info("ThingsBoard MQTT connection failed. Reconnecting in [{}] milliseconds", connection.getRetryInterval());
Thread.sleep(connection.getRetryInterval());
Expand Down

0 comments on commit 579a579

Please sign in to comment.