From d8dcda67e5baef7d4db32dbe4b5a606ba395d497 Mon Sep 17 00:00:00 2001 From: Chiran Date: Thu, 2 Jan 2020 15:17:04 +0530 Subject: [PATCH 1/3] Use locking mechanism during pause --- .../stream/input/source/InMemorySource.java | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/InMemorySource.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/InMemorySource.java index 6d042488d3..08e4b26638 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/InMemorySource.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/InMemorySource.java @@ -33,6 +33,9 @@ import io.siddhi.core.util.transport.OptionHolder; import org.apache.log4j.Logger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + /** * Implementation of {@link Source} to receive events through in-memory transport. */ @@ -62,6 +65,9 @@ public class InMemorySource extends Source { private static final String TOPIC_KEY = "topic"; private SourceEventListener sourceEventListener; private InMemoryBroker.Subscriber subscriber; + private ReentrantLock lock = new ReentrantLock(); + private Condition condition = lock.newCondition(); + private volatile boolean paused; @Override protected ServiceDeploymentInfo exposeServiceDeploymentInfo() { @@ -77,6 +83,18 @@ public StateFactory init(SourceEventListener sourceEventListener, OptionH this.subscriber = new InMemoryBroker.Subscriber() { @Override public void onMessage(Object event) { + if (paused) { + lock.lock(); + try { + while (paused) { + condition.await(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + lock.unlock(); + } + } sourceEventListener.onEvent(event, null); } @@ -110,12 +128,17 @@ public void destroy() { @Override public void pause() { - InMemoryBroker.unsubscribe(subscriber); + paused = true; } @Override public void resume() { - InMemoryBroker.subscribe(subscriber); + lock.lock(); + try { + paused = false; + condition.signalAll(); + } finally { + lock.unlock(); + } } - } From fe5169ba857e3046796cd4a18045bcfefa93a91a Mon Sep 17 00:00:00 2001 From: Chiran Date: Thu, 2 Jan 2020 20:26:43 +0530 Subject: [PATCH 2/3] Add test to consume from inmemory source while pause & resume --- .../stream/input/source/InMemorySource.java | 20 +++---- .../transport/InMemoryTransportTestCase.java | 57 +++++++++++++++++++ 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/InMemorySource.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/InMemorySource.java index 08e4b26638..fe97df3c7c 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/InMemorySource.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/InMemorySource.java @@ -65,9 +65,9 @@ public class InMemorySource extends Source { private static final String TOPIC_KEY = "topic"; private SourceEventListener sourceEventListener; private InMemoryBroker.Subscriber subscriber; - private ReentrantLock lock = new ReentrantLock(); - private Condition condition = lock.newCondition(); - private volatile boolean paused; + private ReentrantLock pauseLock = new ReentrantLock(); + private Condition unpaused = pauseLock.newCondition(); + private volatile boolean paused = false; @Override protected ServiceDeploymentInfo exposeServiceDeploymentInfo() { @@ -84,15 +84,15 @@ public StateFactory init(SourceEventListener sourceEventListener, OptionH @Override public void onMessage(Object event) { if (paused) { - lock.lock(); + pauseLock.lock(); try { while (paused) { - condition.await(); + unpaused.await(); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } finally { - lock.unlock(); + pauseLock.unlock(); } } sourceEventListener.onEvent(event, null); @@ -133,12 +133,12 @@ public void pause() { @Override public void resume() { - lock.lock(); + paused = false; try { - paused = false; - condition.signalAll(); + pauseLock.lock(); + unpaused.signalAll(); } finally { - lock.unlock(); + pauseLock.unlock(); } } } diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/transport/InMemoryTransportTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/transport/InMemoryTransportTestCase.java index e47d8285fb..0ff7666dc9 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/transport/InMemoryTransportTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/transport/InMemoryTransportTestCase.java @@ -39,6 +39,7 @@ import java.time.LocalDate; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; public class InMemoryTransportTestCase { @@ -1564,4 +1565,60 @@ public void receive(Event[] events) { } + @Test(dependsOnMethods = {"inMemoryTestCase21"}) + public void inMemoryTestCase22() throws InterruptedException { + log.info("Test inMemoryTestCase22"); + SiddhiManager siddhiManager = new SiddhiManager(); + + String publisherApp = "" + + "define stream CheckStockStream (symbol1 string, totalPrice double); " + + "@sink(type='inMemory', topic='OutputStream', @map(type='passThrough')) " + + "define stream OutputStream (symbol1 string, totalPrice double); " + + "" + + "from CheckStockStream " + + "select * " + + "insert into OutputStream; "; + + String consumerApp = "" + + "@source(type='inMemory', topic='OutputStream', @map(type='passThrough')) " + + "define stream InputStream (symbol1 string, totalPrice double); "; + + SiddhiAppRuntime publisherRuntime = siddhiManager.createSiddhiAppRuntime(publisherApp); + SiddhiAppRuntime consumerRuntime = siddhiManager.createSiddhiAppRuntime(consumerApp); + + consumerRuntime.addCallback("InputStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + wso2Count.incrementAndGet(); + } + }); + InputHandler stockStream = publisherRuntime.getInputHandler("CheckStockStream"); + + publisherRuntime.start(); + consumerRuntime.start(); + + stockStream.send(new Object[]{"WSO2", 50.0f}); + stockStream.send(new Object[]{"WSO2", 70.0f}); + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + consumerRuntime.getSources().iterator().next().get(0).pause(); + } + }); + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + try { + stockStream.send(new Object[]{"WSO2", 90f}); + } catch (InterruptedException ignored) { + } + } + }); + Thread.sleep(2000); + consumerRuntime.getSources().iterator().next().get(0).resume(); + Thread.sleep(2000); + Assert.assertEquals(wso2Count.get(), 3); + siddhiManager.shutdown(); + } } From 90633807a4960fb68cc34e6aab88ef0512aca98f Mon Sep 17 00:00:00 2001 From: Chiran Date: Fri, 3 Jan 2020 20:08:04 +0530 Subject: [PATCH 3/3] Fix trigger test case to round-off --- .../test/java/io/siddhi/core/query/trigger/TriggerTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/query/trigger/TriggerTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/query/trigger/TriggerTestCase.java index eb41d1b560..b7e68148fd 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/query/trigger/TriggerTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/query/trigger/TriggerTestCase.java @@ -194,7 +194,7 @@ public void receive(Event[] events) { count++; if (count > 1) { float triggerTimeDiff = timestamp / 1000 - lastTimeStamp / 1000; - AssertJUnit.assertTrue(1.0f == triggerTimeDiff); + AssertJUnit.assertTrue(1 == Math.round(triggerTimeDiff)); } lastTimeStamp = timestamp; }