Skip to content

Commit

Permalink
Correctly notify UI of failures in subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Sep 9, 2024
1 parent 6b4054f commit 6f74b22
Showing 1 changed file with 30 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.deephaven.web.client.state.ClientTableState;
import io.deephaven.web.shared.data.RangeSet;
import io.deephaven.web.shared.data.ShiftedRange;
import io.deephaven.web.shared.fu.JsRunnable;
import jsinterop.annotations.JsProperty;
import jsinterop.base.Any;
import jsinterop.base.Js;
Expand Down Expand Up @@ -85,6 +86,8 @@ public enum Status {

protected Status status = Status.STARTING;

private String failMsg;

public AbstractTableSubscription(ClientTableState state, WorkerConnection connection) {
state.retain(this);
this.state = state;
Expand All @@ -100,6 +103,7 @@ public AbstractTableSubscription(ClientTableState state, WorkerConnection connec
*/
protected void revive() {
// Once the state is running, set up the actual subscription
// Don't let subscription be used again, table failed and user will have already gotten an error elsewhere
state.onRunning(s -> {
if (status != Status.STARTING) {
// already closed
Expand All @@ -124,10 +128,11 @@ protected void revive() {
doExchange.onEnd(this::onStreamEnd);

sendFirstSubscriptionRequest();
}, () -> {
// Don't let subscription be used again, table failed and user will have already gotten an error elsewhere
status = Status.DONE;
});
},
// If the upstream table fails, kill the subscription
this::fail,
// If the upstream table is closed, its because this subscription released it, do nothing
JsRunnable.doNothing());
}

public Status getStatus() {
Expand Down Expand Up @@ -161,7 +166,13 @@ protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSe

protected void sendBarrageSubscriptionRequest(RangeSet viewport, JsArray<Column> columns, Double updateIntervalMs,
boolean isReverseViewport) {
assert status == Status.ACTIVE || status == Status.PENDING_UPDATE : status;
if (status == Status.DONE) {
if (failMsg == null) {
throw new IllegalStateException("Can't change subscription, already closed");
} else {
throw new IllegalStateException("Can't change subscription, already failed: " + failMsg);
}
}
status = Status.PENDING_UPDATE;
this.columns = columns;
this.viewportRowSet = viewport;
Expand Down Expand Up @@ -471,12 +482,25 @@ private void onFlightData(FlightData data) {
}

protected void onStreamEnd(ResponseStreamWrapper.Status status) {
if (this.status != Status.DONE && status.isTransportError()) {
if (this.status == Status.DONE) {
return;
}
if (status.isTransportError()) {
// If the subscription isn't closed and we hit a transport error, allow it to restart
this.status = Status.STARTING;
} else {
// Subscription failed somehow, fire an event
fail(status.getDetails());
}
}

private void fail(String message) {
failureHandled(message);
this.status = Status.DONE;
doExchange = null;
failMsg = message;
}

/**
* The columns that were subscribed to when this subscription was created
*
Expand Down

0 comments on commit 6f74b22

Please sign in to comment.