Skip to content

Commit

Permalink
last time pushing multiple changes at once!!
Browse files Browse the repository at this point in the history
  • Loading branch information
NehaSelvan1512 committed Sep 24, 2023
1 parent ce068fc commit e3042b4
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,20 @@

package org.polypheny.db.mqtt;

public class PublishingMqttMessage {
import lombok.Getter;

MqttMessage msg;
public class FilteringMqttMessage {
private MqttMessage mqttMessage;
@Getter
private String query;
public FilteringMqttMessage( MqttMessage mqttMessage, String query ) {
this.mqttMessage = mqttMessage;
this.query = query;
}


public String getMessage() {
return mqttMessage.getMessage();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.common.collect.ImmutableList;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.io.File;
Expand Down Expand Up @@ -103,19 +102,19 @@ public void start() {
mqttDefaultSettings.put( "commonCollection", "false" );
mqttDefaultSettings.put( "commonCollectionName", "" );
mqttDefaultSettings.put( "Query Interface Name", "mqtt" );
QueryInterfaceManager.addInterfaceType( "mqtt", MqttStreamServer.class, mqttDefaultSettings );
QueryInterfaceManager.addInterfaceType( "mqtt", MqttStreamClient.class, mqttDefaultSettings );
}


@Override
public void stop() {
QueryInterfaceManager.removeInterfaceType( MqttStreamServer.class );
QueryInterfaceManager.removeInterfaceType( MqttStreamClient.class );
}


@Slf4j
@Extension
public static class MqttStreamServer extends QueryInterface {
public static class MqttStreamClient extends QueryInterface {

@SuppressWarnings("WeakerAccess")
public static final String INTERFACE_NAME = "MQTT Interface";
Expand Down Expand Up @@ -167,7 +166,7 @@ public static class MqttStreamServer extends QueryInterface {
private final MonitoringPage monitoringPage;


public MqttStreamServer( TransactionManager transactionManager, Authenticator authenticator, int ifaceId, String uniqueName, Map<String, String> settings ) {
public MqttStreamClient( TransactionManager transactionManager, Authenticator authenticator, int ifaceId, String uniqueName, Map<String, String> settings ) {
super( transactionManager, authenticator, ifaceId, uniqueName, settings, true, false );
// Add information page
this.monitoringPage = new MonitoringPage();
Expand Down Expand Up @@ -228,6 +227,7 @@ public void run() {
.identifier( getUniqueName() )
.serverHost( brokerAddress )
.serverPort( brokerPort )
.automaticReconnectWithDefaultConfig()
.sslConfig()
//TODO: delete or enter password from GUI password thinghere and in method
.keyManagerFactory( SslHelper.createKeyManagerFactory( "polyphenyClient.crt", "polyphenyClient.key", "" ) )
Expand All @@ -239,6 +239,7 @@ public void run() {
.identifier( getUniqueName() )
.serverHost( brokerAddress )
.serverPort( brokerPort )
.automaticReconnectWithDefaultConfig()
.buildAsync();
}

Expand Down Expand Up @@ -571,8 +572,8 @@ protected void processMsg( Mqtt5Publish subMsg ) {

String topic = subMsg.getTopic().toString();
String message = extractPayload( subMsg );
MqttMessage mqttMessage = new MqttMessage( message, topic );
addMessageToQueue( topic, message );
MqttMessage mqttMessage = new MqttMessage( message, topic );

String wildcardTopic = "";
if ( !topicsMap.containsKey( topic ) ) {
Expand All @@ -584,36 +585,38 @@ protected void processMsg( Mqtt5Publish subMsg ) {

if ( this.filterMap.containsKey( topic ) ) {
String filterQuery = this.filterMap.get( topic );
MqttStreamProcessor streamProcessor = new MqttStreamProcessor( mqttMessage, filterQuery, statement );
FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery );
MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, statement );
// false is returned when a message should not be stored in DB
if ( streamProcessor.applyFilter() ) {
insert( mqttMessage, transaction );
insertInEntity( mqttMessage, transaction );
}
} else if ( !wildcardTopic.isEmpty() && this.filterMap.containsKey( wildcardTopic ) ) {
String filterQuery = this.filterMap.get( wildcardTopic );
MqttStreamProcessor streamProcessor = new MqttStreamProcessor( mqttMessage, filterQuery, statement );
FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery );
MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, statement );
if ( streamProcessor.applyFilter() ) {
insert( mqttMessage, transaction );
insertInEntity( mqttMessage, transaction );
}
} else {
insert( mqttMessage, transaction );
insertInEntity( mqttMessage, transaction );
}
}


private void insert( MqttMessage mqttMessage, Transaction transaction ) {
ReceivedMqttMessage receivedMqttMessage;
private void insertInEntity( MqttMessage mqttMessage, Transaction transaction ) {
StoringMqttMessage storingMqttMessage;
synchronized ( settingsLock ) {
if ( !this.commonCollection.get() ) {
String collectionToBeSaved;
collectionToBeSaved = mqttMessage.getTopic().replace( '#', '_' ).replace( '+', '_' ).replace( '/', '_' );
receivedMqttMessage = new ReceivedMqttMessage( mqttMessage, this.namespaceName, getNamespaceId( this.namespaceName, this.namespaceType ), this.namespaceType, getUniqueName(), this.databaseId, this.userId, collectionToBeSaved );
storingMqttMessage = new StoringMqttMessage( mqttMessage, this.namespaceName, this.namespaceType, getUniqueName(), this.databaseId, this.userId, collectionToBeSaved );
} else {
receivedMqttMessage = new ReceivedMqttMessage( mqttMessage, this.namespaceName, getNamespaceId( this.namespaceName, this.namespaceType ), this.namespaceType, getUniqueName(), this.databaseId, this.userId, this.commonCollectionName );
storingMqttMessage = new StoringMqttMessage( mqttMessage, this.namespaceName, this.namespaceType, getUniqueName(), this.databaseId, this.userId, this.commonCollectionName );
}
}
StreamCapture streamCapture = new StreamCapture( transaction );
streamCapture.insert( receivedMqttMessage );
streamCapture.insert( storingMqttMessage );
}


Expand Down Expand Up @@ -694,9 +697,13 @@ private void createStreamCollection( String collectionName ) {
}
try {
List<DataStore> dataStores = new ArrayList<>();
//TODO: StreamCollection einbinden
DdlManager.getInstance().createCollection( namespaceID, collectionName, true, //only creates collection if it does not already exist.
dataStores.size() == 0 ? null : dataStores, PlacementType.MANUAL, statement );
DdlManager.getInstance().createCollection(
namespaceID,
collectionName,
true, //only creates collection if it does not already exist.
dataStores.size() == 0 ? null : dataStores,
PlacementType.MANUAL,
statement );
transaction.commit();
} catch ( EntityAlreadyExistsException | TransactionException e ) {
throw new RuntimeException( "Error while creating a new collection:", e );
Expand Down Expand Up @@ -935,19 +942,18 @@ public MonitoringPage() {

informationGroupPub = new InformationGroup( informationPage, "Publish a message" ).setOrder( 3 );
im.addGroup( informationGroupPub );
msgButton = new InformationAction( informationGroupPub, "Send a msg", ( parameters ) -> {
String end = "Msg was published!";
msgButton = new InformationAction( informationGroupPub, "Publish", ( parameters ) -> {
String end = "Message was published!";
try {
client.publishWith()
.topic( parameters.get( "topic" ) )
.payload( parameters.get( "msg" ).getBytes() )
.qos( MqttQos.AT_LEAST_ONCE )
.payload( parameters.get( "payload" ).getBytes() )
.send();
} catch ( IllegalArgumentException e ) {
throw new RuntimeException( e );
}
return end;
} ).withParameters( "topic", "msg" );
} ).withParameters( "topic", "payload" );
im.registerInformation( msgButton );

// Reconnection button
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ public class MqttStreamProcessor extends StreamProcessorImpl {
private final Statement statement;


public MqttStreamProcessor( MqttMessage mqttMessage, String filterQuery, Statement statement ) {
super( mqttMessage.getMessage() );
this.filterQuery = filterQuery;
public MqttStreamProcessor( FilteringMqttMessage filteringMqttMessage, Statement statement ) {
super( filteringMqttMessage.getMessage() );
this.filterQuery = filteringMqttMessage.getQuery();
this.statement = statement;
}


//TODO: in Tutorial schreiben, was allgemein für Strings gilt
public boolean applyFilter() {
AlgRoot root = processMqlQuery();
List<List<Object>> res = executeAndTransformPolyAlg( root, statement );
Expand Down Expand Up @@ -79,9 +78,9 @@ private AlgRoot processMqlQuery() {
} else if ( isNumber( msg ) ) {
double value = Double.parseDouble( msg );
msgDoc = new BsonDocument( "$$ROOT", new BsonDouble( value ) );
} else if ( isBoolean( msg ) ) {
/*} else if ( isBoolean( msg ) ) {
boolean value = Boolean.parseBoolean( msg );
msgDoc = new BsonDocument( "$$ROOT", new BsonBoolean( value ) );
msgDoc = new BsonDocument( "$$ROOT", new BsonBoolean( value ) );*/
} else {
// msg is String
msgDoc = new BsonDocument( "$$ROOT", new BsonString( msg ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,15 @@

package org.polypheny.db.mqtt;

import javax.management.Query;
import lombok.Getter;
import lombok.Setter;
import org.polypheny.db.catalog.Catalog.NamespaceType;
//TODO: javadoc
public class ReceivedMqttMessage {
public class StoringMqttMessage {

private final MqttMessage msg;
@Getter
private final String namespaceName;
@Getter
private final long namespaceId;
@Getter
private final NamespaceType namespaceType;
@Getter
private final String uniqueNameOfInterface;
Expand All @@ -37,33 +33,30 @@ public class ReceivedMqttMessage {
@Getter
private final int userId;
/**
* if MqttStreamServer.collectionPerTopic = TRUE, then collectionName is name of the topic or (if the subscribed topic
* if MqttStreamClient.collectionPerTopic = TRUE, then collectionName is name of the topic or (if the subscribed topic
* has wildcards) the wildcardTopic
* if MqttStreamServer.collectionPerTopic = FALSE, then collectionName is the name of the common collection
* if MqttStreamClient.collectionPerTopic = FALSE, then collectionName is the name of the common collection
*/
@Getter
private final String collectionName;
private final String entityName;

public ReceivedMqttMessage( MqttMessage msg, String namespaceName, long namespaceId, NamespaceType namespaceType, String uniqueNameOfInterface, long databaseId, int userId, String collectionName ) {
public StoringMqttMessage( MqttMessage msg, String namespaceName, NamespaceType namespaceType, String uniqueNameOfInterface, long databaseId, int userId, String entityName ) {
this.msg = msg;
this.namespaceName = namespaceName;
this.namespaceType = namespaceType;
//TODO: schauen, wo namespaceId gebraucht wird.
this.namespaceId = namespaceId;
this.uniqueNameOfInterface = uniqueNameOfInterface;
this.databaseId = databaseId;
this.userId = userId;
this.collectionName = collectionName;
}


public String getTopic() {
return this.msg.getTopic();
this.entityName = entityName;
}


public String getMessage() {
return this.msg.getMessage();
}

public String getTopic() {
return this.msg.getTopic();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.polypheny.db.PolyImplementation;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.AlgRoot;
Expand All @@ -36,31 +41,48 @@ public class StreamCapture {

Transaction transaction;
PolyphenyHomeDirManager homeDirManager;
ReceivedMqttMessage receivedMqttMessage;
StoringMqttMessage storingMqttMessage;


StreamCapture( final Transaction transaction ) {
this.transaction = transaction;
}


public void insert( ReceivedMqttMessage receivedMqttMessage ) {
this.receivedMqttMessage = receivedMqttMessage;
public void insert( StoringMqttMessage storingMqttMessage ) {
this.storingMqttMessage = storingMqttMessage;
insertMessage();
}


private void insertMessage() {
String sqlCollectionName = this.receivedMqttMessage.getNamespaceName() + "." + this.receivedMqttMessage.getCollectionName();
String sqlCollectionName = this.storingMqttMessage.getNamespaceName() + "." + this.storingMqttMessage.getEntityName();
Statement statement = transaction.createStatement();

// Builder which allows to construct the algebra tree which is equivalent to query and is executed
AlgBuilder builder = AlgBuilder.createDocumentBuilder( statement );

BsonDocument document = new BsonDocument();
document.put( "source", new BsonString( this.receivedMqttMessage.getUniqueNameOfInterface() ) );
document.put( "topic", new BsonString( this.receivedMqttMessage.getTopic() ) );
document.put( "content", new BsonString( this.receivedMqttMessage.getMessage() ) );
document.put( "source", new BsonString( this.storingMqttMessage.getUniqueNameOfInterface() ) );
document.put( "topic", new BsonString( this.storingMqttMessage.getTopic() ) );
String msg = this.storingMqttMessage.getMessage();
BsonValue value;
if ( msg.contains( "{" ) && msg.contains( "}" ) ) {
value = BsonDocument.parse( msg );
} else if ( msg.contains( "[" ) && msg.contains( "]" ) ) {
BsonArray bsonArray = new BsonArray();
msg = msg.replace( "[", "" ).replace( "]", "" );
String[] msglist = msg.split( "," );
for ( String stringValue : msglist ) {
stringValue = stringValue.trim();
bsonArray.add( getBsonValue( stringValue ) );
}
value = bsonArray;
} else {
// msg is a single value
value = getBsonValue( msg );
}
document.put( "payload", value );

AlgNode algNode = builder.docInsert( statement, sqlCollectionName, document ).build();

Expand All @@ -75,6 +97,51 @@ private void insertMessage() {
}


/**
* turns one single value into the corresponding BsonValue
* @param value value that has to be casted as String
* @return
*/
protected BsonValue getBsonValue(String value) {
if ( isInteger( value ) ) {
return new BsonInt32(Integer.parseInt( value ) );
} else if ( isDouble( value ) ) {
return new BsonDouble(Double.parseDouble( value ) );
} else if ( isBoolean( value ) ) {
return new BsonBoolean( Boolean.parseBoolean( value ) );
} else {
return new BsonString( value );
}
}


public boolean isDouble( String value ) {
try {
Double.parseDouble( value );
} catch ( NumberFormatException e ) {
return false;
}
return true;
}


protected boolean isInteger( String value ) {
try {
int intNumber = Integer.parseInt( value );
double doubleNumber = Double.parseDouble( value );
return intNumber == doubleNumber;
} catch ( NumberFormatException e ) {
return false;
}
}


public boolean isBoolean( String value ) {
return value.equals( "true" ) || value.equals( "false" );
}



List<List<Object>> executeAndTransformPolyAlg( AlgRoot algRoot, Statement statement, final Context ctx ) {

try {
Expand Down
Loading

0 comments on commit e3042b4

Please sign in to comment.