-
Notifications
You must be signed in to change notification settings - Fork 400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prevent OOM when receiving large request streams #3174
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix! AFAICT the fix works, although the part that worries me a bit is that with these changes we're delegating ctx.read()
to a different threadpool which is will add a fair bit of overhead due to context switching.
I'm wondering whether we should add a continueReading: () => Boolean
argument to UnsafeAsync.Streaming
. This way we can decide in AsyncBodyReader
whether we'll invoke a ctx.read()
or defer it. The part that worries me with this is that we might be adding too much complexity. What do you think?
@@ -166,7 +174,7 @@ object NettyBody extends BodyEncoding { | |||
maybeError => | |||
ZChannel.fromZIO(queue.shutdown) *> | |||
maybeError.fold[ZChannel[Any, Any, Any, Any, E, Chunk[A], Unit]](ZChannel.unit)(ZChannel.fail(_)), | |||
a => ZChannel.write(a) *> loop, | |||
a => ZChannel.write(a) *> ZChannel.fromZIO(nettyRead.whenZIO(queue.isEmpty)) *> loop, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering whether we should buffer reads here, e.g., nettyRead.whenZIO(queue.size.map(_ < XXX))
Previously this was set to 4096 (using a bounded queue which we can't use anymore since it was blocking Netty's threads) and there weren't any issues right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, 2 notes on performance.
- Use
whenZIODiscard
instead ofwhenZIO
- I think it might be better to store this ZChannel in a val to avoid creating it on each loop:
val maybeRead = ZChannel.fromZIO(nettyRead.whenZIODiscard(???))
// And then
a => ZChannel.write(a) *> maybeRead *> loop,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll fix the suggested changes in NettyBody!
Not sure I understand how ctx.read() can be called from AsyncBodyReader as it has be called from the consuming side of the queue. So it's probably to complex for me at least :-) |
I tried locally with queue.size.map(_ < 4096) and it can be a lot of data since the chunks in the queue can be big in some cases. |
It feels like the buffering before #3060 was meant to be maximum 4096 bytes, but was really max 4096 chunks which is not what we want, I guess... |
This PR aims to fix 3173 by making sure we don't read more data than we can handle.
Now ctx.read() is called after the queue is drained by the consuming end of the stream. The call to ctx.read() will fill the queue with more data asynchronously.
Nettys AUTO_READ flag is already turned off in zio-http and ctx.read should be called when we are ready for more data, not before.
I'm not sure if we should keep the unbounded queue or not. I guess it will only contain one chunk at most.