Skip to content

Commit

Permalink
Debugged code while testing stream capture + mqttClient component
Browse files Browse the repository at this point in the history
  • Loading branch information
NehaSelvan1512 committed Sep 27, 2023
1 parent 5610f8f commit c359ddf
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
import lombok.Getter;

public class FilteringMqttMessage {

private MqttMessage mqttMessage;
@Getter
private String query;


public FilteringMqttMessage( MqttMessage mqttMessage, String query ) {
this.mqttMessage = mqttMessage;
this.query = query;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,12 @@ private void insertInEntity( MqttMessage mqttMessage, Transaction transaction )
}


private static String extractPayload( Mqtt5Publish subMsg ) {
protected static String extractPayload( Mqtt5Publish subMsg ) {
return new String( subMsg.getPayloadAsBytes(), Charset.defaultCharset() );
}


private String getWildcardTopic( String topic ) {
protected String getWildcardTopic( String topic ) {
for ( String t : topicsMap.keySet() ) {
//multilevel wildcard
if ( t.contains( "#" ) && topic.startsWith( t.substring( 0, t.indexOf( "#" ) ) ) ) {
Expand All @@ -641,7 +641,7 @@ private String getWildcardTopic( String topic ) {
}


private void addMessageToQueue( String topic, String message ) {
protected void addMessageToQueue( String topic, String message ) {
if ( this.messageQueue.size() >= 20 ) {
this.messageQueue.poll();
this.messageQueue.add( new String[]{ topic, message } );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import lombok.Getter;
import org.polypheny.db.catalog.Catalog.NamespaceType;

//TODO: javadoc
public class StoringMqttMessage {

Expand All @@ -40,6 +41,7 @@ public class StoringMqttMessage {
@Getter
private final String entityName;


public StoringMqttMessage( MqttMessage msg, String namespaceName, NamespaceType namespaceType, String uniqueNameOfInterface, long databaseId, int userId, String entityName ) {
this.msg = msg;
this.namespaceName = namespaceName;
Expand All @@ -55,6 +57,7 @@ public String getMessage() {
return this.msg.getMessage();
}


public String getTopic() {
return this.msg.getTopic();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,15 @@ private void insertMessage() {

/**
* turns one single value into the corresponding BsonValue
*
* @param value value that has to be casted as String
* @return
*/
protected BsonValue getBsonValue(String value) {
protected BsonValue getBsonValue( String value ) {
if ( isInteger( value ) ) {
return new BsonInt32(Integer.parseInt( value ) );
return new BsonInt32( Integer.parseInt( value ) );
} else if ( isDouble( value ) ) {
return new BsonDouble(Double.parseDouble( value ) );
return new BsonDouble( Double.parseDouble( value ) );
} else if ( isBoolean( value ) ) {
return new BsonBoolean( Boolean.parseBoolean( value ) );
} else {
Expand All @@ -125,7 +126,7 @@ public boolean isDouble( String value ) {
}


protected boolean isInteger( String value ) {
public boolean isInteger( String value ) {
try {
int intNumber = Integer.parseInt( value );
double doubleNumber = Double.parseDouble( value );
Expand All @@ -141,11 +142,9 @@ public boolean isBoolean( String value ) {
}



List<List<Object>> executeAndTransformPolyAlg( AlgRoot algRoot, Statement statement, final Context ctx ) {

try {
// Prepare
PolyImplementation result = statement.getQueryProcessor().prepareQuery( algRoot, false );
log.debug( "AlgRoot was prepared." );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,11 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl;
import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImplBuilder;
import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImplBuilder.Default;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -103,42 +97,52 @@ public void saveQueryEmptyStringTest() {

@Test
public void saveSimpleQueryTest() {
changedSettings.replace( "filterQuery", "topic1:{key1:value1}" );
changedSettings.replace( "filterQuery", "topic1:{\"key1\":\"value1\"}" );
client.updateSettings( changedSettings );
Map<String, String> expected = new HashMap<>( 1 );
expected.put( "topic1", "{key1:value1}" );
expected.put( "topic1", "{\"key1\":\"value1\"}" );
assertEquals( expected, client.getFilterMap() );
}


@Test
public void saveQueryToExistingTopicTest() {
changedSettings.replace( "filterQuery", "topic1:{\"key1\":\"value2\"}" );
client.updateSettings( changedSettings );
Map<String, String> expected = new HashMap<>( 1 );
expected.put( "topic1", "{\"key1\":\"value2\"}" );
assertEquals( expected, client.getFilterMap() );
}


@Test
public void saveQueryWithArrayTest() {
changedSettings.replace( "filterQuery", "topic1:{key1:[1, 2, 3]}" );
changedSettings.replace( "filterQuery", "topic1:{\"key1\":[1, 2, 3]}" );
client.updateSettings( changedSettings );
Map<String, String> expected = new HashMap<>( 1 );
expected.put( "topic1", "{key1:[1, 2, 3]}" );
expected.put( "topic1", "{\"key1\":[1, 2, 3]}" );
assertEquals( expected, client.getFilterMap() );
}


@Test
public void saveTwoSimpleQueryTest() {
changedSettings.replace( "filterQuery", "topic1:{key1:value1}, topic2:{key2:value2}" );
changedSettings.replace( "filterQuery", "topic1:{\"key1\":\"value1\"}, topic2:{\"key2\":\"value2\"}" );
client.updateSettings( changedSettings );
Map<String, String> expected = new HashMap<>( 2 );
expected.put( "topic1", "{key1:value1}" );
expected.put( "topic2", "{key2:value2}" );
expected.put( "topic1", "{\"key1\":\"value1\"}" );
expected.put( "topic2", "{\"key2\":\"value2\"}" );
assertEquals( expected, client.getFilterMap() );
}


@Test
public void saveNestedQueryTest() {
changedSettings.replace( "filterQuery", "topic1:{key1:{$lt:3}}, topic2:{$or:[key2:{$lt:3}, key2:{$gt:5}]}" );
changedSettings.replace( "filterQuery", "topic1:{\"key1\":{$lt:3}}, topic2:{$or:[\"key2\":{$lt:3}, \"key2\":{$gt:5}]}" );
client.updateSettings( changedSettings );
Map<String, String> expected = new HashMap<>( 2 );
expected.put( "topic1", "{key1:{$lt:3}}" );
expected.put( "topic2", "{$or:[key2:{$lt:3}, key2:{$gt:5}]}" );
expected.put( "topic1", "{\"key1\":{$lt:3}}" );
expected.put( "topic2", "{$or:[\"key2\":{$lt:3}, \"key2\":{$gt:5}]}" );
assertEquals( expected, client.getFilterMap() );
}

Expand Down Expand Up @@ -170,9 +174,26 @@ public void toListWithContentTest() {


@Test
public void reloadSettingsTopicTest() {
//TODO with broker
//TODO: with wildcards
public void addFirstMessageToQueueTest() {
ConcurrentLinkedQueue<String[]> msgQueueBefore = client.getMessageQueue();
assertEquals( 0, msgQueueBefore.size() );
client.addMessageToQueue( "topic1", "payload1" );
ConcurrentLinkedQueue<String[]> msgQueueAfter = client.getMessageQueue();
assertEquals( 1, msgQueueAfter.size() );
String[] expected = { "topic1", "payload1" };
assertEquals( expected, msgQueueAfter.poll() );
}


@Test
public void addTwentyOneMessagesToQueueTest() {
for ( int i = 0; i < 22; i++ ) {
client.addMessageToQueue( "topic1", String.valueOf( i ) );
}
ConcurrentLinkedQueue<String[]> msgQueueAfter = client.getMessageQueue();
assertEquals( 20, msgQueueAfter.size() );
String[] expected = { "topic1", String.valueOf( 2 ) };
assertEquals( expected, msgQueueAfter.poll() );
}


Expand Down Expand Up @@ -265,16 +286,4 @@ public void reloadSettingsCommonCollectionAndCommonCollectionNameTest2() {
}


@Test
public void processMsgTest() {
MqttUserPropertiesImplBuilder.Default defaultProperties = new Default();
Mqtt5Publish message = new MqttPublish( MqttTopicImpl.of( "topic1" ), ByteBuffer.wrap( "payload".getBytes() ), MqttQos.AT_LEAST_ONCE, false, 10, null, null, null, null, defaultProperties.build(), null );
client.processMsg( message );
//TODO: was prüfe ich hier????
// ob die zwei Maps richtig gestzt wurden.
String[] messageInQueue = client.getMessageQueue().peek();
assertEquals( "topic1", messageInQueue[0] );
assertEquals( "payload", messageInQueue[1] );
}

}
Loading

0 comments on commit c359ddf

Please sign in to comment.