Skip to content

Commit

Permalink
chunk detection based on response format (#59)
Browse files Browse the repository at this point in the history
This introduces `AbstractChunkReader` and adds a few chunk readers with different chunk detection strategies. Also includes some auto selection heuristics based on detected response format and ability to specify a chunk reader at `Ctx` and `Client` levels.

Chunk readers available:
- `LineChunkReader`: Chunks delimited by newline. This was the only available strategy earlier. It is now the default when the response type is detected to be not of `OpenAPI.APIModel` type.
- `JSONChunkReader`: Each chunk is a JSON. Whitespaces between JSONs are ignored. This is now the default when the response type is detected to be a `OpenAPI.APIModel`.
- `RFC7464ChunkReader`: A reader based on [RFC 7464](https://www.rfc-editor.org/rfc/rfc7464.html). Available for use by overriding through `Client` or `Ctx`.

The `Client` and `Ctx` constructors take an additional `chunk_reader_type` keyword parameter. This can be one of `OpenAPI.Clients.LineChunkReader`, `OpenAPI.Clients.JSONChunkReader` or `OpenAPI.Clients.RFC7464ChunkReader`. If not specified, then the type is automatically determined based on the return type of the API call.
  • Loading branch information
tanmaykm authored Aug 28, 2023
1 parent 8928973 commit 0053243
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 5 deletions.
75 changes: 70 additions & 5 deletions src/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import Base: convert, show, summary, getproperty, setproperty!, iterate
import ..OpenAPI: APIModel, UnionAPIModel, OneOfAPIModel, AnyOfAPIModel, APIClientImpl, OpenAPIException, InvocationException, to_json, from_json, validate_property, property_type
import ..OpenAPI: str2zoneddatetime, str2datetime, str2date


abstract type AbstractChunkReader end

# collection formats (OpenAPI v2)
# TODO: OpenAPI v3 has style and explode options instead of collection formats, which are yet to be supported
# TODO: Examine whether multi is now supported
Expand Down Expand Up @@ -128,6 +131,7 @@ Keyword parameters:
- `pre_request_hook(ctx::Ctx)`: This method is called before every API call. It is passed the context object that will be used for the API call. The function should return the context object to be used for the API call.
- `pre_request_hook(resource_path::AbstractString, body::Any, headers::Dict{String,String})`: This method is called before every API call. It is passed the resource path, request body and request headers that will be used for the API call. The function should return those after making any modifications to them.
- `escape_path_params`: Whether the path parameters should be escaped before being used in the URL. This is useful if the path parameters contain characters that are not allowed in URLs or contain path separators themselves.
- `chunk_reader_type`: The type of chunk reader to be used for streaming responses. This can be one of `LineChunkReader`, `JSONChunkReader` or `RFC7464ChunkReader`. If not specified, then the type is automatically determined based on the return type of the API call.
- `verbose`: Can be set either to a boolean or a function.
- If set to true, then the client will log all HTTP requests and responses.
- If set to a function, then that function will be called with the following parameters:
Expand All @@ -144,6 +148,7 @@ struct Client
timeout::Ref{Int}
pre_request_hook::Function # user provided hook to modify the request before it is sent
escape_path_params::Union{Nothing,Bool}
chunk_reader_type::Union{Nothing,Type{<:AbstractChunkReader}}
long_polling_timeout::Int

function Client(root::String;
Expand All @@ -153,6 +158,7 @@ struct Client
timeout::Int=DEFAULT_TIMEOUT_SECS,
pre_request_hook::Function=noop_pre_request_hook,
escape_path_params::Union{Nothing,Bool}=nothing,
chunk_reader_type::Union{Nothing,Type{<:AbstractChunkReader}}=nothing,
verbose::Union{Bool,Function}=false,
)
clntoptions = Dict{Symbol,Any}(:throw=>false)
Expand All @@ -167,7 +173,7 @@ struct Client
# disable ALPN to support servers that enable both HTTP/2 and HTTP/1.1 on same port
Downloads.Curl.setopt(easy, LibCURL.CURLOPT_SSL_ENABLE_ALPN, 0)
end
new(root, headers, get_return_type, clntoptions, downloader, Ref{Int}(timeout), pre_request_hook, escape_path_params, long_polling_timeout)
new(root, headers, get_return_type, clntoptions, downloader, Ref{Int}(timeout), pre_request_hook, escape_path_params, chunk_reader_type, long_polling_timeout)
end
end

Expand Down Expand Up @@ -237,15 +243,17 @@ struct Ctx
curl_mime_upload::Ref{Any}
pre_request_hook::Function
escape_path_params::Bool
chunk_reader_type::Union{Nothing,Type{<:AbstractChunkReader}}

function Ctx(client::Client, method::String, return_types::Dict{Regex,Type}, resource::String, auth, body=nothing;
timeout::Int=client.timeout[],
pre_request_hook::Function=client.pre_request_hook,
escape_path_params::Bool=something(client.escape_path_params, true),
chunk_reader_type::Union{Nothing,Type{<:AbstractChunkReader}}=client.chunk_reader_type,
)
resource = client.root * resource
headers = copy(client.headers)
new(client, method, return_types, resource, auth, Dict{String,String}(), Dict{String,String}(), headers, Dict{String,String}(), Dict{String,String}(), body, timeout, Ref{Any}(nothing), pre_request_hook, escape_path_params)
new(client, method, return_types, resource, auth, Dict{String,String}(), Dict{String,String}(), headers, Dict{String,String}(), Dict{String,String}(), body, timeout, Ref{Any}(nothing), pre_request_hook, escape_path_params, chunk_reader_type)
end
end

Expand Down Expand Up @@ -414,11 +422,11 @@ response(::Type{T}, data::Dict{String,Any}) where {T} = from_json(T, data)::T
response(::Type{T}, data::Dict{String,Any}) where {T<:Dict} = convert(T, data)
response(::Type{Vector{T}}, data::Vector{V}) where {T,V} = [response(T, v) for v in data]

struct ChunkReader
struct LineChunkReader <: AbstractChunkReader
buffered_input::Base.BufferStream
end

function Base.iterate(iter::ChunkReader, _state=nothing)
function Base.iterate(iter::LineChunkReader, _state=nothing)
if eof(iter.buffered_input)
return nothing
else
Expand All @@ -432,6 +440,57 @@ function Base.iterate(iter::ChunkReader, _state=nothing)
end
end

struct JSONChunkReader <: AbstractChunkReader
buffered_input::Base.BufferStream
end

function Base.iterate(iter::JSONChunkReader, _state=nothing)
if eof(iter.buffered_input)
return nothing
else
# read all whitespaces
while !eof(iter.buffered_input)
byte = peek(iter.buffered_input, UInt8)
if isspace(Char(byte))
read(iter.buffered_input, UInt8)
else
break
end
end
eof(iter.buffered_input) && return nothing
valid_json = JSON.parse(iter.buffered_input)
bytes = convert(Vector{UInt8}, codeunits(JSON.json(valid_json)))
return (bytes, iter)
end
end

# Ref: https://www.rfc-editor.org/rfc/rfc7464.html
const RFC7464_RECORD_SEPARATOR = UInt8(0x1E)
struct RFC7464ChunkReader <: AbstractChunkReader
buffered_input::Base.BufferStream
end

function Base.iterate(iter::RFC7464ChunkReader, _state=nothing)
if eof(iter.buffered_input)
return nothing
else
out = IOBuffer()
while !eof(iter.buffered_input)
byte = read(iter.buffered_input, UInt8)
if byte == RFC7464_RECORD_SEPARATOR
bytes = take!(out)
if isnothing(_state) || !isempty(bytes)
return (bytes, iter)
end
else
write(out, byte)
end
end
bytes = take!(out)
return (bytes, iter)
end
end

noop_pre_request_hook(ctx::Ctx) = ctx
noop_pre_request_hook(resource_path::AbstractString, body::Any, headers::Dict{String,String}) = (resource_path, body, headers)

Expand Down Expand Up @@ -491,7 +550,13 @@ function do_request(ctx::Ctx, stream::Bool=false; stream_to::Union{Channel,Nothi
end
@async begin
try
for chunk in ChunkReader(output)
if isnothing(ctx.chunk_reader_type)
default_return_type = ctx.client.get_return_type(ctx.return_types, nothing, "")
readerT = default_return_type <: APIModel ? JSONChunkReader : LineChunkReader
else
readerT = ctx.chunk_reader_type
end
for chunk in readerT(output)
return_type = ctx.client.get_return_type(ctx.return_types, nothing, String(copy(chunk)))
data = response(return_type, resp, chunk)
put!(stream_to, data)
Expand Down
197 changes: 197 additions & 0 deletions test/chunkreader_tests.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
module ChunkReaderTests
using Test
using JSON
using OpenAPI
using OpenAPI.Clients: AbstractChunkReader, JSONChunkReader, LineChunkReader, RFC7464ChunkReader

function linechunk1()
buff = Base.BufferStream()
reader = LineChunkReader(buff)
results = String[]
readertask = @async begin
for line in reader
push!(results, String(line))
end
end
write(buff, "hello\nworld\n")
write(buff, "goodbye\n")
close(buff)
wait(readertask)
@test results == ["hello", "world", "goodbye"]
end

function linechunk2()
buff = Base.BufferStream()
reader = LineChunkReader(buff)
results = String[]
readertask = @async begin
for line in reader
push!(results, String(line))
end
end
write(buff, "\nhello\nworld\n")
write(buff, "goodbye\n")
close(buff)
wait(readertask)
@test results == ["", "hello", "world", "goodbye"]
end

function linechunk3()
buff = Base.BufferStream()
reader = LineChunkReader(buff)
results = String[]
readertask = @async begin
for line in reader
push!(results, String(line))
end
end
write(buff, "hello\nworld\n")
write(buff, "goodbye")
close(buff)
wait(readertask)
@test results == ["hello", "world", "goodbye"]
end

function jsonchunk1()
buff = Base.BufferStream()
reader = JSONChunkReader(buff)
results = String[]
readertask = @async begin
for json in reader
push!(results, String(json))
end
end

write(buff, "{\"hello\": \"world\"}")
write(buff, "{\"hello\": \"world\"}")
close(buff)
wait(readertask)
for result in results
json = JSON.parse(result)
@test json["hello"] == "world"
end
@test length(results) == 2
end

function jsonchunk2()
buff = Base.BufferStream()
reader = JSONChunkReader(buff)
results = String[]
readertask = @async begin
for json in reader
push!(results, String(json))
end
end

write(buff, "{\"hello\": \"world\"}\n")
write(buff, "{\"hello\": \"world\"}\n")
close(buff)
wait(readertask)
for result in results
json = JSON.parse(result)
@test json["hello"] == "world"
end
@test length(results) == 2
end

function jsonchunk3()
buff = Base.BufferStream()
reader = JSONChunkReader(buff)
results = String[]
readertask = @async begin
for json in reader
push!(results, String(json))
end
end

write(buff, "\n\n{\"hello\": \"world\"}\n\n")
write(buff, "{\"hello\": \"world\"}\n")
close(buff)
wait(readertask)
for result in results
json = JSON.parse(result)
@test json["hello"] == "world"
end
@test length(results) == 2
end

function jsonchunk4()
buff = Base.BufferStream()
reader = JSONChunkReader(buff)
results = String[]
readertask = @async begin
for json in reader
push!(results, String(json))
end
end

write(buff, "\n\n{\"hello\": \"world\"}\n\n")
write(buff, "{\"hello\": \"world\"\n")
close(buff)
@test_throws TaskFailedException wait(readertask)
@test length(results) == 1
end

function rfc7464chunk1()
buff = Base.BufferStream()
reader = RFC7464ChunkReader(buff)
results = String[]
readertask = @async begin
for chunk in reader
push!(results, String(chunk))
end
end

write(buff, OpenAPI.Clients.RFC7464_RECORD_SEPARATOR)
write(buff, "{\"hello\": \"world\"}")
write(buff, OpenAPI.Clients.RFC7464_RECORD_SEPARATOR)
write(buff, "{\"hello\": \"world\"}")
close(buff)
wait(readertask)
for result in results
if !isempty(result)
json = JSON.parse(result)
@test json["hello"] == "world"
end
end
@test length(results) == 3
end

function rfc7464chunk2()
buff = Base.BufferStream()
reader = RFC7464ChunkReader(buff)
results = String[]
readertask = @async begin
for chunk in reader
push!(results, String(chunk))
end
end

write(buff, "{\"hello\": \"world\"}")
write(buff, OpenAPI.Clients.RFC7464_RECORD_SEPARATOR)
write(buff, "{\"hello\": \"world\"}")
write(buff, OpenAPI.Clients.RFC7464_RECORD_SEPARATOR)
close(buff)
wait(readertask)
for result in results
if !isempty(result)
json = JSON.parse(result)
@test json["hello"] == "world"
end
end
@test length(results) == 2
end

function runtests()
linechunk1()
linechunk2()
linechunk3()
jsonchunk1()
jsonchunk2()
jsonchunk3()
jsonchunk4()
rfc7464chunk1()
rfc7464chunk2()
end

end # module ChunkReaderTests
4 changes: 4 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Test, HTTP

include("chunkreader_tests.jl")
include("testutils.jl")
include("modelgen/testmodelgen.jl")
include("client/runtests.jl")
Expand All @@ -10,6 +11,9 @@ include("forms/forms_client.jl")
@testset "ModelGen" begin
TestModelGen.runtests()
end
@testset "Chunk Readers" begin
ChunkReaderTests.runtests()
end
@testset "Petstore Client" begin
try
if run_tests_with_servers && !openapi_generator_env
Expand Down

0 comments on commit 0053243

Please sign in to comment.