-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added implementation for simple thread safe async event emitter
- Loading branch information
Showing
5 changed files
with
219 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package com.ably.chat | ||
|
||
import io.ably.lib.util.Log.ERROR | ||
import io.ably.lib.util.Log.LogHandler | ||
import java.util.LinkedList | ||
import java.util.concurrent.CopyOnWriteArrayList | ||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.launch | ||
|
||
/** | ||
* Emitter interface for supplied value | ||
* Ideally, class implementation should work for both kotlin and java | ||
*/ | ||
interface Emitter<V> { | ||
fun emit(value: V) | ||
fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription | ||
fun offAll() | ||
} | ||
|
||
/** | ||
* AsyncEmitter is thread safe, highly performant async emitter implementation for kotlin. | ||
* Currently, use-case is limited to handle internal events. | ||
* This can be modified in the future to handle external listeners, events etc | ||
*/ | ||
class AsyncEmitter<V> (private val collectorScope: CoroutineScope = CoroutineScope(Dispatchers.Default)) : Emitter<V> { | ||
|
||
// Read https://www.codejava.net/java-core/concurrency/java-concurrent-collection-copyonwritearraylist-examples | ||
// For more information on why it's good to have this list | ||
private val subscribers = CopyOnWriteArrayList<AsyncSubscriber<V>>() | ||
|
||
override fun emit(value: V) { | ||
for (subscriber in subscribers) { | ||
subscriber.notifyAsync(value) | ||
} | ||
} | ||
|
||
override fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription { | ||
val subscriber = AsyncSubscriber(collectorScope, block) | ||
subscribers.addIfAbsent(subscriber) | ||
return Subscription { | ||
subscribers.remove(subscriber) | ||
} | ||
} | ||
|
||
override fun offAll() { | ||
subscribers.clear() | ||
} | ||
} | ||
|
||
private class AsyncSubscriber<V>( | ||
private val scope: CoroutineScope, | ||
private val subscriberBlock: (suspend CoroutineScope.(V) -> Unit), | ||
private val logger: LogHandler? = null, | ||
) { | ||
private var isSubscriberRunning = false | ||
private val values = LinkedList<V>() | ||
|
||
fun notifyAsync(value: V) { | ||
sequentialScope.launch { | ||
values.add(value) | ||
if (!isSubscriberRunning) { | ||
isSubscriberRunning = true | ||
while (values.isNotEmpty()) { | ||
val valueTobeEmitted = values.poll() | ||
try { | ||
// Process values sequentially, similar to blocking eventEmitter | ||
scope.launch { subscriberBlock(valueTobeEmitted as V) }.join() | ||
} catch (t: Throwable) { | ||
// TODO - replace with more verbose logging | ||
logger?.println(ERROR, "AsyncSubscriber", "Error processing value $valueTobeEmitted", t) | ||
} | ||
} | ||
isSubscriberRunning = false | ||
} | ||
} | ||
} | ||
|
||
override fun equals(other: Any?): Boolean { | ||
if (other is AsyncSubscriber<*>) { | ||
// Avoid registering duplicate anonymous subscriber block with same instance id | ||
// Common scenario when Android activity is refreshed or some app components refresh | ||
return this.subscriberBlock.hashCode() == other.subscriberBlock.hashCode() | ||
} | ||
return super.equals(other) | ||
} | ||
|
||
companion object { | ||
val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1)) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package com.ably.chat; | ||
|
||
import org.junit.Assert; | ||
import org.junit.Test; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
|
||
public class EmitterTest { | ||
|
||
@Test | ||
public void testEmitter() { | ||
AsyncEmitter<String> asyncEmitter = new AsyncEmitter<>(); | ||
ArrayList<String> receivedValues = new ArrayList<>(); | ||
|
||
asyncEmitter.emit("1"); | ||
|
||
Subscription subscription = asyncEmitter.on((coroutineScope, s, continuation) -> { | ||
receivedValues.add(s); | ||
return null; | ||
}); | ||
|
||
asyncEmitter.emit("2"); | ||
asyncEmitter.emit("3"); | ||
asyncEmitter.emit("4"); | ||
|
||
subscription.unsubscribe(); | ||
|
||
asyncEmitter.emit("5"); | ||
asyncEmitter.emit("6"); | ||
|
||
Exception conditionError = new TestUtils.ConditionalWaiter(). | ||
wait(() -> receivedValues.size() == 3, 5000); | ||
Assert.assertNull(conditionError); | ||
|
||
Assert.assertEquals(Arrays.asList("2", "3", "4"), receivedValues); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package com.ably.chat | ||
|
||
import kotlinx.coroutines.test.runTest | ||
import org.junit.Assert | ||
import org.junit.Test | ||
|
||
class AsyncEmitterTest { | ||
|
||
@Test | ||
fun `should be able to emit and listen to event`() = runTest { | ||
val asyncEmitter = AsyncEmitter<String>() | ||
val receivedValues = mutableListOf<String>() | ||
|
||
asyncEmitter.emit("1") | ||
|
||
val subscription = asyncEmitter.on { received: String -> | ||
receivedValues.add(received) | ||
} | ||
|
||
asyncEmitter.emit("2") | ||
asyncEmitter.emit("3") | ||
asyncEmitter.emit("4") | ||
|
||
subscription.unsubscribe() | ||
|
||
asyncEmitter.emit("5") | ||
asyncEmitter.emit("6") | ||
|
||
assertWaiter { receivedValues.size == 3 }.join() | ||
|
||
Assert.assertEquals(listOf("2", "3", "4"), receivedValues) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package com.ably.chat; | ||
|
||
import java.util.Timer; | ||
import java.util.TimerTask; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
public class TestUtils { | ||
|
||
public interface ConditionFn<O> { | ||
O call(); | ||
} | ||
|
||
public static class ConditionalWaiter { | ||
public Exception wait(ConditionFn<Boolean> condition, int timeoutInMs) { | ||
AtomicBoolean taskTimedOut = new AtomicBoolean(); | ||
new Timer().schedule(new TimerTask() { | ||
@Override | ||
public void run() { | ||
taskTimedOut.set(true); | ||
} | ||
}, timeoutInMs); | ||
while (true) { | ||
try { | ||
Boolean result = condition.call(); | ||
if (result) { | ||
return null; | ||
} | ||
if (taskTimedOut.get()) { | ||
throw new Exception("Timed out after " + timeoutInMs + "ms waiting for condition"); | ||
} | ||
Thread.sleep(200); | ||
} catch (Exception e) { | ||
return e; | ||
} | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters