Skip to content

Commit

Permalink
Merge pull request #1605 from pcnfernando/master
Browse files Browse the repository at this point in the history
Add latch mechanism to pause/resume in inmemory source
  • Loading branch information
pcnfernando authored Jan 4, 2020
2 parents a47397e + 9063380 commit 0c3cfcd
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import io.siddhi.core.util.transport.OptionHolder;
import org.apache.log4j.Logger;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* Implementation of {@link Source} to receive events through in-memory transport.
*/
Expand Down Expand Up @@ -62,6 +65,9 @@ public class InMemorySource extends Source {
private static final String TOPIC_KEY = "topic";
private SourceEventListener sourceEventListener;
private InMemoryBroker.Subscriber subscriber;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
private volatile boolean paused = false;

@Override
protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
Expand All @@ -77,6 +83,18 @@ public StateFactory<State> init(SourceEventListener sourceEventListener, OptionH
this.subscriber = new InMemoryBroker.Subscriber() {
@Override
public void onMessage(Object event) {
if (paused) {
pauseLock.lock();
try {
while (paused) {
unpaused.await();
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} finally {
pauseLock.unlock();
}
}
sourceEventListener.onEvent(event, null);
}

Expand Down Expand Up @@ -110,12 +128,17 @@ public void destroy() {

@Override
public void pause() {
InMemoryBroker.unsubscribe(subscriber);
paused = true;
}

@Override
public void resume() {
InMemoryBroker.subscribe(subscriber);
paused = false;
try {
pauseLock.lock();
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void receive(Event[] events) {
count++;
if (count > 1) {
float triggerTimeDiff = timestamp / 1000 - lastTimeStamp / 1000;
AssertJUnit.assertTrue(1.0f == triggerTimeDiff);
AssertJUnit.assertTrue(1 == Math.round(triggerTimeDiff));
}
lastTimeStamp = timestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class InMemoryTransportTestCase {
Expand Down Expand Up @@ -1564,4 +1565,60 @@ public void receive(Event[] events) {

}

@Test(dependsOnMethods = {"inMemoryTestCase21"})
public void inMemoryTestCase22() throws InterruptedException {
log.info("Test inMemoryTestCase22");
SiddhiManager siddhiManager = new SiddhiManager();

String publisherApp = "" +
"define stream CheckStockStream (symbol1 string, totalPrice double); " +
"@sink(type='inMemory', topic='OutputStream', @map(type='passThrough')) " +
"define stream OutputStream (symbol1 string, totalPrice double); " +
"" +
"from CheckStockStream " +
"select * " +
"insert into OutputStream; ";

String consumerApp = "" +
"@source(type='inMemory', topic='OutputStream', @map(type='passThrough')) " +
"define stream InputStream (symbol1 string, totalPrice double); ";

SiddhiAppRuntime publisherRuntime = siddhiManager.createSiddhiAppRuntime(publisherApp);
SiddhiAppRuntime consumerRuntime = siddhiManager.createSiddhiAppRuntime(consumerApp);

consumerRuntime.addCallback("InputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
wso2Count.incrementAndGet();
}
});
InputHandler stockStream = publisherRuntime.getInputHandler("CheckStockStream");

publisherRuntime.start();
consumerRuntime.start();

stockStream.send(new Object[]{"WSO2", 50.0f});
stockStream.send(new Object[]{"WSO2", 70.0f});
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
consumerRuntime.getSources().iterator().next().get(0).pause();
}
});
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
stockStream.send(new Object[]{"WSO2", 90f});
} catch (InterruptedException ignored) {
}
}
});
Thread.sleep(2000);
consumerRuntime.getSources().iterator().next().get(0).resume();
Thread.sleep(2000);
Assert.assertEquals(wso2Count.get(), 3);
siddhiManager.shutdown();
}
}

0 comments on commit 0c3cfcd

Please sign in to comment.