From b8ca1d195e43382a80aa792cbaf8de6ec6e9d39a Mon Sep 17 00:00:00 2001 From: mp-loki Date: Sat, 10 Mar 2018 10:12:28 -0500 Subject: [PATCH] MqttMessageReceiver added --- .../gateway/service/MessageFuturePair.java | 31 +++++++ .../gateway/service/MqttGatewayService.java | 14 +++- .../gateway/service/MqttMessageReceiver.java | 81 +++++++++++++++++++ .../gateway/service/MqttMessageSender.java | 28 +++---- .../conf/TbConnectionConfiguration.java | 1 + 5 files changed, 135 insertions(+), 20 deletions(-) create mode 100644 src/main/java/org/thingsboard/gateway/service/MessageFuturePair.java create mode 100644 src/main/java/org/thingsboard/gateway/service/MqttMessageReceiver.java diff --git a/src/main/java/org/thingsboard/gateway/service/MessageFuturePair.java b/src/main/java/org/thingsboard/gateway/service/MessageFuturePair.java new file mode 100644 index 000000000..889a43852 --- /dev/null +++ b/src/main/java/org/thingsboard/gateway/service/MessageFuturePair.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.gateway.service; + +import io.netty.util.concurrent.Future; +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * Created by Valerii Sosliuk on 3/10/2018. + */ +@Data +@AllArgsConstructor +public class MessageFuturePair { + + Future future; + MqttPersistentMessage message; +} diff --git a/src/main/java/org/thingsboard/gateway/service/MqttGatewayService.java b/src/main/java/org/thingsboard/gateway/service/MqttGatewayService.java index e70aa4031..640c8473c 100644 --- a/src/main/java/org/thingsboard/gateway/service/MqttGatewayService.java +++ b/src/main/java/org/thingsboard/gateway/service/MqttGatewayService.java @@ -105,15 +105,18 @@ public class MqttGatewayService implements GatewayService, MqttCallback, MqttCli private ScheduledExecutorService scheduler; private ExecutorService mqttSenderExecutor; + private ExecutorService mqttReceiverExecutor; private ExecutorService callbackExecutor = Executors.newCachedThreadPool(); private Map pendingAttrRequestsMap = new ConcurrentHashMap<>(); @PostConstruct public void init() throws Exception { + BlockingQueue incomingQueue = new LinkedBlockingQueue<>(); initTimeouts(); initMqttClient(); - initMqttSender(); + initMqttSender(incomingQueue); + initMqttReceiver(incomingQueue); scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(this::reportStats, 0, reporting.getInterval(), TimeUnit.MILLISECONDS); } @@ -439,9 +442,14 @@ private void onDeviceAttributesResponse(MqttMessage message) { }); } - private void initMqttSender() { + private void initMqttSender(BlockingQueue incomingQueue) { mqttSenderExecutor = Executors.newSingleThreadExecutor(); - mqttSenderExecutor.submit(new MqttMessageSender(persistence, connection, tbClient, persistentFileService)); + mqttSenderExecutor.submit(new MqttMessageSender(persistence, connection, tbClient, persistentFileService, incomingQueue)); + } + + private void initMqttReceiver(BlockingQueue incomingQueue) { + mqttReceiverExecutor = Executors.newSingleThreadExecutor(); + mqttReceiverExecutor.submit(new MqttMessageReceiver(persistentFileService, incomingQueue, connection.getIncomingQueueWarningThreshold())); } private static String toString(Exception e) { diff --git a/src/main/java/org/thingsboard/gateway/service/MqttMessageReceiver.java b/src/main/java/org/thingsboard/gateway/service/MqttMessageReceiver.java new file mode 100644 index 000000000..113f19d3a --- /dev/null +++ b/src/main/java/org/thingsboard/gateway/service/MqttMessageReceiver.java @@ -0,0 +1,81 @@ +/** + * Copyright © 2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.gateway.service; + +import io.netty.util.concurrent.Future; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.function.Consumer; + +/** + * Created by Valerii Sosliuk on 3/10/2018. + */ +@Slf4j +public class MqttMessageReceiver implements Runnable { + + private static final int INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD = 10000; + + private PersistentFileService persistentFileService; + private BlockingQueue incomingQueue; + + private Consumer defaultSuccessCallback = message -> log.debug("Successfully sent message: [{}]", message); + private Consumer defaultFailureCallback = e -> log.warn("Failed to send message: [{}]", e); + + private int incomingQueueWarningThreshold; + + public MqttMessageReceiver(PersistentFileService persistentFileService, + BlockingQueue incomingQueue, + int warningThreshold) { + this.persistentFileService = persistentFileService; + this.incomingQueue = incomingQueue; + this.incomingQueueWarningThreshold = warningThreshold == 0 ? INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD : warningThreshold; + } + + @Override + public void run() { + while (true) { + checkIncomingQueueSize(); + try { + MessageFuturePair messageFuturePair = incomingQueue.take(); + Future future = messageFuturePair.getFuture(); + MqttPersistentMessage message = messageFuturePair.getMessage(); + if (future.isSuccess()) { + Consumer successCallback = persistentFileService.getSuccessCallback(message.getId()).orElse(defaultSuccessCallback); + successCallback.accept(null); + persistentFileService.resolveFutureSuccess(message.getId()); + } else { + persistentFileService.saveForResend(message); + persistentFileService.getFailureCallback(message.getId()).orElse(defaultFailureCallback).accept(future.cause()); + persistentFileService.resolveFutureFailed(message.getId(), future.cause()); + log.warn("Failed to send message [{}] due to [{}]", message, future.cause()); + } + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + Thread.currentThread().interrupt(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + } + } + + private void checkIncomingQueueSize() { + if (incomingQueue.size() > INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD) { + log.warn("Incoming queue has [{}] messages which is more than thee specified threshold of [{}]", incomingQueue.size()); + } + } +} diff --git a/src/main/java/org/thingsboard/gateway/service/MqttMessageSender.java b/src/main/java/org/thingsboard/gateway/service/MqttMessageSender.java index e86839a48..6fe146460 100644 --- a/src/main/java/org/thingsboard/gateway/service/MqttMessageSender.java +++ b/src/main/java/org/thingsboard/gateway/service/MqttMessageSender.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; /** @@ -45,19 +46,19 @@ public class MqttMessageSender implements Runnable { private final TbConnectionConfiguration connection; + private Queue incomingQueue; private Queue> outgoingQueue; - private Consumer defaultSuccessCallback = message -> log.debug("Successfully sent message: [{}]", message); - private Consumer defaultFailureCallback = e -> log.warn("Failed to send message: [{}]", e); - public MqttMessageSender(TbPersistenceConfiguration persistence, TbConnectionConfiguration connection, MqttClient tbClient, - PersistentFileService persistentFileService) { + PersistentFileService persistentFileService, + Queue incomingQueue) { this.persistence = persistence; this.connection = connection; this.tbClient = tbClient; this.persistentFileService = persistentFileService; + this.incomingQueue = incomingQueue; outgoingQueue = new ConcurrentLinkedQueue(); } @@ -101,18 +102,7 @@ public void run() { private Future publishMqttMessage(MqttPersistentMessage message) { return tbClient.publish(message.getTopic(), Unpooled.wrappedBuffer(message.getPayload()), MqttQoS.AT_LEAST_ONCE).addListener( - future -> { - if (future.isSuccess()) { - Consumer successCallback = persistentFileService.getSuccessCallback(message.getId()).orElse(defaultSuccessCallback); - successCallback.accept(null); - persistentFileService.resolveFutureSuccess(message.getId()); - } else { - persistentFileService.saveForResend(message); - persistentFileService.getFailureCallback(message.getId()).orElse(defaultFailureCallback).accept(future.cause()); - persistentFileService.resolveFutureFailed(message.getId(), future.cause()); - log.warn("Failed to send message [{}] due to [{}]", message, future.cause()); - } - } + future -> incomingQueue.add(new MessageFuturePair(future, message)) ); } @@ -142,7 +132,11 @@ private boolean checkClientConnected() throws InterruptedException { outgoingQueue.clear(); log.info("ThingsBoard MQTT connection failed. Reconnecting in [{}] milliseconds", connection.getRetryInterval()); Thread.sleep(connection.getRetryInterval()); - tbClient.reconnect(); + try { + tbClient.reconnect().get(); + } catch (ExecutionException e) { + log.error(e.getMessage(), e); + } return false; } return true; diff --git a/src/main/java/org/thingsboard/gateway/service/conf/TbConnectionConfiguration.java b/src/main/java/org/thingsboard/gateway/service/conf/TbConnectionConfiguration.java index d0cf7ad73..b7a3211a7 100644 --- a/src/main/java/org/thingsboard/gateway/service/conf/TbConnectionConfiguration.java +++ b/src/main/java/org/thingsboard/gateway/service/conf/TbConnectionConfiguration.java @@ -34,6 +34,7 @@ public class TbConnectionConfiguration { private long connectionTimeout; private int maxInFlight; private int maxQueueSize; + private int incomingQueueWarningThreshold; private MqttGatewaySecurityConfiguration security; }