Skip to content

Commit

Permalink
Merge pull request #169 from GPrathap/IoTS-1.0.0-M3
Browse files Browse the repository at this point in the history
changed the way server address is taken
  • Loading branch information
ruwany committed Jan 25, 2016
2 parents c1b2195 + 86530bc commit bd341c8
Show file tree
Hide file tree
Showing 16 changed files with 199 additions and 292 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.apimgt.annotations.api.API;
import org.wso2.carbon.apimgt.annotations.device.DeviceType;
import org.wso2.carbon.apimgt.annotations.device.feature.Feature;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.DeviceValidator;
Expand All @@ -29,13 +28,8 @@
import org.wso2.carbon.device.mgt.iot.droneanalyzer.plugin.controller.DroneController;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.plugin.controller.impl.DroneControllerImpl;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -57,11 +51,10 @@ public class DroneControllerService {
--------------------------------------------------------------------------------------- */
@Path("controller/register/{owner}/{deviceId}/{ip}/{port}")
@POST
public String registerDeviceIP(@PathParam("owner") String owner, @PathParam("deviceId") String deviceId,
public Response registerDeviceIP(@PathParam("owner") String owner, @PathParam("deviceId") String deviceId,
@PathParam("ip") String deviceIP,
@PathParam("port") String devicePort,
@Context HttpServletResponse response,
@Context HttpServletRequest request) {
@Context HttpServletResponse response) {
String result;
log.info("Got register call from IP: " + deviceIP + " for Device ID: " + deviceId + " of owner: " + owner);
String deviceHttpEndpoint = deviceIP + ":" + devicePort;
Expand All @@ -72,17 +65,16 @@ public String registerDeviceIP(@PathParam("owner") String owner, @PathParam("dev
log.debug(result);
}
log.info(owner + deviceId + deviceIP + devicePort );
return result;

return Response.ok(Response.Status.OK.getStatusCode()).build();
}

@Path("controller/send_command")
@POST
@Feature( code="send_command", name="Send Command", type="operation",
description="Send Commands to Drone")
/*@Feature( code="send_command", name="Send Command", type="operation",
description="Send Commands to Drone")*/
public Response droneController(@HeaderParam("owner") String owner, @HeaderParam("deviceId") String deviceId,
@QueryParam("action") String action, @QueryParam("duration") String duration,
@QueryParam("speed") String speed){
@FormParam("action") String action, @FormParam("duration") String duration,
@FormParam("speed") String speed){
try {
DeviceValidator deviceValidator = new DeviceValidator();
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public class DroneRealTimeService {
public DroneRealTimeService() {
messageController = new MessageTransformer();
xmppConnector = new DroneAnalyzerXMPPConnector(messageController);

if (XmppConfig.getInstance().isEnabled()){
if (!XmppConfig.getInstance().isEnabled()){
xmppConnector.connect();
} else {
log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, DroneAnalyzerXMPPConnector not started.");
Expand All @@ -58,24 +57,23 @@ public void onOpen(Session session){

@OnMessage
public void onMessage(String message, Session session){
try {
while(true){
if((messageController !=null) && (!messageController.isEmptyQueue())){
String message1 = messageController.getMessage();
session.getBasicRemote().sendText(message1);
try{
if((messageController !=null) && (!messageController.isEmptyQueue())){
String message1 = messageController.getMessage();
session.getBasicRemote().sendText(message1);
}
Thread.sleep(DroneConstants.MINIMUM_TIME_DURATION);
} catch (IOException ex) {
log.error(ex.getMessage() + "\n" + ex);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
Thread.sleep(DroneConstants.MINIMUM_TIME_DURATION);
}
} catch (IOException ex) {
log.error(ex.getMessage() + "\n" + ex);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}

@OnClose
public void onClose(Session session){

try {
xmppConnector.disconnect();
log.info("XMPP connection is disconnected");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,29 @@ public class DroneAnalyzerXMPPConnector extends XMPPTransportHandler {
private static Log log = LogFactory.getLog(DroneAnalyzerXMPPConnector.class);

private static String xmppServerIP;
private static int xmppServerPort;
private static String xmppAdminUsername;
private static String xmppAdminPassword;
private static String xmppAdminAccountJID;
private MessageTransformer messageTransformer;

private ScheduledFuture<?> connectorServiceHandler;
private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

public DroneAnalyzerXMPPConnector(MessageTransformer messageTransformer) {
super(XmppConfig.getInstance().getXmppServerIP(),
XmppConfig.getInstance().getSERVER_CONNECTION_PORT());
super(XmppConfig.getInstance().getXmppServerIP(),XmppConfig.getInstance().getSERVER_CONNECTION_PORT());
this.messageTransformer = messageTransformer;

}

@Override
public void connect() {
Runnable connector = new Runnable() {
@Override
public void run() {
if (!isConnected()) {
try {
initConnector();
connectToServer();
loginToServer(xmppAdminUsername, xmppAdminPassword, null);
setFilterOnReceiver(DroneConstants.DEVICE_ID+ "@" + xmppServerIP);

setFilterOnReceiver(xmppAdminAccountJID);
} catch (TransportHandlerException e) {
if (log.isDebugEnabled()) {
log.warn("Connection/Login to XMPP server at: " + server + " as " +
Expand All @@ -84,18 +80,28 @@ public void initConnector() {

@Override
public void processIncomingMessage(Message message) throws TransportHandlerException {
String from = message.getFrom();
String subject = message.getSubject();
String inbound_message = message.getBody();
int indexOfAt = from.indexOf("@");
int indexOfSlash = from.indexOf("/");
String deviceId = from.substring(0, indexOfAt);
String resource = from.substring(indexOfSlash + 1, from.length());

if ((inbound_message != null)&&(resource.equals(DroneConstants.MESSAGE_RESOURCE)) ){
messageTransformer.messageTranslater(inbound_message);
} else {
log.error("Message is empty or it is not belongs to "+ DroneConstants.DEVICE_ID);
try{
String from = message.getFrom();
String inbound_message = message.getBody();
int indexOfSlash = from.indexOf("/");
if(indexOfSlash==0){
if(log.isDebugEnabled()){
log.debug("Required resource not available.");
}
}else{
String resource = from.substring(indexOfSlash + 1, from.length());
if ((inbound_message != null)&&(resource.equals(DroneConstants.MESSAGE_RESOURCE)) ){
messageTransformer.messageTranslater(inbound_message);
} else {
if(log.isDebugEnabled()){
log.debug("Message is empty or it is not belongs to " + xmppAdminUsername);
}
}
}
}catch(ArrayIndexOutOfBoundsException e){
log.error("Wrong message format: input message", e);
}catch(RuntimeException e){
log.error("Unexpected error has been occurred, ", e);
}
}

Expand All @@ -117,18 +123,15 @@ public void run() {
log.warn("Unable to 'STOP' connection to XMPP server at: " + server +
" for user - " + xmppAdminUsername);
}

try {
Thread.sleep(timeoutInterval);
} catch (InterruptedException e1) {
log.error("XMPP-Terminator: Thread Sleep Interrupt Exception for "
+ DroneConstants.DEVICE_TYPE + " type.", e1);
}

}
}
};

Thread terminatorThread = new Thread(stopConnection);
terminatorThread.setDaemon(true);
terminatorThread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,72 +35,73 @@ public class MessageTransformer {
private Log log = LogFactory.getLog(MessageTransformer.class);
private CircularFifoQueue<String> sharedQueue;

private String outbound_message_format_for_simulator = "{\"quatanium_val\":[%f, %f, %f, %f]," +
"\"basicParam\":{\"velocity\":[%f, %f, %f], \"global_location\":[%f, %f, %f]},\"battery_level\":%f, \"device_type\":\"IRIS_DRONE\"}";
private String outbound_message_format_for_iris_drone = "{\"quatanium_val\":[%f, %f, %f]," +
private String outboundMessageFormatForSimulator = "{\"quatanium_val\":[%f, %f, %f, %f]," +
"\"basicParam\":{\"velocity\":[%f, %f, %f], \"global_location\":[%f, %f, %f]},\"battery_level\":%f, \"device_type\":\"SIMULATOR\"}";
private String outboundMessageFormatForIrisDrone = "{\"quatanium_val\":[%f, %f, %f]," +
"\"basicParam\":{\"velocity\":[%f, %f, %f], \"global_location\":[%f, %f, %f]},\"battery_level\":%f," +
"\"device_type\":\"SIMULATOR\"}";
"\"device_type\":\"IRIS_DRONE\"}";

public MessageTransformer(){
sharedQueue = new CircularFifoQueue<String>(DroneConstants.MAXIMUM_BUFFERE_SIZE_OF_SHARED_QUEUE);
}

private void messageTranslaterForSimulator(JsonNode inbound_message){
JsonNode node = inbound_message;
String outbound_message;

String outboundMessage;
try {
JsonNode velocity = node.get(MessageConfig.OUT_BASIC_PARAM_VAL).get(MessageConfig.OUT_BASIC_PARAM_VELOCITY);
JsonNode global_location = node.get(MessageConfig.OUT_BASIC_PARAM_VAL).get(
JsonNode globalLocation = node.get(MessageConfig.OUT_BASIC_PARAM_VAL).get(
MessageConfig.OUT_BASIC_PARAM_GLOBAL_LOCATION);
JsonNode quatanium_vals = node.get(MessageConfig.OUT_QUATANNIM_VAL);
JsonNode battery_level = node.get(MessageConfig.OUT_BATTERY_LEVEL);
outbound_message = String.format(outbound_message_format_for_simulator, sTd(quatanium_vals.get(0)),
sTd(quatanium_vals.get(1)), sTd(quatanium_vals.get(2)), sTd(quatanium_vals.get(0)),
sTd(velocity.get(0)), sTd(velocity.get(1)), sTd(velocity.get(2)), sTd(global_location.get(0)),
sTd(global_location.get(1)), sTd(global_location.get(2)), sTd(battery_level));
sharedQueue.add(outbound_message);
JsonNode quataniumVals = node.get(MessageConfig.OUT_QUATANNIM_VAL);
JsonNode batteryLevel = node.get(MessageConfig.OUT_BATTERY_LEVEL);
outboundMessage = String.format(outboundMessageFormatForSimulator, sTd(quataniumVals.get(0)),
sTd(quataniumVals.get(1)), sTd(quataniumVals.get(2)), sTd(quataniumVals.get(0)),
sTd(velocity.get(0)), sTd(velocity.get(1)), sTd(velocity.get(2)), sTd(globalLocation.get(0)),
sTd(globalLocation.get(1)), sTd(globalLocation.get(2)), sTd(batteryLevel));
sharedQueue.add(outboundMessage);
} catch (Exception e) {
log.error(e.getMessage()+",\n"+ e);
}
}

private void messageTranslaterForIRISDrone(JsonNode inbound_message){
JsonNode node = inbound_message;
String outbound_message;
String outboundMessage;
try {

JsonNode velocity = node.get(MessageConfig.OUT_BASIC_PARAM_VAL).get(MessageConfig.OUT_BASIC_PARAM_VELOCITY);
JsonNode global_location = node.get(MessageConfig.OUT_BASIC_PARAM_VAL).get(
JsonNode globalLocation = node.get(MessageConfig.OUT_BASIC_PARAM_VAL).get(
MessageConfig.OUT_BASIC_PARAM_GLOBAL_LOCATION);
JsonNode quatanium_vals = node.get(MessageConfig.OUT_QUATANNIM_VAL);
JsonNode battery_level = node.get(MessageConfig.OUT_BATTERY_LEVEL);
outbound_message = String.format(outbound_message_format_for_iris_drone, sTd(quatanium_vals.get(0)),
sTd(quatanium_vals.get(1)), sTd(quatanium_vals.get(2)), sTd(velocity.get(0)),
sTd(velocity.get(1)), sTd(velocity.get(2)), sTd(global_location.get(0)),
sTd(global_location.get(1)), sTd(global_location.get(2)), sTd(battery_level));
sharedQueue.add(outbound_message);
JsonNode quataniumVals = node.get(MessageConfig.OUT_QUATANNIM_VAL);
JsonNode batteryLevel = node.get(MessageConfig.OUT_BATTERY_LEVEL);
outboundMessage = String.format(outboundMessageFormatForIrisDrone, sTd(quataniumVals.get(0)),
sTd(quataniumVals.get(1)), sTd(quataniumVals.get(2)), sTd(velocity.get(0)),
sTd(velocity.get(1)), sTd(velocity.get(2)), sTd(globalLocation.get(0)),
sTd(globalLocation.get(1)), sTd(globalLocation.get(2)), sTd(batteryLevel));
sharedQueue.add(outboundMessage);

}catch (Exception e) {
log.error(e.getMessage()+",\n"+ e);
}
}

public void messageTranslater(String inbound_message){
JsonNode actualMessage = null;
JsonNode actualMessage;
ObjectMapper objectMapper = new ObjectMapper();

try {
actualMessage = objectMapper.readValue(inbound_message, JsonNode.class);
JsonNode deviceType = actualMessage.get(MessageConfig.IN_DEVICE_TYPE);
switch (deviceType.getTextValue()) {

case MessageConfig.IN_IRIS_DRONE:
messageTranslaterForIRISDrone(actualMessage);
break;
case MessageConfig.IN_SIMULATOR:
messageTranslaterForSimulator(actualMessage);
break;
default:
if(log.isDebugEnabled()){
log.debug("Wrong message format");
}
}
} catch (JsonProcessingException e) {
log.error("Incoming message might be corrupted, "+ e);
Expand Down
Loading

0 comments on commit bd341c8

Please sign in to comment.