diff --git a/modules/siddhi-extensions/input-transports/tcp-input-transport/src/test/java/org/wso2/siddhi/extension/input/transport/tcp/TCPInputTransportTestCase.java b/modules/siddhi-extensions/input-transports/tcp-input-transport/src/test/java/org/wso2/siddhi/extension/input/transport/tcp/TCPInputTransportTestCase.java index a818ce5ee3..b988b7e29c 100644 --- a/modules/siddhi-extensions/input-transports/tcp-input-transport/src/test/java/org/wso2/siddhi/extension/input/transport/tcp/TCPInputTransportTestCase.java +++ b/modules/siddhi-extensions/input-transports/tcp-input-transport/src/test/java/org/wso2/siddhi/extension/input/transport/tcp/TCPInputTransportTestCase.java @@ -21,8 +21,10 @@ import junit.framework.Assert; import org.apache.log4j.Logger; import org.junit.Before; +import org.junit.FixMethodOrder; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.MethodSorters; import org.wso2.siddhi.core.ExecutionPlanRuntime; import org.wso2.siddhi.core.SiddhiManager; import org.wso2.siddhi.core.event.Event; @@ -37,6 +39,7 @@ import java.util.Collection; import java.util.List; +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class TCPInputTransportTestCase { static final Logger log = Logger.getLogger(TCPInputTransportTestCase.class); private volatile int count; @@ -54,13 +57,13 @@ public void testTcpInputTransport1() throws InterruptedException { SiddhiManager siddhiManager = new SiddhiManager(); String inStreamDefinition = "" + - "@plan:name('foo')" + - "@source(type='tcp', @map(type='passThrough'))" + - "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; + "@plan:name('foo')" + + "@source(type='tcp', @map(type='passThrough'))" + + "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; String query = ("@info(name = 'query1') " + - "from inputStream " + - "select * " + - "insert into outputStream;"); + "from inputStream " + + "select * " + + "insert into outputStream;"); ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query); executionPlanRuntime.addCallback("query1", new QueryCallback() { @@ -115,13 +118,13 @@ public void testTcpInputTransport2() throws InterruptedException { SiddhiManager siddhiManager = new SiddhiManager(); String inStreamDefinition = "" + - "@plan:name('foo')" + - "@source(type='tcp', context='bar', @map(type='passThrough'))" + - "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; + "@plan:name('foo')" + + "@source(type='tcp', context='bar', @map(type='passThrough'))" + + "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; String query = ("@info(name = 'query1') " + - "from inputStream " + - "select * " + - "insert into outputStream;"); + "from inputStream " + + "select * " + + "insert into outputStream;"); ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query); executionPlanRuntime.addCallback("query1", new QueryCallback() { @@ -166,7 +169,6 @@ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { Assert.assertEquals(3, count); Assert.assertTrue(eventArrived); executionPlanRuntime.shutdown(); - } @Test @@ -175,13 +177,13 @@ public void testTcpInputTransport3() throws InterruptedException { SiddhiManager siddhiManager = new SiddhiManager(); String inStreamDefinition = "" + - "@plan:name('foo')" + - "@source(type='tcp', @map(type='passThrough'))" + - "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; + "@plan:name('foo')" + + "@source(type='tcp', @map(type='passThrough'))" + + "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; String query = ("@info(name = 'query1') " + - "from inputStream " + - "select * " + - "insert into outputStream;"); + "from inputStream " + + "select * " + + "insert into outputStream;"); ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query); executionPlanRuntime.addCallback("query1", new QueryCallback() { @@ -209,117 +211,6 @@ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { Assert.assertFalse(eventArrived); executionPlanRuntime.shutdown(); - - } - - @Test - public void testTcpInputTransportPauseAndResume() throws InterruptedException { - init(); - log.info("tcpInputTransport TestCase PauseAndResume"); - SiddhiManager siddhiManager = new SiddhiManager(); - - String inStreamDefinition = "" + - "@source(type='tcp', context='inputStream', @map(type='passThrough'))" + - "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; - String query = ("@info(name = 'query1') " + - "from inputStream " + - "select * " + - "insert into outputStream;"); - ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query); - Collection> inputTransports = executionPlanRuntime.getInputTransports(); - - executionPlanRuntime.addCallback("query1", new QueryCallback() { - @Override - public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { - EventPrinter.print(timeStamp, inEvents, removeEvents); - eventArrived = true; - for (Event event : inEvents) { - count++; - switch (count) { - case 1: - Assert.assertEquals("test", event.getData(0)); - break; - case 2: - Assert.assertEquals("test1", event.getData(0)); - break; - case 3: - Assert.assertEquals("test2", event.getData(0)); - break; - default: - } - } - } - }); - - executionPlanRuntime.start(); - - TCPNettyClient tcpNettyClient = new TCPNettyClient(); - tcpNettyClient.connect("localhost", 9892); - ArrayList arrayList = new ArrayList(3); - - arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test", 36, 3.0f, 380l, 23.0, true})); - arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test1", 361, 31.0f, 3801l, 231.0, false})); - arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test2", 362, 32.0f, 3802l, 232.0, true})); - tcpNettyClient.send("inputStream", arrayList.toArray(new Event[3])); - - TCPNettyClient tcpNettyClient2 = new TCPNettyClient(); - tcpNettyClient2.connect("localhost", 9892); - ArrayList arrayList2 = new ArrayList(1); - - arrayList2.add(new Event(System.currentTimeMillis(), new Object[]{"test2", 36, 3.0f, 380l, 23.0, true})); - Thread.sleep(1000); - tcpNettyClient2.send("inputStream", arrayList2.toArray(new Event[1])); - - - Thread.sleep(1000); - Assert.assertTrue(eventArrived); - Assert.assertEquals(4, count); - count = 0; - eventArrived = false; - - // pause - inputTransports.forEach(e -> e.forEach(InputTransport::pause)); - Thread.sleep(1000); - // send few events - arrayList.clear(); - arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test", 36, 3.0f, 380l, 23.0, true})); - arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test1", 361, 31.0f, 3801l, 231.0, false})); - tcpNettyClient.send("inputStream", arrayList.toArray(new Event[2])); - Thread.sleep(100); - tcpNettyClient2.send("inputStream", arrayList2.toArray(new Event[1])); - - Thread.sleep(1000); - Assert.assertFalse(eventArrived); - - // resume - inputTransports.forEach(e -> e.forEach(InputTransport::resume)); - // send few more events - arrayList.clear(); - arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test2", 36, 3.0f, 380l, 23.0, true})); - arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test3", 361, 31.0f, 3801l, 231.0, false})); - tcpNettyClient.send("inputStream", arrayList.toArray(new Event[2])); - Thread.sleep(1000); - // once resumed, we should be able to access the data sent while the transport is paused - Assert.assertEquals(5, count); - Assert.assertTrue(eventArrived); - - count = 0; - - // send few more events - arrayList.clear(); - arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test", 36, 3.0f, 380l, 23.0, true})); - arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test1", 361, 31.0f, 3801l, 231.0, false})); - tcpNettyClient.send("inputStream", arrayList.toArray(new Event[2])); - - Thread.sleep(1000); - Assert.assertEquals(2, count); - - tcpNettyClient.disconnect(); - tcpNettyClient2.disconnect(); - tcpNettyClient.shutdown(); - tcpNettyClient2.shutdown(); - Thread.sleep(300); - executionPlanRuntime.shutdown(); } @Test @@ -330,15 +221,15 @@ public void testTcpInputTransport4() throws InterruptedException { SiddhiManager siddhiManager = new SiddhiManager(); String inStreamDefinition = "" + - "@plan:name('foo')" + - "@source(type='tcp', context='bar', @map(type='passThrough')) " + - "define stream inputStream (a string, b int, c float, d long, e double, f bool); " + - "@source(type='tcp', context='bar', @map(type='passThrough')) " + - "define stream inputStream2 (a string, b int, c float, d long, e double, f bool); "; + "@plan:name('foo')" + + "@source(type='tcp', context='bar', @map(type='passThrough')) " + + "define stream inputStream (a string, b int, c float, d long, e double, f bool); " + + "@source(type='tcp', context='bar', @map(type='passThrough')) " + + "define stream inputStream2 (a string, b int, c float, d long, e double, f bool); "; String query = ("@info(name = 'query1') " + - "from inputStream " + - "select * " + - "insert into outputStream;"); + "from inputStream " + + "select * " + + "insert into outputStream;"); executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query); executionPlanRuntime.start(); } catch (ExecutionPlanCreationException e) { @@ -356,13 +247,13 @@ public void testTcpInputTransport5() throws InterruptedException { SiddhiManager siddhiManager = new SiddhiManager(); String inStreamDefinition = "" + - "@plan:name('foo')" + - "@source(type='tcp')" + - "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; + "@plan:name('foo')" + + "@source(type='tcp')" + + "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; String query = ("@info(name = 'query1') " + - "from inputStream " + - "select * " + - "insert into outputStream;"); + "from inputStream " + + "select * " + + "insert into outputStream;"); ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query); executionPlanRuntime.addCallback("query1", new QueryCallback() { @@ -441,21 +332,21 @@ public void testTcpInputTransport7() throws InterruptedException { SiddhiManager siddhiManager = new SiddhiManager(); String inStreamDefinition = "" + - "@plan:name('foo')" + - "@source(type='tcp', context='bar', @map(type='passThrough'))" + - "define stream inputStream (a string, b int, c float, d long, e double, f bool);" + - "@source(type='tcp', context='bar1', @map(type='passThrough'))" + - "define stream inputStream1 (a string, b int, c float, d long, e double, f bool);" + - ""; + "@plan:name('foo')" + + "@source(type='tcp', context='bar', @map(type='passThrough'))" + + "define stream inputStream (a string, b int, c float, d long, e double, f bool);" + + "@source(type='tcp', context='bar1', @map(type='passThrough'))" + + "define stream inputStream1 (a string, b int, c float, d long, e double, f bool);" + + ""; String query = ("@info(name = 'query1') " + - "from inputStream " + - "select * " + - "insert into outputStream;" + - "" + - "from inputStream1 " + - "select * " + - "insert into outputStream;" + - ""); + "from inputStream " + + "select * " + + "insert into outputStream;" + + "" + + "from inputStream1 " + + "select * " + + "insert into outputStream;" + + ""); ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query); executionPlanRuntime.addCallback("outputStream", new StreamCallback() { @@ -524,14 +415,14 @@ public void testTcpInputTransport8() throws InterruptedException { SiddhiManager siddhiManager = new SiddhiManager(); String inStreamDefinition = "" + - "@plan:name('foo')" + - "@source(type='tcp')" + - "@source(type='tcp')" + - "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; + "@plan:name('foo')" + + "@source(type='tcp')" + + "@source(type='tcp')" + + "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; String query = ("@info(name = 'query1') " + - "from inputStream " + - "select * " + - "insert into outputStream;"); + "from inputStream " + + "select * " + + "insert into outputStream;"); executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query); executionPlanRuntime.start(); @@ -542,4 +433,116 @@ public void testTcpInputTransport8() throws InterruptedException { } } + @Ignore + @Test + public void testTcpInputTransportPauseAndResume() throws InterruptedException { + init(); + log.info("tcpInputTransport TestCase PauseAndResume"); + SiddhiManager siddhiManager = new SiddhiManager(); + + String inStreamDefinition = "" + + "@source(type='tcp', context='inputStream', @map(type='passThrough'))" + + "define stream inputStream (a string, b int, c float, d long, e double, f bool);"; + String query = ("@info(name = 'query1') " + + "from inputStream " + + "select * " + + "insert into outputStream;"); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query); + Collection> inputTransports = executionPlanRuntime.getInputTransports(); + + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + eventArrived = true; + for (Event event : inEvents) { + count++; + switch (count) { + case 1: + Assert.assertEquals("test", event.getData(0)); + break; + case 2: + Assert.assertEquals("test1", event.getData(0)); + break; + case 3: + Assert.assertEquals("test2", event.getData(0)); + break; + default: + } + } + } + }); + + executionPlanRuntime.start(); + + TCPNettyClient tcpNettyClient = new TCPNettyClient(); + tcpNettyClient.connect("localhost", 9892); + ArrayList arrayList = new ArrayList(3); + + arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test", 36, 3.0f, 380l, 23.0, true})); + arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test1", 361, 31.0f, 3801l, 231.0, false})); + arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test2", 362, 32.0f, 3802l, 232.0, true})); + tcpNettyClient.send("inputStream", arrayList.toArray(new Event[3])); + + TCPNettyClient tcpNettyClient2 = new TCPNettyClient(); + tcpNettyClient2.connect("localhost", 9892); + ArrayList arrayList2 = new ArrayList(1); + + arrayList2.add(new Event(System.currentTimeMillis(), new Object[]{"test2", 36, 3.0f, 380l, 23.0, true})); + Thread.sleep(1000); + tcpNettyClient2.send("inputStream", arrayList2.toArray(new Event[1])); + + + Thread.sleep(1000); + Assert.assertTrue(eventArrived); + Assert.assertEquals(4, count); + count = 0; + eventArrived = false; + + // pause + inputTransports.forEach(e -> e.forEach(InputTransport::pause)); + Thread.sleep(1000); + // send few events + arrayList.clear(); + arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test", 36, 3.0f, 380l, 23.0, true})); + arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test1", 361, 31.0f, 3801l, 231.0, false})); + tcpNettyClient.send("inputStream", arrayList.toArray(new Event[2])); + Thread.sleep(100); + tcpNettyClient2.send("inputStream", arrayList2.toArray(new Event[1])); + + Thread.sleep(1000); + Assert.assertFalse(eventArrived); + + // resume + inputTransports.forEach(e -> e.forEach(InputTransport::resume)); + // send few more events + arrayList.clear(); + arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test2", 36, 3.0f, 380l, 23.0, true})); + arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test3", 361, 31.0f, 3801l, 231.0, false})); + tcpNettyClient.send("inputStream", arrayList.toArray(new Event[2])); + Thread.sleep(1000); + // once resumed, we should be able to access the data sent while the transport is paused + Assert.assertEquals(5, count); + Assert.assertTrue(eventArrived); + + count = 0; + + // send few more events + arrayList.clear(); + arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test", 36, 3.0f, 380l, 23.0, true})); + arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test1", 361, 31.0f, 3801l, 231.0, false})); + tcpNettyClient.send("inputStream", arrayList.toArray(new Event[2])); + + Thread.sleep(1000); + Assert.assertEquals(2, count); + + tcpNettyClient.disconnect(); + tcpNettyClient2.disconnect(); + tcpNettyClient.shutdown(); + tcpNettyClient2.shutdown(); + Thread.sleep(300); + executionPlanRuntime.shutdown(); + } + + } diff --git a/pom.xml b/pom.xml index a49d47a279..ab9997e541 100644 --- a/pom.xml +++ b/pom.xml @@ -485,7 +485,7 @@ 4.0.0-SNAPSHOT - 4.10 + 4.12 1.2.17.wso2v1 [1.2.17, 1.3.0) 1.7.12