Skip to content

Commit

Permalink
Fix stream body delay 2 (#2420)
Browse files Browse the repository at this point in the history
* Fix stream body delay

* Remove Content-Length header for stream response

* Restore spec

* File response with 'jar' protocol doesn't have content-type header

* Convert to classic assertions due to a macro error

* fmt

* fix lint
  • Loading branch information
guersam authored Sep 24, 2023
1 parent 7e37bb0 commit 70aabfc
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 48 deletions.
28 changes: 8 additions & 20 deletions zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,27 +66,15 @@ object NettyBodyWriter {
None
case StreamBody(stream, _, _) =>
Some(
stream.chunks
.runFoldZIO(Option.empty[Chunk[Byte]]) {
case (Some(previous), current) =>
NettyFutureExecutor.executed {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(previous.toArray)))
} *>
ZIO.succeed(Some(current))
case (_, current) =>
ZIO.succeed(Some(current))
stream.chunks.mapZIO { bytes =>
NettyFutureExecutor.executed {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
}
.flatMap { maybeLastChunk =>
// last chunk is handled separately to avoid fiber interrupt before EMPTY_LAST_CONTENT is sent
ZIO.attempt(
maybeLastChunk.foreach { lastChunk =>
ctx.write(new DefaultHttpContent(Unpooled.wrappedBuffer(lastChunk.toArray)))
},
) *>
NettyFutureExecutor.executed {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
},
}.runDrain.zipRight {
NettyFutureExecutor.executed {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
},
)
case ChunkBody(data, _, _) =>
ctx.write(Unpooled.wrappedBuffer(data.toArray))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ private[zio] object NettyResponseEncoder {
fastEncode(response, bytes)
} else {
val jHeaders = Conversions.headersToNetty(response.headers)
// Prevent client from closing connection before server writes EMPTY_LAST_CONTENT.
if (response.body.isInstanceOf[Body.StreamBody]) {
jHeaders.remove(HttpHeaderNames.CONTENT_LENGTH)
}
val jStatus = Conversions.statusToNetty(response.status)
val hasContentLength = jHeaders.contains(HttpHeaderNames.CONTENT_LENGTH)
if (!hasContentLength) jHeaders.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
Expand Down
4 changes: 0 additions & 4 deletions zio-http/src/test/scala/zio/http/StaticFileServerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ object StaticFileServerSpec extends HttpRunnableSpec {
val res = resourceOk.run().map(_.status)
assertZIO(res)(equalTo(Status.Ok))
},
test("should have content-length") {
val res = resourceOk.run().map(_.header(Header.ContentLength))
assertZIO(res)(isSome(equalTo(Header.ContentLength(7L))))
},
test("should have content") {
val res = resourceOk.run().flatMap(_.body.asString)
assertZIO(res)(equalTo("foo\nbar"))
Expand Down
49 changes: 25 additions & 24 deletions zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package zio.http.netty

import zio._
import zio.test.Assertion._
import zio.test.TestAspect.withLiveClock
import zio.test.{Spec, TestEnvironment, assertTrue}
import zio.test.{Spec, TestEnvironment, assert}

import zio.stream.{ZStream, ZStreamAspect}

Expand All @@ -19,8 +20,7 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
handler(
http.Response(
status = Status.Ok,
// content length header is important,
// in this case the server will not use chunked transfer encoding even if response is a stream
// Content-Length header will be removed when the body is a stream
headers = Headers(Header.ContentLength(len)),
body = Body.fromStream(streams.next()),
),
Expand Down Expand Up @@ -76,13 +76,9 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
)
client <- ZIO.service[Client]
firstResponse <- makeRequest(client, port)
firstResponseBodyReceive <- firstResponse.body.asStream.chunks
.map(chunk => new String(chunk.toArray))
.mapZIO { chunk =>
atLeastOneChunkReceived.succeed(()) *> ZIO.succeed(chunk)
}
.runCollect
.fork
firstResponseBodyReceive <- firstResponse.body.asStream.chunks.mapZIO { chunk =>
atLeastOneChunkReceived.succeed(()) *> ZIO.succeed(chunk.asString)
}.runCollect.fork
_ <- firstResponseQueue.offerAll(message.getBytes.toList)
_ <- atLeastOneChunkReceived.await
// saying that there will be no more data in the first response stream
Expand All @@ -93,20 +89,25 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
// java.lang.IllegalStateException: unexpected message type: LastHttpContent"
// exception will be thrown
secondResponse <- makeRequest(client, port)
secondResponseBody <- secondResponse.body.asStream.chunks.map(chunk => new String(chunk.toArray)).runCollect
firstResponseBody <- firstResponseBodyReceive.join
value =
firstResponse.status == Status.Ok &&
// since response has not chunked transfer encoding header we can't guarantee that
// received chunks will be the same as it was transferred. So we need to check the whole body
firstResponseBody.reduce(_ + _) == message &&
secondResponse.status == Status.Ok &&
secondResponseBody == Chunk(message)
} yield {
assertTrue(
value,
)
}
secondResponseBody <- secondResponse.body.asStream.chunks.map(_.asString).runCollect
firstResponseBody <- firstResponseBodyReceive.join

assertFirst =
assert(firstResponse.status)(equalTo(Status.Ok)) &&
assert(firstResponse.headers.get(Header.ContentLength))(isNone) &&
assert(firstResponse.headers.get(Header.TransferEncoding))(
isSome(equalTo(Header.TransferEncoding.Chunked)),
) &&
assert(firstResponseBody.reduce(_ + _))(equalTo(message))

assertSecond =
assert(secondResponse.status)(equalTo(Status.Ok)) &&
assert(secondResponse.headers.get(Header.ContentLength))(isNone) &&
assert(secondResponse.headers.get(Header.TransferEncoding))(
isSome(equalTo(Header.TransferEncoding.Chunked)),
) &&
assert(secondResponseBody)(equalTo(Chunk(message, "")))
} yield assertFirst && assertSecond
},
).provide(
singleConnectionClient,
Expand Down

0 comments on commit 70aabfc

Please sign in to comment.