Skip to content

Commit

Permalink
Deferring use of ExportObject shouldn't lead to error (#4515)
Browse files Browse the repository at this point in the history
Introduces a liveness scope specifically for the ExportObjects created
when handling a client message, releasing it after all of those objects
have been obtained and the ExportObjects are no longer needed.

Fixes #4514
  • Loading branch information
niloc132 authored Sep 20, 2023
1 parent bf579e0 commit bcb240f
Showing 1 changed file with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.google.protobuf.ByteString;
import com.google.rpc.Code;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.plugin.type.ObjectCommunicationException;
import io.deephaven.plugin.type.ObjectType;
Expand All @@ -17,6 +19,7 @@
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.SessionState.ExportObject;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.function.ThrowingRunnable;
import io.grpc.stub.StreamObserver;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -147,11 +150,21 @@ public void onNext(final StreamRequest request) {
"Data message sent before Connect message");
}
Data data = request.getData();
List<SessionState.ExportObject<Object>> referenceObjects = data.getExportedReferencesList().stream()
.map(typedTicket -> ticketRouter.resolve(session, typedTicket.getTicket(), "ticket"))
.collect(Collectors.toList());
LivenessScope exportScope = new LivenessScope();

List<SessionState.ExportObject<Object>> referenceObjects;
try (SafeCloseable ignored = LivenessScopeStack.open(exportScope, false)) {
referenceObjects = data.getExportedReferencesList().stream()
.map(typedTicket -> ticketRouter.resolve(session, typedTicket.getTicket(), "ticket"))
.collect(Collectors.toList());
}
runOrEnqueue(referenceObjects, () -> {
Object[] objs = referenceObjects.stream().map(ExportObject::get).toArray();
Object[] objs;
try {
objs = referenceObjects.stream().map(ExportObject::get).toArray();
} finally {
exportScope.release();
}
messageStream.onData(data.getPayload().asReadOnlyByteBuffer(), objs);
});
}
Expand Down

0 comments on commit bcb240f

Please sign in to comment.