diff --git a/Project.toml b/Project.toml index 2f85e7e..5c20766 100644 --- a/Project.toml +++ b/Project.toml @@ -4,7 +4,7 @@ keywords = ["Swagger", "OpenAPI", "REST"] license = "MIT" desc = "OpenAPI server and client helper for Julia" authors = ["JuliaHub Inc."] -version = "0.1.25" +version = "0.1.26" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" diff --git a/src/client.jl b/src/client.jl index 04087fe..a7b6a1e 100644 --- a/src/client.jl +++ b/src/client.jl @@ -151,6 +151,7 @@ struct Client escape_path_params::Union{Nothing,Bool} chunk_reader_type::Union{Nothing,Type{<:AbstractChunkReader}} long_polling_timeout::Int + request_interrupt_supported::Bool function Client(root::String; headers::Dict{String,String}=Dict{String,String}(), @@ -174,7 +175,8 @@ struct Client # disable ALPN to support servers that enable both HTTP/2 and HTTP/1.1 on same port Downloads.Curl.setopt(easy, LibCURL.CURLOPT_SSL_ENABLE_ALPN, 0) end - new(root, headers, get_return_type, clntoptions, downloader, Ref{Int}(timeout), pre_request_hook, escape_path_params, chunk_reader_type, long_polling_timeout) + interruptable = request_supports_interrupt() + new(root, headers, get_return_type, clntoptions, downloader, Ref{Int}(timeout), pre_request_hook, escape_path_params, chunk_reader_type, long_polling_timeout, interruptable) end end @@ -556,6 +558,10 @@ function do_request(ctx::Ctx, stream::Bool=false; stream_to::Union{Channel,Nothi try if stream + interrupt = nothing + if ctx.client.request_interrupt_supported + kwargs[:interrupt] = interrupt = Base.Event() + end @sync begin download_task = @async begin try @@ -565,7 +571,10 @@ function do_request(ctx::Ctx, stream::Bool=false; stream_to::Union{Channel,Nothi kwargs... ) catch ex - if !isa(ex, InterruptException) + # If request method does not support interrupt natively, InterrptException is used to + # signal the download task to stop. Otherwise, InterrptException is not handled and is rethrown. + # Any exception other than InterruptException is rethrown always. + if ctx.client.request_interrupt_supported || !isa(ex, InterruptException) @error("exception invoking request", exception=(ex,catch_backtrace())) rethrow() end @@ -604,10 +613,25 @@ function do_request(ctx::Ctx, stream::Bool=false; stream_to::Union{Channel,Nothi catch ex isa(ex, InvalidStateException) || rethrow(ex) interrupted = true - istaskdone(download_task) || schedule(download_task, InterruptException(), error=true) + if !istaskdone(download_task) + # If the download task is still running, interrupt it. + # If it supports interrupt natively, then use event to signal it. + # Otherwise, throw an InterruptException to stop the download task. + if ctx.client.request_interrupt_supported + notify(interrupt) + else + schedule(download_task, InterruptException(), error=true) + end + end + end + end + if !interrupted && !istaskdone(download_task) + if ctx.client.request_interrupt_supported + notify(interrupt) + else + schedule(download_task, InterruptException(), error=true) end end - interrupted || istaskdone(download_task) || schedule(download_task, InterruptException(), error=true) end end else @@ -863,4 +887,13 @@ function deep_object_serialize(dict::Dict, parent_key::String = "") return Dict(parts) end +function request_supports_interrupt() + for m in methods(request) + if :interrupt in Base.kwarg_decl(m) + return true + end + end + return false +end + end # module Clients