diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java index cfa9617a690..9d4d875f6f8 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java @@ -14,7 +14,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.UUID; +import java.util.function.Supplier; public class GrpcUtil { private static final Logger log = LoggerFactory.getLogger(GrpcUtil.class); @@ -56,7 +56,7 @@ public static void safelyOnNext(StreamObserver observer, T message) { * @param message the last message to send on this stream before completing * @param the type of message that the stream handles */ - public static void safelyComplete(StreamObserver observer, T message) { + public static void safelyOnNextAndComplete(StreamObserver observer, T message) { safelyExecuteLocked(observer, () -> { observer.onNext(message); observer.onCompleted(); diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 56f225ea3c7..be2b5f63b52 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -120,6 +120,7 @@ public static void DoGetCustom( .queryPerformanceRecorder(queryPerformanceRecorder) .require(tableExport) .onError(observer) + .onSuccess(observer) .submit(() -> { metrics.queueNanos = System.nanoTime() - queueStartTm; Object export = tableExport.get(); @@ -146,8 +147,6 @@ public static void DoGetCustom( // shared code between `DoGet` and `BarrageSnapshotRequest` BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, null, null, false, DEFAULT_SNAPSHOT_DESER_OPTIONS, listener, metrics); - - listener.onCompleted(); }); } } @@ -544,6 +543,27 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { .queryPerformanceRecorder(queryPerformanceRecorder) .require(tableExport) .onError(listener) + .onSuccess(() -> { + final HalfClosedState newState = halfClosedState.updateAndGet(current -> { + switch (current) { + case DONT_CLOSE: + // record that we have finished sending + return HalfClosedState.FINISHED_SENDING; + case CLIENT_HALF_CLOSED: + // since streaming has now finished, and client already half-closed, + // time to half close from server + return HalfClosedState.CLOSED; + case FINISHED_SENDING: + case CLOSED: + throw new IllegalStateException("Can't finish streaming twice"); + default: + throw new IllegalStateException("Unknown state " + current); + } + }); + if (newState == HalfClosedState.CLOSED) { + GrpcUtil.safelyComplete(listener); + } + }) .submit(() -> { metrics.queueNanos = System.nanoTime() - queueStartTm; Object export = tableExport.get(); @@ -586,25 +606,6 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, columns, viewport, reverseViewport, snapshotOptAdapter.adapt(snapshotRequest), listener, metrics); - HalfClosedState newState = halfClosedState.updateAndGet(current -> { - switch (current) { - case DONT_CLOSE: - // record that we have finished sending - return HalfClosedState.FINISHED_SENDING; - case CLIENT_HALF_CLOSED: - // since streaming has now finished, and client already half-closed, - // time to half close from server - return HalfClosedState.CLOSED; - case FINISHED_SENDING: - case CLOSED: - throw new IllegalStateException("Can't finish streaming twice"); - default: - throw new IllegalStateException("Unknown state " + current); - } - }); - if (newState == HalfClosedState.CLOSED) { - listener.onCompleted(); - } }); } } @@ -614,7 +615,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { public void close() { // no work to do for DoGetRequest close // possibly safely complete if finished sending data - HalfClosedState newState = halfClosedState.updateAndGet(current -> { + final HalfClosedState newState = halfClosedState.updateAndGet(current -> { switch (current) { case DONT_CLOSE: // record that we have half closed @@ -630,7 +631,7 @@ public void close() { } }); if (newState == HalfClosedState.CLOSED) { - listener.onCompleted(); + GrpcUtil.safelyComplete(listener); } } } diff --git a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java index ac3bedf066d..dc7fc6f1ce0 100644 --- a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java @@ -44,6 +44,10 @@ import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; +import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; +import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyError; +import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNextAndComplete; + @Singleton public class FlightServiceGrpcImpl extends FlightServiceGrpc.FlightServiceImplBase { private static final Logger log = LoggerFactory.getLogger(FlightServiceGrpcImpl.class); @@ -97,7 +101,7 @@ public void onError(Throwable t) { @Override public void onCompleted() { - GrpcUtil.safelyComplete(responseObserver); + safelyComplete(responseObserver); } }; } @@ -118,7 +122,7 @@ private HandshakeObserver(StreamObserver responseObser public void onNext(final Flight.HandshakeRequest value) { final AuthenticationRequestHandler.HandshakeResponseListener handshakeResponseListener = (protocol, response) -> { - GrpcUtil.safelyComplete(responseObserver, Flight.HandshakeResponse.newBuilder() + safelyOnNextAndComplete(responseObserver, Flight.HandshakeResponse.newBuilder() .setProtocolVersion(protocol) .setPayload(ByteStringAccess.wrap(response)) .build()); @@ -222,14 +226,13 @@ public void getFlightInfo( ticketRouter.flightInfoFor(session, request, "request"); if (session != null) { - session.nonExport() + session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .require(export) .onError(responseObserver) - .submit(() -> { - responseObserver.onNext(export.get()); - responseObserver.onCompleted(); - }); + .onSuccess((final Flight.FlightInfo resultFlightInfo) -> safelyOnNextAndComplete( + responseObserver, resultFlightInfo)) + .submit(export::get); return; } @@ -237,15 +240,14 @@ public void getFlightInfo( if (export.tryRetainReference()) { try { if (export.getState() == ExportNotification.State.EXPORTED) { - GrpcUtil.safelyOnNext(responseObserver, export.get()); - GrpcUtil.safelyComplete(responseObserver); + safelyOnNextAndComplete(responseObserver, export.get()); } } finally { export.dropReference(); } } else { exception = Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info"); - GrpcUtil.safelyError(responseObserver, exception); + safelyError(responseObserver, exception); } if (queryPerformanceRecorder.endQuery() || exception != null) { @@ -269,16 +271,16 @@ public void getSchema( ticketRouter.flightInfoFor(session, request, "request"); if (session != null) { - session.nonExport() + session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .require(export) .onError(responseObserver) - .submit(() -> { - responseObserver.onNext(Flight.SchemaResult.newBuilder() - .setSchema(export.get().getSchema()) - .build()); - responseObserver.onCompleted(); - }); + .onSuccess((final Flight.SchemaResult resultSchema) -> safelyOnNextAndComplete( + responseObserver, + resultSchema)) + .submit(() -> Flight.SchemaResult.newBuilder() + .setSchema(export.get().getSchema()) + .build()); return; } @@ -286,10 +288,9 @@ public void getSchema( if (export.tryRetainReference()) { try { if (export.getState() == ExportNotification.State.EXPORTED) { - GrpcUtil.safelyOnNext(responseObserver, Flight.SchemaResult.newBuilder() + safelyOnNextAndComplete(responseObserver, Flight.SchemaResult.newBuilder() .setSchema(export.get().getSchema()) .build()); - GrpcUtil.safelyComplete(responseObserver); } } finally { export.dropReference(); diff --git a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java index 8ab6115ccb7..2197546a9f2 100644 --- a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java @@ -52,8 +52,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; -import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNext; +import static io.deephaven.extensions.barrage.util.GrpcUtil.*; @Singleton public class ConsoleServiceGrpcImpl extends ConsoleServiceGrpc.ConsoleServiceImplBase { @@ -139,11 +138,9 @@ public void startConsole( .onError(responseObserver) .submit(() -> { final ScriptSession scriptSession = new DelegatingScriptSession(scriptSessionProvider.get()); - - safelyComplete(responseObserver, StartConsoleResponse.newBuilder() + safelyOnNextAndComplete(responseObserver, StartConsoleResponse.newBuilder() .setResultId(request.getResultId()) .build()); - return scriptSession; }); } @@ -154,7 +151,7 @@ public void subscribeToLogs( @NotNull final StreamObserver responseObserver) { sessionService.getCurrentSession(); if (REMOTE_CONSOLE_DISABLED) { - GrpcUtil.safelyError(responseObserver, Code.FAILED_PRECONDITION, "Remote console disabled"); + safelyError(responseObserver, Code.FAILED_PRECONDITION, "Remote console disabled"); return; } // Session close logic implicitly handled in @@ -183,16 +180,18 @@ public void executeCommand( final SessionState.ExportObject exportedConsole = ticketRouter.resolve(session, consoleId, "consoleId"); - session.nonExport() + session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .requiresSerialQueue() .require(exportedConsole) .onError(responseObserver) + .onSuccess((final ExecuteCommandResponse response) -> safelyOnNextAndComplete(responseObserver, + response)) .submit(() -> { - ScriptSession scriptSession = exportedConsole.get(); - ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode()); - ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder(); - FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder(); + final ScriptSession scriptSession = exportedConsole.get(); + final ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode()); + final ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder(); + final FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder(); changes.created.entrySet() .forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry))); changes.updated.entrySet() @@ -203,7 +202,7 @@ public void executeCommand( diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error)); log.error().append("Error running script: ").append(changes.error).endl(); } - safelyComplete(responseObserver, diff.setChanges(fieldChanges).build()); + return diff.setChanges(fieldChanges).build(); }); } } @@ -276,7 +275,9 @@ public void bindTableToVariable( ExportBuilder exportBuilder = session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .requiresSerialQueue() - .onError(responseObserver); + .onError(responseObserver) + .onSuccess(() -> safelyOnNextAndComplete(responseObserver, + BindTableToVariableResponse.getDefaultInstance())); if (request.hasConsoleId()) { exportedConsole = ticketRouter.resolve(session, request.getConsoleId(), "consoleId"); @@ -292,8 +293,6 @@ public void bindTableToVariable( Table table = exportedTable.get(); queryScope.putParam(request.getVariableName(), table); - responseObserver.onNext(BindTableToVariableResponse.getDefaultInstance()); - responseObserver.onCompleted(); }); } } @@ -405,7 +404,7 @@ public void start() { } public void stop() { - GrpcUtil.safelyComplete(client); + safelyComplete(client); } // ------------------------------------------------------------------------------------------------------------ @@ -459,7 +458,7 @@ public void run() { return; } if (tooSlow) { - GrpcUtil.safelyError(client, Code.RESOURCE_EXHAUSTED, String.format( + safelyError(client, Code.RESOURCE_EXHAUSTED, String.format( "Too slow: the client or network may be too slow to keep up with the logging rates, or there may be logging bursts that exceed the available buffer size. The buffer size can be configured through the server property '%s'.", SUBSCRIBE_TO_LOGS_BUFFER_SIZE_PROP)); return; @@ -472,7 +471,7 @@ public void run() { bufferIsKnownEmpty = true; break; } - GrpcUtil.safelyOnNext(client, payload); + safelyOnNext(client, payload); } } finally { guard.set(false); diff --git a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java index 08e6e9eeba6..e6222880e66 100644 --- a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java @@ -22,6 +22,7 @@ import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.select.WhereFilter; import io.deephaven.extensions.barrage.util.ExportUtil; +import io.deephaven.extensions.barrage.util.GrpcUtil; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.proto.backplane.grpc.*; @@ -46,7 +47,6 @@ import java.util.stream.Collectors; import static io.deephaven.engine.table.impl.AbsoluteSortColumnConventions.baseColumnNameToAbsoluteName; -import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; public class HierarchicalTableServiceGrpcImpl extends HierarchicalTableServiceGrpc.HierarchicalTableServiceImplBase { @@ -85,10 +85,12 @@ public void rollup( final SessionState.ExportObject sourceTableExport = ticketRouter.resolve(session, request.getSourceTableId(), "sourceTableId"); - session.newExport(request.getResultRollupTableId(), "resultRollupTableId") + session.newExport(request.getResultRollupTableId(), "resultRollupTableId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(sourceTableExport) .onError(responseObserver) + .onSuccess((final RollupTable ignoredResult) -> GrpcUtil.safelyOnNextAndComplete(responseObserver, + RollupResponse.getDefaultInstance())) .submit(() -> { final Table sourceTable = sourceTableExport.get(); @@ -109,7 +111,6 @@ public void rollup( throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to rollup hierarchical table"); } - safelyComplete(responseObserver, RollupResponse.getDefaultInstance()); return transformedResult; }); } @@ -141,10 +142,12 @@ public void tree( final SessionState.ExportObject
sourceTableExport = ticketRouter.resolve(session, request.getSourceTableId(), "sourceTableId"); - session.newExport(request.getResultTreeTableId(), "resultTreeTableId") + session.newExport(request.getResultTreeTableId(), "resultTreeTableId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(sourceTableExport) .onError(responseObserver) + .onSuccess((final TreeTable ignoredResult) -> GrpcUtil.safelyOnNextAndComplete(responseObserver, + TreeResponse.getDefaultInstance())) .submit(() -> { final Table sourceTable = sourceTableExport.get(); @@ -169,7 +172,6 @@ public void tree( throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to tree hierarchical table"); } - safelyComplete(responseObserver, TreeResponse.getDefaultInstance()); return transformedResult; }); } @@ -202,10 +204,13 @@ public void apply( final SessionState.ExportObject> inputHierarchicalTableExport = ticketRouter.resolve(session, request.getInputHierarchicalTableId(), "inputHierarchicalTableId"); - session.newExport(request.getResultHierarchicalTableId(), "resultHierarchicalTableId") + session.>newExport(request.getResultHierarchicalTableId(), "resultHierarchicalTableId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(inputHierarchicalTableExport) .onError(responseObserver) + .onSuccess((final HierarchicalTable ignoredResult) -> GrpcUtil.safelyOnNextAndComplete( + responseObserver, + HierarchicalTableApplyResponse.getDefaultInstance())) .submit(() -> { final HierarchicalTable inputHierarchicalTable = inputHierarchicalTableExport.get(); @@ -274,7 +279,6 @@ public void apply( throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to apply to hierarchical table"); } - safelyComplete(responseObserver, HierarchicalTableApplyResponse.getDefaultInstance()); return transformedResult; }); } @@ -395,6 +399,9 @@ public void view( resultExportBuilder .queryPerformanceRecorder(queryPerformanceRecorder) .onError(responseObserver) + .onSuccess((final HierarchicalTableView ignoredResult) -> GrpcUtil.safelyOnNextAndComplete( + responseObserver, + HierarchicalTableViewResponse.getDefaultInstance())) .submit(() -> { final Table keyTable = keyTableExport == null ? null : keyTableExport.get(); final Object target = targetExport.get(); @@ -439,7 +446,6 @@ public void view( throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to view hierarchical table"); } - safelyComplete(responseObserver, HierarchicalTableViewResponse.getDefaultInstance()); return transformedResult; }); } @@ -483,10 +489,12 @@ public void exportSource( final SessionState.ExportObject> hierarchicalTableExport = ticketRouter.resolve(session, request.getHierarchicalTableId(), "hierarchicalTableId"); - session.newExport(request.getResultTableId(), "resultTableId") + session.
newExport(request.getResultTableId(), "resultTableId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(hierarchicalTableExport) .onError(responseObserver) + .onSuccess((final Table transformedResult) -> GrpcUtil.safelyOnNextAndComplete(responseObserver, + ExportUtil.buildTableCreationResponse(request.getResultTableId(), transformedResult))) .submit(() -> { final HierarchicalTable hierarchicalTable = hierarchicalTableExport.get(); @@ -499,9 +507,6 @@ public void exportSource( Code.FAILED_PRECONDITION, "Not authorized to export source from hierarchical table"); } - final ExportedTableCreationResponse response = - ExportUtil.buildTableCreationResponse(request.getResultTableId(), transformedResult); - safelyComplete(responseObserver, response); return transformedResult; }); } diff --git a/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java index 60aa6283e17..d48eb29ed66 100644 --- a/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java @@ -66,6 +66,7 @@ public ObjectServiceGrpcImpl(SessionService sessionService, TicketRouter ticketR private enum EnqueuedState { WAITING, RUNNING, CLOSED } + private final class SendMessageObserver implements StreamObserver { private final SessionState session; private final StreamObserver responseObserver; @@ -268,10 +269,13 @@ public void fetchObject( final SessionState.ExportObject object = ticketRouter.resolve(session, request.getSourceId().getTicket(), "sourceId"); - session.nonExport() + session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .require(object) .onError(responseObserver) + .onSuccess( + (final FetchObjectResponse response) -> GrpcUtil.safelyOnNextAndComplete(responseObserver, + response)) .submit(() -> { final Object o = object.get(); ObjectType objectTypeInstance = getObjectTypeInstance(type, o); @@ -312,9 +316,7 @@ public void onCompleted() { throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Plugin didn't close response, use MessageStream instead for this object"); } - GrpcUtil.safelyComplete(responseObserver, message); - - return null; + return message; }); } } diff --git a/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java index 7c45821873e..398b70bba48 100644 --- a/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java @@ -30,7 +30,7 @@ import java.util.List; import static io.deephaven.extensions.barrage.util.ExportUtil.buildTableCreationResponse; -import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; +import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNextAndComplete; public class PartitionedTableServiceGrpcImpl extends PartitionedTableServiceGrpc.PartitionedTableServiceImplBase { private static final Logger log = LoggerFactory.getLogger(PartitionedTableServiceGrpcImpl.class); @@ -67,16 +67,17 @@ public void partitionBy( final SessionState.ExportObject
targetTable = ticketRouter.resolve(session, request.getTableId(), "tableId"); - session.newExport(request.getResultId(), "resultId") + session.newExport(request.getResultId(), "resultId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(targetTable) .onError(responseObserver) + .onSuccess((final PartitionedTable ignoredResult) -> safelyOnNextAndComplete(responseObserver, + PartitionByResponse.getDefaultInstance())) .submit(() -> { authWiring.checkPermissionPartitionBy(session.getAuthContext(), request, Collections.singletonList(targetTable.get())); PartitionedTable partitionedTable = targetTable.get().partitionBy(request.getDropKeys(), request.getKeyColumnNamesList().toArray(String[]::new)); - safelyComplete(responseObserver, PartitionByResponse.getDefaultInstance()); return partitionedTable; }); } @@ -97,10 +98,12 @@ public void merge( final SessionState.ExportObject partitionedTable = ticketRouter.resolve(session, request.getPartitionedTable(), "partitionedTable"); - session.newExport(request.getResultId(), "resultId") + session.
newExport(request.getResultId(), "resultId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(partitionedTable) .onError(responseObserver) + .onSuccess((final Table merged) -> safelyOnNextAndComplete(responseObserver, + buildTableCreationResponse(request.getResultId(), merged))) .submit(() -> { final Table table = partitionedTable.get().table(); authWiring.checkPermissionMerge(session.getAuthContext(), request, @@ -116,9 +119,6 @@ public void merge( throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to merge table."); } - final ExportedTableCreationResponse response = - buildTableCreationResponse(request.getResultId(), merged); - safelyComplete(responseObserver, response); return merged; }); } @@ -142,10 +142,12 @@ public void getTable( final SessionState.ExportObject
keys = ticketRouter.resolve(session, request.getKeyTableTicket(), "keyTable"); - session.newExport(request.getResultId(), "resultId") + session.
newExport(request.getResultId(), "resultId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(partitionedTable, keys) .onError(responseObserver) + .onSuccess((final Table table) -> safelyOnNextAndComplete(responseObserver, + buildTableCreationResponse(request.getResultId(), table))) .submit(() -> { Table table; Table keyTable = keys.get(); @@ -189,13 +191,10 @@ public void getTable( }); } table = authorizationTransformation.transform(table); - final ExportedTableCreationResponse response = - buildTableCreationResponse(request.getResultId(), table); if (table == null) { throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to get table."); } - safelyComplete(responseObserver, response); return table; }); } diff --git a/server/src/main/java/io/deephaven/server/session/SessionService.java b/server/src/main/java/io/deephaven/server/session/SessionService.java index d19867eaa1d..f04d8a9937f 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionService.java +++ b/server/src/main/java/io/deephaven/server/session/SessionService.java @@ -511,7 +511,7 @@ protected void onClose() { } void sendMessage(final TerminationNotificationResponse response) { - GrpcUtil.safelyComplete(responseObserver, response); + GrpcUtil.safelyOnNextAndComplete(responseObserver, response); } } } diff --git a/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java index f9ccce190d2..ea9e1b39f23 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java @@ -184,11 +184,9 @@ public void exportFromTicket( .queryPerformanceRecorder(queryPerformanceRecorder) .require(source) .onError(responseObserver) - .submit(() -> { - final Object o = source.get(); - GrpcUtil.safelyComplete(responseObserver, ExportResponse.getDefaultInstance()); - return o; - }); + .onSuccess((final Object ignoredResult) -> GrpcUtil.safelyOnNextAndComplete(responseObserver, + ExportResponse.getDefaultInstance())) + .submit(source::get); } } @@ -221,7 +219,7 @@ public void publishFromTicket( Ticket resultId = request.getResultId(); ticketRouter.publish(session, resultId, "resultId", - () -> GrpcUtil.safelyComplete(responseObserver, PublishResponse.getDefaultInstance()), + () -> GrpcUtil.safelyOnNextAndComplete(responseObserver, PublishResponse.getDefaultInstance()), SessionState.toErrorHandler(sre -> GrpcUtil.safelyError(responseObserver, sre)), source); } diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 74b5aafc304..c144a1b77b7 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -417,7 +417,6 @@ public ExportBuilder nonExport() { * the close() method be idempotent, but when combined with {@link #removeOnCloseCallback(Closeable)}, close() will * only be called once from this class. *

- *

* If called after the session has expired, this will throw, and the close() method on the provided instance will * not be called. * @@ -436,7 +435,7 @@ public void addOnCloseCallback(final Closeable onClose) { /** * Remove an on-close callback bound to the life of the session. - *

+ *

* A common pattern to use this will be for an object to try to remove itself, and if it succeeds, to call its own * {@link Closeable#close()}. If it fails, it can expect to have close() be called automatically. * @@ -525,7 +524,6 @@ public static boolean isExportStateTerminal(final ExportNotification.State state * Note: we reuse ExportObject for non-exporting tasks that have export dependencies. * * @param Is context-sensitive depending on the export. - * * @apiNote ExportId may be 0, if this is a task that has exported dependencies, but does not export anything * itself. Non-exports do not publish state changes. */ @@ -1313,6 +1311,7 @@ void onError(final ExportNotification.State resultState, @Nullable final Exception cause, @Nullable final String dependentExportId); } + @FunctionalInterface public interface ExportErrorGrpcHandler { /** @@ -1505,6 +1504,21 @@ public ExportBuilder onSuccess(final Runnable successHandler) { return onSuccess(ignored -> successHandler.run()); } + /** + * Invoke this method to set the {@link StreamObserver} to be + * {@link io.deephaven.extensions.barrage.util.GrpcUtil#safelyComplete(StreamObserver) safely completed} if this + * export succeeds. Only one success handler may be set. Exactly one of the onError and onSuccess handlers will + * be invoked. + *

+ * Not synchronized, it is expected that the provided callback handles thread safety itself. + * + * @param streamObserver the streamObserver to be notified + * @return this builder + */ + public ExportBuilder onSuccess(final StreamObserver streamObserver) { + return onSuccess(() -> safelyComplete(streamObserver)); + } + /** * This method is the final method for submitting an export to the session. The provided callable is enqueued on * the scheduler when all dependencies have been satisfied. Only the dependencies supplied to the builder are @@ -1515,6 +1529,9 @@ public ExportBuilder onSuccess(final Runnable successHandler) { * * @param exportMain the callable that generates the export * @return the submitted export object + * @apiNote For exports used in RPC handling, it is recommended to use {@link #onSuccess onSuccess} for result + * message (unary) and completion (unary or streaming) delivery, rather than from {@code exportMain}. + * This allows clients to observe performance results more predictably. */ public ExportObject submit(final Callable exportMain) { export.setWork(exportMain, errorHandler, successHandler, requiresSerialQueue); @@ -1531,6 +1548,9 @@ public ExportObject submit(final Callable exportMain) { * * @param exportMain the runnable to execute once dependencies have resolved * @return the submitted export object + * @apiNote For exports used in RPC handling, it is recommended to use {@link #onSuccess onSuccess} for result + * message (unary) and completion (unary or streaming) delivery, rather than from {@code exportMain}. + * This allows clients to observe performance results more predictably. */ public ExportObject submit(final Runnable exportMain) { return submit(() -> { @@ -1555,7 +1575,7 @@ public int getExportId() { } private static final KeyedIntObjectKey> EXPORT_OBJECT_ID_KEY = - new KeyedIntObjectKey.BasicStrict>() { + new KeyedIntObjectKey.BasicStrict<>() { @Override public int getIntKey(final ExportObject exportObject) { return exportObject.exportId; @@ -1563,7 +1583,7 @@ public int getIntKey(final ExportObject exportObject) { }; private final KeyedIntObjectHash.ValueFactory> EXPORT_OBJECT_VALUE_FACTORY = - new KeyedIntObjectHash.ValueFactory.Strict>() { + new KeyedIntObjectHash.ValueFactory.Strict<>() { @Override public ExportObject newValue(final int key) { if (isExpired()) { diff --git a/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java index c7d56cde22d..43f7b168242 100644 --- a/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java @@ -29,7 +29,6 @@ import org.jetbrains.annotations.NotNull; import javax.inject.Inject; -import java.io.IOException; import java.util.List; public class InputTableServiceGrpcImpl extends InputTableServiceGrpc.InputTableServiceImplBase { @@ -99,7 +98,8 @@ public void addTableToInputTable( inputTableUpdater.addAsync(tableToAdd, new InputTableStatusListener() { @Override public void onSuccess() { - GrpcUtil.safelyComplete(responseObserver, AddTableResponse.getDefaultInstance()); + GrpcUtil.safelyOnNextAndComplete(responseObserver, + AddTableResponse.getDefaultInstance()); } @Override @@ -164,7 +164,8 @@ public void deleteTableFromInputTable( inputTableUpdater.deleteAsync(tableToRemove, new InputTableStatusListener() { @Override public void onSuccess() { - GrpcUtil.safelyComplete(responseObserver, DeleteTableResponse.getDefaultInstance()); + GrpcUtil.safelyOnNextAndComplete(responseObserver, + DeleteTableResponse.getDefaultInstance()); } @Override diff --git a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java index 3fdcc66811b..27299f57856 100644 --- a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java @@ -46,9 +46,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; -import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyError; -import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNext; +import static io.deephaven.extensions.barrage.util.GrpcUtil.*; public class TableServiceGrpcImpl extends TableServiceGrpc.TableServiceImplBase { @@ -455,10 +453,12 @@ public void seekRow( final SessionState.ExportObject

exportedTable = ticketRouter.resolve(session, sourceId, "sourceId"); - session.nonExport() + session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .require(exportedTable) .onError(responseObserver) + .onSuccess((final SeekRowResponse response) -> safelyOnNextAndComplete(responseObserver, + response)) .submit(() -> { final Table table = exportedTable.get(); authWiring.checkPermissionSeekRow(session.getAuthContext(), request, @@ -473,8 +473,7 @@ public void seekRow( request.getInsensitive(), request.getContains(), request.getIsBackward()).seek(table); - SeekRowResponse.Builder rowResponse = SeekRowResponse.newBuilder(); - safelyComplete(responseObserver, rowResponse.setResultRow(result).build()); + return SeekRowResponse.newBuilder().setResultRow(result).build(); }); } } @@ -525,18 +524,17 @@ public void batch( if (numRemaining > 0) { return; } - + final StatusRuntimeException failure = firstFailure.get(); try (final SafeCloseable ignored2 = queryPerformanceRecorder.resumeQuery()) { - final StatusRuntimeException failure = firstFailure.get(); - if (failure != null) { - safelyError(responseObserver, failure); - } else { - safelyComplete(responseObserver); - } if (queryPerformanceRecorder.endQuery()) { EngineMetrics.getInstance().logQueryProcessingResults(queryPerformanceRecorder, failure); } } + if (failure != null) { + safelyError(responseObserver, failure); + } else { + safelyComplete(responseObserver); + } }; for (int i = 0; i < exportBuilders.size(); ++i) { @@ -611,23 +609,21 @@ public void getExportedTableCreationResponse( try (final SafeCloseable ignored = queryPerformanceRecorder.startQuery()) { final SessionState.ExportObject export = ticketRouter.resolve(session, request, "request"); - session.nonExport() + session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .require(export) .onError(responseObserver) + .onSuccess((final ExportedTableCreationResponse response) -> safelyOnNextAndComplete( + responseObserver, + response)) .submit(() -> { final Object obj = export.get(); if (!(obj instanceof Table)) { - responseObserver.onError( - Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, - "Ticket is not a table")); - return; + throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Ticket is not a table"); } authWiring.checkPermissionGetExportedTableCreationResponse( session.getAuthContext(), request, Collections.singletonList((Table) obj)); - final ExportedTableCreationResponse response = - ExportUtil.buildTableCreationResponse(request, (Table) obj); - safelyComplete(responseObserver, response); + return ExportUtil.buildTableCreationResponse(request, (Table) obj); }); } } @@ -665,17 +661,15 @@ private void oneShotOperationWrapper( .map(ref -> resolveOneShotReference(session, ref)) .collect(Collectors.toList()); - session.newExport(resultId, "resultId") + session.
newExport(resultId, "resultId") .require(dependencies) - .onError(responseObserver) .queryPerformanceRecorder(queryPerformanceRecorder) + .onError(responseObserver) + .onSuccess((final Table result) -> safelyOnNextAndComplete(responseObserver, + ExportUtil.buildTableCreationResponse(resultId, result))) .submit(() -> { operation.checkPermission(request, dependencies); - final Table result = operation.create(request, dependencies); - final ExportedTableCreationResponse response = - ExportUtil.buildTableCreationResponse(resultId, result); - safelyComplete(responseObserver, response); - return result; + return operation.create(request, dependencies); }); } }