Skip to content

Commit

Permalink
upstream changes
Browse files Browse the repository at this point in the history
  • Loading branch information
NuwanSameera committed Jan 27, 2016
2 parents 637e0ef + 55d97dd commit 41de719
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
Expand All @@ -24,18 +24,26 @@
import org.wso2.carbon.apimgt.annotations.api.API;
import org.wso2.carbon.apimgt.annotations.device.DeviceType;
import org.wso2.carbon.apimgt.annotations.device.feature.Feature;
import org.wso2.carbon.device.mgt.iot.arduino.plugin.constants.ArduinoConstants;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.arduino.controller.service.impl.dto.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.arduino.controller.service.impl.transport.ArduinoMQTTConnector;
import org.wso2.carbon.device.mgt.iot.arduino.controller.service.impl.util.ArduinoServiceUtils;
import org.wso2.carbon.device.mgt.iot.arduino.plugin.constants.ArduinoConstants;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorRecord;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.*;
import javax.ws.rs.Consumes;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand All @@ -50,34 +58,41 @@
@DeviceType( value = "arduino")
public class ArduinoControllerService {

private static Log log = LogFactory.getLog(ArduinoControllerService.class);

public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";
//TODO; replace this tenant domain
private static final String SUPER_TENANT = "carbon.super";

private static Log log = LogFactory.getLog(ArduinoControllerService.class);
private static Map<String, LinkedList<String>> replyMsgQueue = new HashMap<>();
private static Map<String, LinkedList<String>> internalControlsQueue = new HashMap<>();
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;

public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";

private ArduinoMQTTConnector arduinoMQTTConnector;
private static Map<String, LinkedList<String>> replyMsgQueue = new HashMap<>();
private static Map<String, LinkedList<String>> internalControlsQueue = new HashMap<>();
private ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<>();

/**
* @param arduinoMQTTConnector an object of type "ArduinoMQTTConnector" specific for this ArduinoControllerService
* @return the queue containing all the MQTT reply messages from all Arduinos communicating to this service
*/
@SuppressWarnings("unused")
public void setArduinoMQTTConnector(
final ArduinoMQTTConnector arduinoMQTTConnector) {
this.arduinoMQTTConnector = arduinoMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
arduinoMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, ArduinoMQTTConnector not started.");
public static Map<String, LinkedList<String>> getReplyMsgQueue() {
return replyMsgQueue;
}

/**
* @return the queue containing all the MQTT controls received to be sent to any Arduinos connected to this server
*/
public static Map<String, LinkedList<String>> getInternalControlsQueue() {
return internalControlsQueue;
}

private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}

/**
Expand All @@ -89,17 +104,27 @@ public ArduinoMQTTConnector getArduinoMQTTConnector() {
}

/**
* @return the queue containing all the MQTT reply messages from all Arduinos communicating to this service
*/
public static Map<String, LinkedList<String>> getReplyMsgQueue() {
return replyMsgQueue;
}

/**
* @return the queue containing all the MQTT controls received to be sent to any Arduinos connected to this server
* @param arduinoMQTTConnector an object of type "ArduinoMQTTConnector" specific for this ArduinoControllerService
*/
public static Map<String, LinkedList<String>> getInternalControlsQueue() {
return internalControlsQueue;
@SuppressWarnings("unused")
public void setArduinoMQTTConnector(
final ArduinoMQTTConnector arduinoMQTTConnector) {
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
ArduinoControllerService.this.arduinoMQTTConnector = arduinoMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
arduinoMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, ArduinoMQTTConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}

/* ---------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,58 +88,60 @@ public void run() {

@Override
public void processIncomingMessage(MqttMessage message, String... messageParams) throws TransportHandlerException {
String topic = messageParams[0];
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + ArduinoConstants.DEVICE_TYPE + File.separator, ":");
if(messageParams.length != 0) {
String topic = messageParams[0];
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + ArduinoConstants.DEVICE_TYPE + File.separator, ":");

String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];

if (log.isDebugEnabled()) {
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
}
if (log.isDebugEnabled()) {
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
}

int lastIndex = message.toString().lastIndexOf(":");
String msgContext = message.toString().substring(lastIndex + 1);
int lastIndex = message.toString().lastIndexOf(":");
String msgContext = message.toString().substring(lastIndex + 1);

LinkedList<String> deviceControlList;
LinkedList<String> replyMessageList;
LinkedList<String> deviceControlList;
LinkedList<String> replyMessageList;

if (msgContext.equals(MESSAGE_TO_SEND) || msgContext.equals(ArduinoConstants.STATE_ON) || msgContext.equals(
ArduinoConstants.STATE_OFF)) {
if (msgContext.equals(MESSAGE_TO_SEND) || msgContext.equals(ArduinoConstants.STATE_ON) || msgContext.equals(
ArduinoConstants.STATE_OFF)) {

if (log.isDebugEnabled()) {
log.debug("Received a control message: ");
log.debug("Control message topic: " + topic);
log.debug("Control message: " + message.toString());
}
if (log.isDebugEnabled()) {
log.debug("Received a control message: ");
log.debug("Control message topic: " + topic);
log.debug("Control message: " + message.toString());
}

synchronized (ArduinoControllerService.getInternalControlsQueue()) {
deviceControlList = ArduinoControllerService.getInternalControlsQueue().get(deviceId);
if (deviceControlList == null) {
ArduinoControllerService.getInternalControlsQueue()
.put(deviceId, deviceControlList = new LinkedList<String>());
synchronized (ArduinoControllerService.getInternalControlsQueue()) {
deviceControlList = ArduinoControllerService.getInternalControlsQueue().get(deviceId);
if (deviceControlList == null) {
ArduinoControllerService.getInternalControlsQueue()
.put(deviceId, deviceControlList = new LinkedList<String>());
}
}
}
deviceControlList.add(message.toString());
deviceControlList.add(message.toString());

} else if (msgContext.equals(MESSAGE_RECEIVED)) {
} else if (msgContext.equals(MESSAGE_RECEIVED)) {

if (log.isDebugEnabled()) {
log.debug("Received reply from a device: ");
log.debug("Reply message topic: " + topic);
log.debug("Reply message: " + message.toString().substring(0, lastIndex));
}
if (log.isDebugEnabled()) {
log.debug("Received reply from a device: ");
log.debug("Reply message topic: " + topic);
log.debug("Reply message: " + message.toString().substring(0, lastIndex));
}

synchronized (ArduinoControllerService.getReplyMsgQueue()) {
replyMessageList = ArduinoControllerService.getReplyMsgQueue().get(deviceId);
if (replyMessageList == null) {
ArduinoControllerService.getReplyMsgQueue()
.put(deviceId, replyMessageList = new LinkedList<>());
synchronized (ArduinoControllerService.getReplyMsgQueue()) {
replyMessageList = ArduinoControllerService.getReplyMsgQueue().get(deviceId);
if (replyMessageList == null) {
ArduinoControllerService.getReplyMsgQueue()
.put(deviceId, replyMessageList = new LinkedList<>());
}
}
replyMessageList.add(message.toString());
}
replyMessageList.add(message.toString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
Expand All @@ -24,11 +24,12 @@
import org.wso2.carbon.apimgt.annotations.device.DeviceType;
import org.wso2.carbon.apimgt.annotations.device.feature.Feature;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.controller.api.exception.DigitalDisplayException;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.controller.api.util.DigitalDisplayMQTTConnector;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.plugin.constants.DigitalDisplayConstants;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;

import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.FormParam;
Expand All @@ -48,19 +49,41 @@ public class DigitalDisplayControllerService {

private static DigitalDisplayMQTTConnector digitalDisplayMQTTConnector;

private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}

public DigitalDisplayMQTTConnector getDigitalDisplayMQTTConnector() {
return DigitalDisplayControllerService.digitalDisplayMQTTConnector;
}

public void setDigitalDisplayMQTTConnector(
public void setDigitalDisplayMQTTConnector(final
DigitalDisplayMQTTConnector digitalDisplayMQTTConnector) {
DigitalDisplayControllerService.digitalDisplayMQTTConnector = digitalDisplayMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
digitalDisplayMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. " +
"Hence, DigitalDisplayMQTTConnector not started.");
}

Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
DigitalDisplayControllerService.digitalDisplayMQTTConnector = digitalDisplayMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
digitalDisplayMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. " +
"Hence, DigitalDisplayMQTTConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
Expand All @@ -11,20 +11,25 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.iot.droneanalyzer.controller.api.impl;


import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.plugin.constants.DroneConstants;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.controller.api.impl.transport.DroneAnalyzerXMPPConnector;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.controller.api.impl.trasformer.MessageTransformer;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.plugin.constants.DroneConstants;

import javax.websocket.*;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;

Expand All @@ -36,14 +41,35 @@ public class DroneRealTimeService {
private DroneAnalyzerXMPPConnector xmppConnector;

public DroneRealTimeService() {
messageController = new MessageTransformer();
xmppConnector = new DroneAnalyzerXMPPConnector(messageController);
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
messageController = new MessageTransformer();
xmppConnector = new DroneAnalyzerXMPPConnector(messageController);

if (XmppConfig.getInstance().isEnabled()) {
xmppConnector.connect();
} else {
log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, DroneAnalyzerXMPPConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}

if (XmppConfig.getInstance().isEnabled()){
xmppConnector.connect();
} else {
log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, DroneAnalyzerXMPPConnector not started.");
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}

@OnOpen
Expand Down
Loading

0 comments on commit 41de719

Please sign in to comment.