diff --git a/src/client.jl b/src/client.jl index 956c324..da138e1 100644 --- a/src/client.jl +++ b/src/client.jl @@ -14,6 +14,9 @@ import Base: convert, show, summary, getproperty, setproperty!, iterate import ..OpenAPI: APIModel, UnionAPIModel, OneOfAPIModel, AnyOfAPIModel, APIClientImpl, OpenAPIException, InvocationException, to_json, from_json, validate_property, property_type import ..OpenAPI: str2zoneddatetime, str2datetime, str2date + +abstract type AbstractChunkReader end + # collection formats (OpenAPI v2) # TODO: OpenAPI v3 has style and explode options instead of collection formats, which are yet to be supported # TODO: Examine whether multi is now supported @@ -128,6 +131,7 @@ Keyword parameters: - `pre_request_hook(ctx::Ctx)`: This method is called before every API call. It is passed the context object that will be used for the API call. The function should return the context object to be used for the API call. - `pre_request_hook(resource_path::AbstractString, body::Any, headers::Dict{String,String})`: This method is called before every API call. It is passed the resource path, request body and request headers that will be used for the API call. The function should return those after making any modifications to them. - `escape_path_params`: Whether the path parameters should be escaped before being used in the URL. This is useful if the path parameters contain characters that are not allowed in URLs or contain path separators themselves. +- `chunk_reader_type`: The type of chunk reader to be used for streaming responses. This can be one of `LineChunkReader`, `JSONChunkReader` or `RFC7464ChunkReader`. If not specified, then the type is automatically determined based on the return type of the API call. - `verbose`: Can be set either to a boolean or a function. - If set to true, then the client will log all HTTP requests and responses. - If set to a function, then that function will be called with the following parameters: @@ -144,6 +148,7 @@ struct Client timeout::Ref{Int} pre_request_hook::Function # user provided hook to modify the request before it is sent escape_path_params::Union{Nothing,Bool} + chunk_reader_type::Union{Nothing,Type{<:AbstractChunkReader}} long_polling_timeout::Int function Client(root::String; @@ -153,6 +158,7 @@ struct Client timeout::Int=DEFAULT_TIMEOUT_SECS, pre_request_hook::Function=noop_pre_request_hook, escape_path_params::Union{Nothing,Bool}=nothing, + chunk_reader_type::Union{Nothing,Type{<:AbstractChunkReader}}=nothing, verbose::Union{Bool,Function}=false, ) clntoptions = Dict{Symbol,Any}(:throw=>false) @@ -167,7 +173,7 @@ struct Client # disable ALPN to support servers that enable both HTTP/2 and HTTP/1.1 on same port Downloads.Curl.setopt(easy, LibCURL.CURLOPT_SSL_ENABLE_ALPN, 0) end - new(root, headers, get_return_type, clntoptions, downloader, Ref{Int}(timeout), pre_request_hook, escape_path_params, long_polling_timeout) + new(root, headers, get_return_type, clntoptions, downloader, Ref{Int}(timeout), pre_request_hook, escape_path_params, chunk_reader_type, long_polling_timeout) end end @@ -237,15 +243,17 @@ struct Ctx curl_mime_upload::Ref{Any} pre_request_hook::Function escape_path_params::Bool + chunk_reader_type::Union{Nothing,Type{<:AbstractChunkReader}} function Ctx(client::Client, method::String, return_types::Dict{Regex,Type}, resource::String, auth, body=nothing; timeout::Int=client.timeout[], pre_request_hook::Function=client.pre_request_hook, escape_path_params::Bool=something(client.escape_path_params, true), + chunk_reader_type::Union{Nothing,Type{<:AbstractChunkReader}}=client.chunk_reader_type, ) resource = client.root * resource headers = copy(client.headers) - new(client, method, return_types, resource, auth, Dict{String,String}(), Dict{String,String}(), headers, Dict{String,String}(), Dict{String,String}(), body, timeout, Ref{Any}(nothing), pre_request_hook, escape_path_params) + new(client, method, return_types, resource, auth, Dict{String,String}(), Dict{String,String}(), headers, Dict{String,String}(), Dict{String,String}(), body, timeout, Ref{Any}(nothing), pre_request_hook, escape_path_params, chunk_reader_type) end end @@ -414,11 +422,11 @@ response(::Type{T}, data::Dict{String,Any}) where {T} = from_json(T, data)::T response(::Type{T}, data::Dict{String,Any}) where {T<:Dict} = convert(T, data) response(::Type{Vector{T}}, data::Vector{V}) where {T,V} = [response(T, v) for v in data] -struct ChunkReader +struct LineChunkReader <: AbstractChunkReader buffered_input::Base.BufferStream end -function Base.iterate(iter::ChunkReader, _state=nothing) +function Base.iterate(iter::LineChunkReader, _state=nothing) if eof(iter.buffered_input) return nothing else @@ -432,6 +440,57 @@ function Base.iterate(iter::ChunkReader, _state=nothing) end end +struct JSONChunkReader <: AbstractChunkReader + buffered_input::Base.BufferStream +end + +function Base.iterate(iter::JSONChunkReader, _state=nothing) + if eof(iter.buffered_input) + return nothing + else + # read all whitespaces + while !eof(iter.buffered_input) + byte = peek(iter.buffered_input, UInt8) + if isspace(Char(byte)) + read(iter.buffered_input, UInt8) + else + break + end + end + eof(iter.buffered_input) && return nothing + valid_json = JSON.parse(iter.buffered_input) + bytes = convert(Vector{UInt8}, codeunits(JSON.json(valid_json))) + return (bytes, iter) + end +end + +# Ref: https://www.rfc-editor.org/rfc/rfc7464.html +const RFC7464_RECORD_SEPARATOR = UInt8(0x1E) +struct RFC7464ChunkReader <: AbstractChunkReader + buffered_input::Base.BufferStream +end + +function Base.iterate(iter::RFC7464ChunkReader, _state=nothing) + if eof(iter.buffered_input) + return nothing + else + out = IOBuffer() + while !eof(iter.buffered_input) + byte = read(iter.buffered_input, UInt8) + if byte == RFC7464_RECORD_SEPARATOR + bytes = take!(out) + if isnothing(_state) || !isempty(bytes) + return (bytes, iter) + end + else + write(out, byte) + end + end + bytes = take!(out) + return (bytes, iter) + end +end + noop_pre_request_hook(ctx::Ctx) = ctx noop_pre_request_hook(resource_path::AbstractString, body::Any, headers::Dict{String,String}) = (resource_path, body, headers) @@ -491,7 +550,13 @@ function do_request(ctx::Ctx, stream::Bool=false; stream_to::Union{Channel,Nothi end @async begin try - for chunk in ChunkReader(output) + if isnothing(ctx.chunk_reader_type) + default_return_type = ctx.client.get_return_type(ctx.return_types, nothing, "") + readerT = default_return_type <: APIModel ? JSONChunkReader : LineChunkReader + else + readerT = ctx.chunk_reader_type + end + for chunk in readerT(output) return_type = ctx.client.get_return_type(ctx.return_types, nothing, String(copy(chunk))) data = response(return_type, resp, chunk) put!(stream_to, data) diff --git a/test/chunkreader_tests.jl b/test/chunkreader_tests.jl new file mode 100644 index 0000000..3b10d7f --- /dev/null +++ b/test/chunkreader_tests.jl @@ -0,0 +1,197 @@ +module ChunkReaderTests +using Test +using JSON +using OpenAPI +using OpenAPI.Clients: AbstractChunkReader, JSONChunkReader, LineChunkReader, RFC7464ChunkReader + +function linechunk1() + buff = Base.BufferStream() + reader = LineChunkReader(buff) + results = String[] + readertask = @async begin + for line in reader + push!(results, String(line)) + end + end + write(buff, "hello\nworld\n") + write(buff, "goodbye\n") + close(buff) + wait(readertask) + @test results == ["hello", "world", "goodbye"] +end + +function linechunk2() + buff = Base.BufferStream() + reader = LineChunkReader(buff) + results = String[] + readertask = @async begin + for line in reader + push!(results, String(line)) + end + end + write(buff, "\nhello\nworld\n") + write(buff, "goodbye\n") + close(buff) + wait(readertask) + @test results == ["", "hello", "world", "goodbye"] +end + +function linechunk3() + buff = Base.BufferStream() + reader = LineChunkReader(buff) + results = String[] + readertask = @async begin + for line in reader + push!(results, String(line)) + end + end + write(buff, "hello\nworld\n") + write(buff, "goodbye") + close(buff) + wait(readertask) + @test results == ["hello", "world", "goodbye"] +end + +function jsonchunk1() + buff = Base.BufferStream() + reader = JSONChunkReader(buff) + results = String[] + readertask = @async begin + for json in reader + push!(results, String(json)) + end + end + + write(buff, "{\"hello\": \"world\"}") + write(buff, "{\"hello\": \"world\"}") + close(buff) + wait(readertask) + for result in results + json = JSON.parse(result) + @test json["hello"] == "world" + end + @test length(results) == 2 +end + +function jsonchunk2() + buff = Base.BufferStream() + reader = JSONChunkReader(buff) + results = String[] + readertask = @async begin + for json in reader + push!(results, String(json)) + end + end + + write(buff, "{\"hello\": \"world\"}\n") + write(buff, "{\"hello\": \"world\"}\n") + close(buff) + wait(readertask) + for result in results + json = JSON.parse(result) + @test json["hello"] == "world" + end + @test length(results) == 2 +end + +function jsonchunk3() + buff = Base.BufferStream() + reader = JSONChunkReader(buff) + results = String[] + readertask = @async begin + for json in reader + push!(results, String(json)) + end + end + + write(buff, "\n\n{\"hello\": \"world\"}\n\n") + write(buff, "{\"hello\": \"world\"}\n") + close(buff) + wait(readertask) + for result in results + json = JSON.parse(result) + @test json["hello"] == "world" + end + @test length(results) == 2 +end + +function jsonchunk4() + buff = Base.BufferStream() + reader = JSONChunkReader(buff) + results = String[] + readertask = @async begin + for json in reader + push!(results, String(json)) + end + end + + write(buff, "\n\n{\"hello\": \"world\"}\n\n") + write(buff, "{\"hello\": \"world\"\n") + close(buff) + @test_throws TaskFailedException wait(readertask) + @test length(results) == 1 +end + +function rfc7464chunk1() + buff = Base.BufferStream() + reader = RFC7464ChunkReader(buff) + results = String[] + readertask = @async begin + for chunk in reader + push!(results, String(chunk)) + end + end + + write(buff, OpenAPI.Clients.RFC7464_RECORD_SEPARATOR) + write(buff, "{\"hello\": \"world\"}") + write(buff, OpenAPI.Clients.RFC7464_RECORD_SEPARATOR) + write(buff, "{\"hello\": \"world\"}") + close(buff) + wait(readertask) + for result in results + if !isempty(result) + json = JSON.parse(result) + @test json["hello"] == "world" + end + end + @test length(results) == 3 +end + +function rfc7464chunk2() + buff = Base.BufferStream() + reader = RFC7464ChunkReader(buff) + results = String[] + readertask = @async begin + for chunk in reader + push!(results, String(chunk)) + end + end + + write(buff, "{\"hello\": \"world\"}") + write(buff, OpenAPI.Clients.RFC7464_RECORD_SEPARATOR) + write(buff, "{\"hello\": \"world\"}") + write(buff, OpenAPI.Clients.RFC7464_RECORD_SEPARATOR) + close(buff) + wait(readertask) + for result in results + if !isempty(result) + json = JSON.parse(result) + @test json["hello"] == "world" + end + end + @test length(results) == 2 +end + +function runtests() + linechunk1() + linechunk2() + linechunk3() + jsonchunk1() + jsonchunk2() + jsonchunk3() + jsonchunk4() + rfc7464chunk1() + rfc7464chunk2() +end + +end # module ChunkReaderTests \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index af3707b..6466b66 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,5 +1,6 @@ using Test, HTTP +include("chunkreader_tests.jl") include("testutils.jl") include("modelgen/testmodelgen.jl") include("client/runtests.jl") @@ -10,6 +11,9 @@ include("forms/forms_client.jl") @testset "ModelGen" begin TestModelGen.runtests() end + @testset "Chunk Readers" begin + ChunkReaderTests.runtests() + end @testset "Petstore Client" begin try if run_tests_with_servers && !openapi_generator_env