Skip to content

Commit

Permalink
Merge pull request #211 from well-typed/finley/209
Browse files Browse the repository at this point in the history
Explicit cancellation of streams on client exceptions
  • Loading branch information
edsko authored Sep 5, 2024
2 parents dec0e10 + ab9689a commit 5f7b140
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 165 deletions.
4 changes: 2 additions & 2 deletions grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ library
, exceptions >= 0.10 && < 0.11
, hashable >= 1.3 && < 1.5
, http-types >= 0.12 && < 0.13
, http2 >= 5.3.1 && < 5.4
, http2 >= 5.3.4 && < 5.4
, http2-tls >= 0.4.1 && < 0.5
, lens >= 5.0 && < 5.4
, mtl >= 2.2 && < 2.4
Expand Down Expand Up @@ -350,7 +350,7 @@ test-suite test-grapesy
, containers >= 0.6 && < 0.8
, exceptions >= 0.10 && < 0.11
, http-types >= 0.12 && < 0.13
, http2 >= 5.3.1 && < 5.4
, http2 >= 5.3.4 && < 5.4
, lens >= 5.0 && < 5.4
, mtl >= 2.2 && < 2.4
, network >= 3.1 && < 3.3
Expand Down
2 changes: 1 addition & 1 deletion interop/Interop/Client/TestCase/CancelAfterBegin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Proto.API.Interop
-- cancellation gets reported by the grapesy client library itself.
runTest :: Cmdline -> IO ()
runTest cmdline =
withConnection def (testServer cmdline) $ \conn -> do
withConnection def (testServer cmdline) $ \conn ->
assertThrows (assertEqual GrpcCancelled . grpcError) $
withRPC conn def (Proxy @StreamingInputCall) $ \_call ->
-- Immediately cancel request
Expand Down
2 changes: 1 addition & 1 deletion interop/Interop/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ withInteropServer cmdline k = do
= ServerConfig {
serverSecure = Nothing
, serverInsecure = Just InsecureConfig {
insecureHost = Nothing
insecureHost = Just "127.0.0.1"
, insecurePort = cmdPort cmdline
}
}
Expand Down
140 changes: 104 additions & 36 deletions src/Network/GRPC/Client/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Data.Bifunctor
import Data.Bitraversable
import Data.ByteString.Char8 qualified as BS.Strict.C8
import Data.Default
import Data.Foldable (asum)
import Data.List (intersperse)
import Data.Maybe (fromMaybe, isJust)
import Data.Maybe (fromMaybe)
import Data.Proxy
import Data.Text qualified as Text
import Data.Version
Expand All @@ -55,13 +56,12 @@ import Network.GRPC.Common.Compression qualified as Compression
import Network.GRPC.Common.StreamElem qualified as StreamElem
import Network.GRPC.Spec
import Network.GRPC.Util.GHC
import Network.GRPC.Util.HKD qualified as HKD
import Network.GRPC.Util.HTTP2.Stream (ServerDisconnected(..))
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.Thread qualified as Thread

import Paths_grapesy qualified as Grapesy
import Network.GRPC.Util.HKD qualified as HKD
import Data.Bifunctor

{-------------------------------------------------------------------------------
Open a call
Expand Down Expand Up @@ -111,25 +111,78 @@ withRPC conn callParams proxy k = fmap fst $
generalBracket
(liftIO $ startRPC conn proxy callParams)
closeRPC
k
(k . fst)
where
closeRPC :: Call rpc -> ExitCase a -> m ()
closeRPC call exitCase = liftIO $ do
closeRPC :: (Call rpc, Session.CancelRequest) -> ExitCase a -> m ()
closeRPC (call, cancelRequest) exitCase = liftIO $ do
-- /Before/ we do anything else (see below), check if we have evidence
-- that we can discard the connection.
canDiscard <- checkCanDiscard call

-- Send the RST_STREAM frame /before/ closing the outbound thread.
--
-- When we call 'Session.close', we will terminate the
-- 'sendMessageLoop', @http2@ will interpret this as a clean termination
-- of the stream. We must therefore cancel this stream before calling
-- 'Session.close'. /If/ the final message has already been sent,
-- @http2@ guarantees (as a postcondition of @outBodyPushFinal@) that
-- cancellation will be a no-op.
sendResetFrame cancelRequest exitCase

-- Now close the /outbound/ thread, see docs of 'Session.close' for
-- details.
mException <- liftIO $ Session.close (callChannel call) exitCase
case mException of
Nothing -> return ()
Nothing ->
-- The outbound thread had already terminated
return ()
Just ex ->
case fromException ex of
Nothing -> throwM ex
Just discarded -> throwCancelled call discarded
Nothing ->
-- We are leaving the scope of 'withRPC' because of an exception
-- in the client, just rethrow that exception.
throwM ex
Just discarded ->
-- We are leaving the scope of 'withRPC' without having sent the
-- final message.
--
-- If the server was closed before we cancelled the stream, this
-- means that the server unilaterally closed the connection.
-- This should be regarded as normal termination of the RPC (see
-- the docs for 'withRPC')
--
-- Otherwise, the client left the scope of 'withRPC' before the
-- RPC was complete, which the gRPC spec mandates to result in a
-- 'GrpcCancelled' exception. See docs of 'throwCancelled'.
unless canDiscard $
throwCancelled discarded

-- Send a @RST_STREAM@ frame if necessary
sendResetFrame :: Session.CancelRequest -> ExitCase a -> IO ()
sendResetFrame cancelRequest exitCase =
cancelRequest $
case exitCase of
ExitCaseSuccess _ ->
-- Error code will be CANCEL
Nothing
ExitCaseAbort ->
-- Error code will be INTERNAL_ERROR. The client aborted with an
-- error that we don't have access to. We want to tell the server
-- that something has gone wrong (i.e. INTERNAL_ERROR), so we must
-- pass an exception, however the exact nature of the exception is
-- not particularly important as it is only recorded locally.
Just . toException $ Session.ChannelAborted callStack
ExitCaseException e ->
-- Error code will be INTERNAL_ERROR
Just e

-- The spec mandates that when a client cancels a request (which in grapesy
-- means exiting the scope of withRPC), the client receives a CANCELLED
-- exception. We need to deal with the edge case mentioned above, however:
-- the server might have already closed the connection. The client must have
-- evidence that this is the case, which could mean one of two things:
--
-- o The received the final message from the server
-- o The client received the final message from the server
-- o The server threw an exception (and the client saw this)
--
-- We can check for the former using 'channelRecvFinal', and the latter
Expand All @@ -141,38 +194,53 @@ withRPC conn callParams proxy k = fmap fst $
-- o If the server threw an exception, and the client observed this, then
-- the inbound thread state /must/ have changed to 'ThreadException'.
--
-- Note that it /not/ sufficient to check if the inbound thread has
-- Note that it is /not/ sufficient to check if the inbound thread has
-- terminated: we might have received the final message, but the thread
-- might still be /about/ to terminate, but not /actually/ have terminated.
--
-- See also:
--
-- o <https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#cancel_after_begin>
-- o <https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#cancel_after_first_response>
throwCancelled :: Call rpc -> ChannelDiscarded -> IO ()
throwCancelled Call{callChannel} (ChannelDiscarded cs) = do
throwCancelled :: ChannelDiscarded -> IO ()
throwCancelled (ChannelDiscarded cs) = do
throwM $ GrpcException {
grpcError = GrpcCancelled
, grpcErrorMessage = Just $ mconcat [
"Channel discarded by client at "
, Text.pack $ prettyCallStack cs
]
, grpcErrorMetadata = []
}

checkCanDiscard :: Call rpc -> IO Bool
checkCanDiscard Call{callChannel} = do
mRecvFinal <- atomically $
readTVar $ Session.channelRecvFinal callChannel
let onNotRunning :: STM ()
onNotRunning = return ()
mTerminated <- atomically $
Thread.hasThreadTerminated $ Session.channelInbound callChannel
let serverClosed :: Bool
serverClosed = or [
case mRecvFinal of
Session.RecvNotFinal -> False
Session.RecvWithoutTrailers _ -> True
Session.RecvFinal _ -> True
, isJust mTerminated
]

unless serverClosed $
throwM $ GrpcException {
grpcError = GrpcCancelled
, grpcErrorMessage = Just $ mconcat [
"Channel discarded by client at "
, Text.pack $ prettyCallStack cs
]
, grpcErrorMetadata = []
}
Thread.getThreadState_
(Session.channelInbound callChannel)
onNotRunning
return $
or [
case mRecvFinal of
Session.RecvNotFinal -> False
Session.RecvWithoutTrailers _ -> True
Session.RecvFinal _ -> True

-- We are checking if we have evidence that we can discard the
-- channel. If the inbound thread is not yet running, this implies
-- that the server has not yet initiated their response to us,
-- which means we have no evidence to believe we can discard the
-- channel.
, case mTerminated of
Thread.ThreadNotYetRunning_ () -> False
Thread.ThreadRunning_ -> False
Thread.ThreadDone_ -> True
Thread.ThreadException_ _ -> True
]

-- | Open new channel to the server
--
Expand All @@ -186,7 +254,7 @@ startRPC :: forall rpc.
=> Connection
-> Proxy rpc
-> CallParams rpc
-> IO (Call rpc)
-> IO (Call rpc, Session.CancelRequest)
startRPC conn _ callParams = do
(connClosed, connToServer) <- Connection.getConnectionToServer conn
cOut <- Connection.getOutboundCompression conn
Expand All @@ -205,7 +273,7 @@ startRPC conn _ callParams = do
. grpcClassifyTermination
. either trailersOnlyToProperTrailers' id

channel <-
(channel, cancelRequest) <-
Session.setupRequestChannel
session
connToServer
Expand Down Expand Up @@ -235,7 +303,7 @@ startRPC conn _ callParams = do
_mAlreadyClosed <- Session.close channel exitReason
return ()

return $ Call channel
return (Call channel, cancelRequest)
where
connParams :: ConnParams
connParams = Connection.connParams conn
Expand Down Expand Up @@ -308,7 +376,7 @@ sendInputWithMeta Call{callChannel} msg = liftIO $ do

-- This should be called before exiting the scope of 'withRPC'.
StreamElem.whenDefinitelyFinal msg $ \_ ->
void $ Session.waitForOutbound callChannel
Session.waitForOutbound callChannel

-- | Receive an output from the peer
--
Expand Down
8 changes: 4 additions & 4 deletions src/Network/GRPC/Server/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,12 @@ sendOutputWithMeta call@Call{callChannel} msg = do
msg' <- bitraverse mkTrailers return msg
Session.send callChannel msg'

-- This /must/ be called before leaving the scope of 'acceptCall' (or we
-- This /must/ be called before leaving the scope of 'runHandler' (or we
-- risk that the HTTP2 stream is cancelled). We can't call 'waitForOutbound'
-- /in/ 'acceptCall', because if the handler for whatever reason never
-- /in/ 'runHandler', because if the handler for whatever reason never
-- writes the final message, such a call would block indefinitely.
StreamElem.whenDefinitelyFinal msg $ \_ ->
void $ Session.waitForOutbound callChannel
Session.waitForOutbound callChannel
where
mkTrailers :: ResponseTrailingMetadata rpc -> IO ProperTrailers
mkTrailers metadata = do
Expand Down Expand Up @@ -650,7 +650,7 @@ sendProperTrailers Call{callContext, callResponseKickoff, callChannel}
-- If we didn't update, then the response has already been initiated and
-- we cannot make use of the Trailers-Only case.
Session.send callChannel (NoMoreElems trailers)
void $ Session.waitForOutbound callChannel
Session.waitForOutbound callChannel
where
ServerContext{serverParams} = callContext

Expand Down
4 changes: 2 additions & 2 deletions src/Network/GRPC/Server/RequestHandler/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ requestHandlerToServer ::
-- ^ Request handler
--
-- We can assume in 'requestHandlerToServer' that the handler will not
-- throw any exceptions(doing so will cause @http2@ to reset the stream,
-- throw any exceptions (doing so will cause @http2@ to reset the stream,
-- which is not always the right thing to do; see detailed comments in
-- 'acceptCall'). It is the responsibility of 'serverTopLevel' (prior to
-- 'runHandler'). It is the responsibility of 'serverTopLevel' (prior to
-- calling 'requestHandlerToServer') to catch any remaining exceptions.
-> HTTP2.Server
requestHandlerToServer handler req _aux respond =
Expand Down
Loading

0 comments on commit 5f7b140

Please sign in to comment.