Skip to content

Commit

Permalink
Rename "safe" variants to "...orClosed" (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Apr 17, 2024
1 parent dd2fc55 commit c96fe9b
Show file tree
Hide file tree
Showing 16 changed files with 79 additions and 78 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ see [Project Loom](https://openjdk.org/projects/loom/) (although the `core` modu
Inspired by the "Fast and Scalable Channels in Kotlin Coroutines" [paper](https://arxiv.org/abs/2211.04986), and
the [Kotlin implementation](https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt).

JavaDocs can be browsed at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.jox/core/latest/com.softwaremill.jox/com/softwaremill/jox/package-summary.html).
JavaDocs can be browsed
at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.jox/core/latest/com.softwaremill.jox/com/softwaremill/jox/package-summary.html).

Articles:

Expand Down Expand Up @@ -113,8 +114,8 @@ Channels can be closed, either because the source is `done` with sending values,
the sink processes the received values.

`send()` and `receive()` will throw a `ChannelClosedException` when the channel is closed. Alternatively, you can
use the `sendSafe()` and `receiveSafe()` methods, which return either a `ChannelClosed` value (reason of closure),
or `null` / the received value.
use the `sendOrClosed()` and `receiveOrClosed()` methods, which return either a `ChannelClosed` value (reason of
closure), or `null` / the received value.

Channels can also be inspected whether they are closed, using the `isClosedForReceive()` and `isClosedForSend()`.

Expand All @@ -131,9 +132,9 @@ class Demo3 {
ch.done();

// prints: Received: 1
System.out.println("Received: " + ch.receiveSafe());
System.out.println("Received: " + ch.receiveOrClosed());
// prints: Received: ChannelDone[]
System.out.println("Received: " + ch.receiveSafe());
System.out.println("Received: " + ch.receiveOrClosed());
}
}
```
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* the reason for the closure.
* <p>
* In case the channel is closed, one of the {@link ChannelClosedException}s is thrown. Alternatively, you can call
* the less type-safe, but more exception-safe {@link Channel#sendSafe(Object)} and {@link Channel#receiveSafe()}
* the less type-safe, but more exception-safe {@link Channel#sendOrClosed(Object)} and {@link Channel#receiveOrClosed()}
* methods, which do not throw in case the channel is closed, but return one of the {@link ChannelClosed} values.
*
* @param <T> The type of the values processed by the channel.
Expand Down Expand Up @@ -210,14 +210,14 @@ public static <T> Channel<T> newUnlimitedChannel() {

@Override
public void send(T value) throws InterruptedException {
var r = sendSafe(value);
var r = sendOrClosed(value);
if (r instanceof ChannelClosed c) {
throw c.toException();
}
}

@Override
public Object sendSafe(T value) throws InterruptedException {
public Object sendOrClosed(T value) throws InterruptedException {
return doSend(value, null, null);
}

Expand Down Expand Up @@ -382,7 +382,7 @@ private Object updateCellSend(Segment segment, int i, long s, T value, SelectIns

@Override
public T receive() throws InterruptedException {
var r = receiveSafe();
var r = receiveOrClosed();
if (r instanceof ChannelClosed c) {
throw c.toException();
} else {
Expand All @@ -392,7 +392,7 @@ public T receive() throws InterruptedException {
}

@Override
public Object receiveSafe() throws InterruptedException {
public Object receiveOrClosed() throws InterruptedException {
return doReceive(null, null);
}

Expand Down Expand Up @@ -684,34 +684,34 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {

@Override
public void done() {
var r = doneSafe();
var r = doneOrClosed();
if (r instanceof ChannelClosed c) {
throw c.toException();
}
}

@Override
public Object doneSafe() {
return closeSafe(new ChannelDone());
public Object doneOrClosed() {
return closeOrClosed(new ChannelDone());
}

@Override
public void error(Throwable reason) {
if (reason == null) {
throw new NullPointerException("Error reason cannot be null");
}
var r = errorSafe(reason);
var r = errorOrClosed(reason);
if (r instanceof ChannelClosed c) {
throw c.toException();
}
}

@Override
public Object errorSafe(Throwable reason) {
return closeSafe(new ChannelError(reason));
public Object errorOrClosed(Throwable reason) {
return closeOrClosed(new ChannelError(reason));
}

private Object closeSafe(ChannelClosed channelClosed) {
private Object closeOrClosed(ChannelClosed channelClosed) {
if (!CLOSED_REASON.compareAndSet(this, null, channelClosed)) {
return closedReason; // already closed
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/com/softwaremill/jox/ChannelClosed.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.softwaremill.jox;

/**
* Returned by {@link Channel#sendSafe(Object)} and {@link Channel#receiveSafe()} when the channel is closed.
* Returned by {@link Channel#sendOrClosed(Object)} and {@link Channel#receiveOrClosed()} when the channel is closed.
*/
public sealed interface ChannelClosed permits ChannelDone, ChannelError {
ChannelClosedException toException();
Expand Down
18 changes: 9 additions & 9 deletions core/src/main/java/com/softwaremill/jox/CloseableChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
* <p>
* A channel can be closed in two ways:
* <ul>
* <li>using {@link #done()} or {@link #doneSafe()}, indicating that no more elements will be sent</li>
* <li>using {@link #error(Throwable)} or {@link #errorSafe(Throwable)}, indicating an error</li>
* <li>using {@link #done()} or {@link #doneOrClosed()}, indicating that no more elements will be sent</li>
* <li>using {@link #error(Throwable)} or {@link #errorOrClosed(Throwable)}, indicating an error</li>
* </ul>
* <p>
* A channel can be closed only once. Subsequent calls to {@link #done()} or {@link #error(Throwable)} will throw
* {@link ChannelClosedException}, or return the original closing reason (when using {@link #doneSafe()} or {@link #errorSafe(Throwable)}).
* {@link ChannelClosedException}, or return the original closing reason (when using {@link #doneOrClosed()} or {@link #errorOrClosed(Throwable)}).
* <p>
* Closing the channel is thread-safe.
*/
Expand Down Expand Up @@ -41,7 +41,7 @@ public interface CloseableChannel {
*
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is already closed.
*/
Object doneSafe();
Object doneOrClosed();

//

Expand All @@ -68,13 +68,13 @@ public interface CloseableChannel {
*
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is already closed.
*/
Object errorSafe(Throwable reason);
Object errorOrClosed(Throwable reason);

//

/**
* @return {@code true} if no more values can be sent to this channel; {@link Sink#send(Object)} will throw
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the safe variant).
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the or-closed variant).
* <p>
* When closed for send, receiving using {@link Source#receive()} might still be possible, if the channel is done,
* and not in an error. This can be verified using {@link #isClosedForReceive()}.
Expand All @@ -85,7 +85,7 @@ default boolean isClosedForSend() {

/**
* @return {@code true} if no more values can be received from this channel; {@link Source#receive()} will throw
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the safe variant).
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the or-closed variant).
* <p>
* When closed for receive, sending values is also not possible, {@link #isClosedForSend()} will return {@code true}.
* <p>
Expand All @@ -100,7 +100,7 @@ default boolean isClosedForReceive() {

/**
* @return Non-{@code null} if no more values can be sent to this channel; {@link Sink#send(Object)} will throw
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the safe variant).
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the or-closed variant).
* <p>
* {@code null} if the channel is not closed, and values can be sent.
* <p>
Expand All @@ -111,7 +111,7 @@ default boolean isClosedForReceive() {

/**
* @return Non-{@code null} if no more values can be received from this channel; {@link Source#receive()} will throw
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the safe variant).
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the or-closed variant).
* <p>
* {@code null} if the channel is not closed, and values can be received.
* <p>
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/java/com/softwaremill/jox/CollectSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public T receive() throws InterruptedException {
}

@Override
public Object receiveSafe() throws InterruptedException {
public Object receiveOrClosed() throws InterruptedException {
while (true) {
var r = original.receiveSafe();
var r = original.receiveOrClosed();
if (r instanceof ChannelClosed c) {
return c;
} else {
Expand Down Expand Up @@ -87,8 +87,8 @@ public void done() {
}

@Override
public Object doneSafe() {
return original.doneSafe();
public Object doneOrClosed() {
return original.doneOrClosed();
}

@Override
Expand All @@ -97,8 +97,8 @@ public void error(Throwable reason) {
}

@Override
public Object errorSafe(Throwable reason) {
return original.errorSafe(reason);
public Object errorOrClosed(Throwable reason) {
return original.errorOrClosed(reason);
}

@Override
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/java/com/softwaremill/jox/Select.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class Select {
*/
@SafeVarargs
public static <U> U select(SelectClause<U>... clauses) throws InterruptedException {
var r = selectSafe(clauses);
var r = selectOrClosed(clauses);
if (r instanceof ChannelClosed c) {
throw c.toException();
} else {
Expand All @@ -77,18 +77,18 @@ public static <U> U select(SelectClause<U>... clauses) throws InterruptedExcepti
* is closed (done or in error).
*/
@SafeVarargs
public static <U> Object selectSafe(SelectClause<U>... clauses) throws InterruptedException {
public static <U> Object selectOrClosed(SelectClause<U>... clauses) throws InterruptedException {
while (true) {
if (clauses.length == 0) {
// no clauses given
return new ChannelDone();
}

var r = doSelectSafe(clauses);
var r = doSelectOrClosed(clauses);
//noinspection StatementWithEmptyBody
if (r == RestartSelectMarker.RESTART) {
// in case a `CollectSource` function filters out the element (the transformation function returns `null`,
// which is represented as a marker because `null` is a valid result of `doSelectSafe`, e.g. for send clauses),
// which is represented as a marker because `null` is a valid result of `doSelectorClosed`, e.g. for send clauses),
// we need to restart the selection process

// next loop
Expand All @@ -99,7 +99,7 @@ public static <U> Object selectSafe(SelectClause<U>... clauses) throws Interrupt
}

@SafeVarargs
private static <U> Object doSelectSafe(SelectClause<U>... clauses) throws InterruptedException {
private static <U> Object doSelectOrClosed(SelectClause<U>... clauses) throws InterruptedException {
// check that the clause doesn't refer to a channel that is already used in a different clause
var allRendezvous = verifyChannelsUnique_getAreAllRendezvous(clauses);

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/com/softwaremill/jox/Sink.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface Sink<T> extends CloseableChannel {
* @param value The value to send. Not {@code null}.
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is closed.
*/
Object sendSafe(T value) throws InterruptedException;
Object sendOrClosed(T value) throws InterruptedException;

//

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/com/softwaremill/jox/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface Source<T> extends CloseableChannel {
*
* @return Either a value of type {@code T}, or {@link ChannelClosed}, when the channel is closed.
*/
Object receiveSafe() throws InterruptedException;
Object receiveOrClosed() throws InterruptedException;

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
Expand Down
10 changes: 5 additions & 5 deletions core/src/test/java/com/softwaremill/jox/ChannelBufferedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ void shouldReceiveFromAChannelUntilDone() throws InterruptedException {
c.done();

// when
var r1 = c.receiveSafe();
var r2 = c.receiveSafe();
var r3 = c.receiveSafe();
var r1 = c.receiveOrClosed();
var r2 = c.receiveOrClosed();
var r3 = c.receiveOrClosed();

// then
assertEquals(1, r1);
Expand All @@ -104,8 +104,8 @@ void shouldNotReceiveFromAChannelInCaseOfAnError() throws InterruptedException {
c.error(new RuntimeException());

// when
var r1 = c.receiveSafe();
var r2 = c.receiveSafe();
var r1 = c.receiveOrClosed();
var r2 = c.receiveOrClosed();

// then
assertInstanceOf(ChannelError.class, r1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void pendingReceivesShouldGetNotifiedThatChannelIsDone() throws InterruptedExcep
// given
Channel<Integer> c = new Channel<>();
scoped(scope -> {
var f = fork(scope, c::receiveSafe);
var f = fork(scope, c::receiveOrClosed);

// when
Thread.sleep(100L);
Expand All @@ -124,7 +124,7 @@ void pendingReceivesShouldGetNotifiedThatChannelIsDone() throws InterruptedExcep
assertEquals(new ChannelDone(), f.get());

// should be rejected immediately
assertEquals(new ChannelDone(), c.receiveSafe());
assertEquals(new ChannelDone(), c.receiveOrClosed());
});
}

Expand All @@ -133,7 +133,7 @@ void pendingSendsShouldGetNotifiedThatChannelIsErrored() throws InterruptedExcep
// given
Channel<Integer> c = new Channel<>();
scoped(scope -> {
var f = fork(scope, () -> c.sendSafe(1));
var f = fork(scope, () -> c.sendOrClosed(1));

// when
Thread.sleep(100L);
Expand All @@ -143,7 +143,7 @@ void pendingSendsShouldGetNotifiedThatChannelIsErrored() throws InterruptedExcep
assertInstanceOf(ChannelError.class, f.get());

// should be rejected immediately
assertInstanceOf(ChannelError.class, c.sendSafe(2));
assertInstanceOf(ChannelError.class, c.sendOrClosed(2));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ void shouldReceiveFromAChannelUntilDone() throws InterruptedException {
c.done();

// when
var r1 = c.receiveSafe();
var r2 = c.receiveSafe();
var r3 = c.receiveSafe();
var r4 = c.receiveSafe();
var r1 = c.receiveOrClosed();
var r2 = c.receiveOrClosed();
var r3 = c.receiveOrClosed();
var r4 = c.receiveOrClosed();

// then
assertEquals(1, r1);
Expand All @@ -60,8 +60,8 @@ void shouldNotReceiveFromAChannelInCaseOfAnError() throws InterruptedException {
c.error(new RuntimeException());

// when
var r1 = c.receiveSafe();
var r2 = c.receiveSafe();
var r1 = c.receiveOrClosed();
var r2 = c.receiveOrClosed();

// then
assertInstanceOf(ChannelError.class, r1);
Expand Down
Loading

0 comments on commit c96fe9b

Please sign in to comment.