diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/LogStreamProcessor.java b/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/LogStreamProcessor.java index 83cc4608dc..b1a9fb2ec4 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/LogStreamProcessor.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/LogStreamProcessor.java @@ -24,7 +24,6 @@ import io.siddhi.annotation.ParameterOverload; import io.siddhi.annotation.util.DataType; import io.siddhi.core.config.SiddhiQueryContext; -import io.siddhi.core.event.ComplexEvent; import io.siddhi.core.event.ComplexEventChunk; import io.siddhi.core.event.stream.MetaStreamEvent; import io.siddhi.core.event.stream.StreamEvent; @@ -221,52 +220,54 @@ protected void process(ComplexEventChunk streamEventChunk, Processo StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, State state) { while (streamEventChunk.hasNext()) { - ComplexEvent complexEvent = streamEventChunk.next(); + StreamEvent streamEvent = streamEventChunk.next(); switch (attributeExpressionLength) { case 0: - log.info(logPrefix + complexEvent); + log.info(logPrefix + streamEvent.toString(1)); break; case 1: if (isLogEventExpressionExecutor != null) { - if ((Boolean) isLogEventExpressionExecutor.execute(complexEvent)) { - log.info(logPrefix + complexEvent); + if ((Boolean) isLogEventExpressionExecutor.execute(streamEvent)) { + log.info(logPrefix + streamEvent.toString(1)); } else { log.info(logPrefix + "Event Arrived"); } } else { - log.info(logPrefix + logMessageExpressionExecutor.execute(complexEvent) + ", " + complexEvent); + log.info(logPrefix + logMessageExpressionExecutor.execute(streamEvent) + ", " + + streamEvent.toString(1)); } break; case 2: if (isLogEventExpressionExecutor != null) { - if ((Boolean) isLogEventExpressionExecutor.execute(complexEvent)) { - log.info(logPrefix + logMessageExpressionExecutor.execute(complexEvent) + ", " + - complexEvent); + if ((Boolean) isLogEventExpressionExecutor.execute(streamEvent)) { + log.info(logPrefix + logMessageExpressionExecutor.execute(streamEvent) + ", " + + streamEvent.toString(1)); } else { - log.info(logPrefix + logMessageExpressionExecutor.execute(complexEvent)); + log.info(logPrefix + logMessageExpressionExecutor.execute(streamEvent)); } } else { LogPriority tempLogPriority = logPriority; if (logPriorityExpressionExecutor != null) { - tempLogPriority = LogPriority.valueOf((String) logPriorityExpressionExecutor.execute - (complexEvent)); + tempLogPriority = LogPriority.valueOf((String) logPriorityExpressionExecutor. + execute(streamEvent)); } - String message = logPrefix + logMessageExpressionExecutor.execute(complexEvent) + ", " + - complexEvent; + String message = logPrefix + logMessageExpressionExecutor.execute(streamEvent) + ", " + + streamEvent.toString(1); logMessage(tempLogPriority, message); } break; default: String message; - if ((Boolean) isLogEventExpressionExecutor.execute(complexEvent)) { - message = logPrefix + logMessageExpressionExecutor.execute(complexEvent) + ", " + complexEvent; + if ((Boolean) isLogEventExpressionExecutor.execute(streamEvent)) { + message = logPrefix + logMessageExpressionExecutor.execute(streamEvent) + ", " + + streamEvent.toString(1); } else { - message = logPrefix + logMessageExpressionExecutor.execute(complexEvent); + message = logPrefix + logMessageExpressionExecutor.execute(streamEvent); } LogPriority tempLogPriority = logPriority; if (logPriorityExpressionExecutor != null) { - tempLogPriority = LogPriority.valueOf((String) logPriorityExpressionExecutor.execute - (complexEvent)); + tempLogPriority = LogPriority.valueOf((String) logPriorityExpressionExecutor. + execute(streamEvent)); } logMessage(tempLogPriority, message); } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputDistributor.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputDistributor.java index 71ee83a803..4da1f5cfeb 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputDistributor.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputDistributor.java @@ -53,4 +53,8 @@ public void addInputProcessor(InputProcessor inputProcessor) { inputProcessors.add(inputProcessor); } + + public void clear() { + inputProcessors.clear(); + } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputManager.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputManager.java index 0850c66996..d113fa3946 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputManager.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputManager.java @@ -73,6 +73,7 @@ public synchronized void disconnect() { for (InputHandler inputHandler : inputHandlerMap.values()) { inputHandler.disconnect(); } + inputDistributor.clear(); inputHandlerMap.clear(); this.isConnected = false; } diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/LogTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/LogTestCase.java index 5ff2a06cfb..202e082b1f 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/LogTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/LogTestCase.java @@ -88,7 +88,9 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { InputHandler inputHandler = siddhiAppRuntime.getInputHandler("StockStream"); siddhiAppRuntime.start(); - inputHandler.send(new Object[]{"IBM", 75.6f, 100}); + inputHandler.send(new Event[]{ + new Event(System.currentTimeMillis(), new Object[]{"IBM", 75.6f, 100}), + new Event(System.currentTimeMillis(), new Object[]{"GOOG", 70.6f, 100})}); Thread.sleep(100); siddhiAppRuntime.shutdown(); diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/managment/StartStopTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/StartStopTestCase.java new file mode 100644 index 0000000000..1c6fb93e74 --- /dev/null +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/managment/StartStopTestCase.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.core.managment; + +import io.siddhi.core.SiddhiAppRuntime; +import io.siddhi.core.SiddhiManager; +import io.siddhi.core.event.Event; +import io.siddhi.core.stream.input.InputHandler; +import io.siddhi.core.stream.output.StreamCallback; +import io.siddhi.core.util.EventPrinter; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.AssertJUnit; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +public class StartStopTestCase { + private static final Logger log = Logger.getLogger(StartStopTestCase.class); + private AtomicInteger count; + private boolean eventArrived; + + @BeforeMethod + public void init() { + count = new AtomicInteger(); + eventArrived = false; + } + + @Test(expectedExceptions = InterruptedException.class) + public void startStopTest1() throws InterruptedException { + log.info("startStop test 1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String siddhiApp = "" + + "define stream cseEventStream (symbol string, price float, volume int);" + + "define stream cseEventStream2 (symbol string, price float, volume int);" + + "" + + "@info(name = 'query1') " + + "from cseEventStream " + + "select 1 as eventFrom " + + "insert into outputStream ;" + + "" + + "@info(name = 'query2') " + + "from cseEventStream2 " + + "select 2 as eventFrom " + + "insert into outputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp); + siddhiAppRuntime.addCallback("outputStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + } + + }); + + InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream2"); + siddhiAppRuntime.start(); + siddhiAppRuntime.shutdown(); + siddhiAppRuntime.start(); + inputHandler.send(new Object[]{"WSO2", 55.6f, 100}); + siddhiAppRuntime.shutdown(); + } + + @Test() + public void startStopTest2() throws InterruptedException { + log.info("startStop test 2"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String siddhiApp = "" + + "define stream cseEventStream (symbol string, price float, volume int);" + + "define stream cseEventStream2 (symbol string, price float, volume int);" + + "" + + "@info(name = 'query1') " + + "from cseEventStream " + + "select 1 as eventFrom " + + "insert into outputStream ;" + + "" + + "@info(name = 'query2') " + + "from cseEventStream2 " + + "select 2 as eventFrom " + + "insert into outputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp); + siddhiAppRuntime.addCallback("outputStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + Assert.assertEquals(events[0].getData(0), 1); + eventArrived = true; + count.incrementAndGet(); + } + + }); + + InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream2"); + inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream"); + siddhiAppRuntime.start(); + siddhiAppRuntime.shutdown(); + siddhiAppRuntime.start(); + inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream"); + inputHandler.send(new Object[]{"WSO2", 55.6f, 100}); + siddhiAppRuntime.shutdown(); + AssertJUnit.assertTrue(eventArrived); + AssertJUnit.assertEquals(1, count.get()); + } + +} diff --git a/modules/siddhi-core/src/test/resources/testng.xml b/modules/siddhi-core/src/test/resources/testng.xml index fba93888f8..cc5a24bb50 100644 --- a/modules/siddhi-core/src/test/resources/testng.xml +++ b/modules/siddhi-core/src/test/resources/testng.xml @@ -33,6 +33,7 @@ +