Skip to content

Commit

Permalink
MqttMessageReceiver added
Browse files Browse the repository at this point in the history
  • Loading branch information
mp-loki committed Mar 10, 2018
1 parent e4c235b commit b8ca1d1
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright © 2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.gateway.service;

import io.netty.util.concurrent.Future;
import lombok.AllArgsConstructor;
import lombok.Data;

/**
* Created by Valerii Sosliuk on 3/10/2018.
*/
@Data
@AllArgsConstructor
public class MessageFuturePair {

Future<? super Void> future;
MqttPersistentMessage message;
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,18 @@ public class MqttGatewayService implements GatewayService, MqttCallback, MqttCli

private ScheduledExecutorService scheduler;
private ExecutorService mqttSenderExecutor;
private ExecutorService mqttReceiverExecutor;
private ExecutorService callbackExecutor = Executors.newCachedThreadPool();

private Map<AttributeRequestKey, AttributeRequestListener> pendingAttrRequestsMap = new ConcurrentHashMap<>();

@PostConstruct
public void init() throws Exception {
BlockingQueue<MessageFuturePair> incomingQueue = new LinkedBlockingQueue<>();
initTimeouts();
initMqttClient();
initMqttSender();
initMqttSender(incomingQueue);
initMqttReceiver(incomingQueue);
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::reportStats, 0, reporting.getInterval(), TimeUnit.MILLISECONDS);
}
Expand Down Expand Up @@ -439,9 +442,14 @@ private void onDeviceAttributesResponse(MqttMessage message) {
});
}

private void initMqttSender() {
private void initMqttSender(BlockingQueue<MessageFuturePair> incomingQueue) {
mqttSenderExecutor = Executors.newSingleThreadExecutor();
mqttSenderExecutor.submit(new MqttMessageSender(persistence, connection, tbClient, persistentFileService));
mqttSenderExecutor.submit(new MqttMessageSender(persistence, connection, tbClient, persistentFileService, incomingQueue));
}

private void initMqttReceiver(BlockingQueue<MessageFuturePair> incomingQueue) {
mqttReceiverExecutor = Executors.newSingleThreadExecutor();
mqttReceiverExecutor.submit(new MqttMessageReceiver(persistentFileService, incomingQueue, connection.getIncomingQueueWarningThreshold()));
}

private static String toString(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Copyright © 2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.gateway.service;

import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/**
* Created by Valerii Sosliuk on 3/10/2018.
*/
@Slf4j
public class MqttMessageReceiver implements Runnable {

private static final int INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD = 10000;

private PersistentFileService persistentFileService;
private BlockingQueue<MessageFuturePair> incomingQueue;

private Consumer<Void> defaultSuccessCallback = message -> log.debug("Successfully sent message: [{}]", message);
private Consumer<Throwable> defaultFailureCallback = e -> log.warn("Failed to send message: [{}]", e);

private int incomingQueueWarningThreshold;

public MqttMessageReceiver(PersistentFileService persistentFileService,
BlockingQueue<MessageFuturePair> incomingQueue,
int warningThreshold) {
this.persistentFileService = persistentFileService;
this.incomingQueue = incomingQueue;
this.incomingQueueWarningThreshold = warningThreshold == 0 ? INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD : warningThreshold;
}

@Override
public void run() {
while (true) {
checkIncomingQueueSize();
try {
MessageFuturePair messageFuturePair = incomingQueue.take();
Future<?> future = messageFuturePair.getFuture();
MqttPersistentMessage message = messageFuturePair.getMessage();
if (future.isSuccess()) {
Consumer<Void> successCallback = persistentFileService.getSuccessCallback(message.getId()).orElse(defaultSuccessCallback);
successCallback.accept(null);
persistentFileService.resolveFutureSuccess(message.getId());
} else {
persistentFileService.saveForResend(message);
persistentFileService.getFailureCallback(message.getId()).orElse(defaultFailureCallback).accept(future.cause());
persistentFileService.resolveFutureFailed(message.getId(), future.cause());
log.warn("Failed to send message [{}] due to [{}]", message, future.cause());
}
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}

private void checkIncomingQueueSize() {
if (incomingQueue.size() > INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD) {
log.warn("Incoming queue has [{}] messages which is more than thee specified threshold of [{}]", incomingQueue.size());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

/**
Expand All @@ -45,19 +46,19 @@ public class MqttMessageSender implements Runnable {

private final TbConnectionConfiguration connection;

private Queue<MessageFuturePair> incomingQueue;
private Queue<Future<Void>> outgoingQueue;

private Consumer<Void> defaultSuccessCallback = message -> log.debug("Successfully sent message: [{}]", message);
private Consumer<Throwable> defaultFailureCallback = e -> log.warn("Failed to send message: [{}]", e);

public MqttMessageSender(TbPersistenceConfiguration persistence,
TbConnectionConfiguration connection,
MqttClient tbClient,
PersistentFileService persistentFileService) {
PersistentFileService persistentFileService,
Queue<MessageFuturePair> incomingQueue) {
this.persistence = persistence;
this.connection = connection;
this.tbClient = tbClient;
this.persistentFileService = persistentFileService;
this.incomingQueue = incomingQueue;
outgoingQueue = new ConcurrentLinkedQueue();
}

Expand Down Expand Up @@ -101,18 +102,7 @@ public void run() {

private Future<Void> publishMqttMessage(MqttPersistentMessage message) {
return tbClient.publish(message.getTopic(), Unpooled.wrappedBuffer(message.getPayload()), MqttQoS.AT_LEAST_ONCE).addListener(
future -> {
if (future.isSuccess()) {
Consumer<Void> successCallback = persistentFileService.getSuccessCallback(message.getId()).orElse(defaultSuccessCallback);
successCallback.accept(null);
persistentFileService.resolveFutureSuccess(message.getId());
} else {
persistentFileService.saveForResend(message);
persistentFileService.getFailureCallback(message.getId()).orElse(defaultFailureCallback).accept(future.cause());
persistentFileService.resolveFutureFailed(message.getId(), future.cause());
log.warn("Failed to send message [{}] due to [{}]", message, future.cause());
}
}
future -> incomingQueue.add(new MessageFuturePair(future, message))
);
}

Expand Down Expand Up @@ -142,7 +132,11 @@ private boolean checkClientConnected() throws InterruptedException {
outgoingQueue.clear();
log.info("ThingsBoard MQTT connection failed. Reconnecting in [{}] milliseconds", connection.getRetryInterval());
Thread.sleep(connection.getRetryInterval());
tbClient.reconnect();
try {
tbClient.reconnect().get();
} catch (ExecutionException e) {
log.error(e.getMessage(), e);
}
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class TbConnectionConfiguration {
private long connectionTimeout;
private int maxInFlight;
private int maxQueueSize;
private int incomingQueueWarningThreshold;
private MqttGatewaySecurityConfiguration security;

}

0 comments on commit b8ca1d1

Please sign in to comment.