Fix CallbackStack.Node leak #1318

Conversation
Some context to start: Cats Effect has had memory leaks in CallbackStack since version 3.4.3. See for example: typelevel/cats-effect#3935

I've been facing this memory leak in an application using fs2-kafka, and found that I'm not the only one (typelevel/cats-effect#3973).

Using a simple consume > produce stream application, I monitored the size of the CallbackStack with the following command:

```
while sleep 1; do jcmd <pid> GC.class_histogram | grep 'cats.effect.CallbackStack$Node' ; done
```

I found that swapping `F.race(shutdown, fetch)` for `fetch` stops the memory leak. This should not be an issue, because the Stream is interrupted anyway on `.interruptWhen(F.race(shutdown, stopReqs.get).void.attempt)`, but I'm not 100% convinced of this.
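To make the shape of that swap concrete, here is a hypothetical, simplified sketch (the names `shutdown`, `fetchPartition` and `stopReqs` mirror the fs2-kafka snippet quoted later in this thread; this is not the actual fs2-kafka code):

```scala
import cats.effect.{Deferred, IO}

object FetchLoopSketch {
  // Before: every evaluation of the inner effect races the fetch against the
  // shutdown signal, subscribing to `shutdown` on each iteration.
  def fetchLoopBefore(
      shutdown: IO[Unit],
      fetchPartition: IO[Unit],
      stopReqs: Deferred[IO, Unit]
  ): IO[Unit] =
    fs2.Stream
      .repeatEval(IO.race(shutdown, fetchPartition).void)
      .interruptWhen(IO.race(shutdown, stopReqs.get).void.attempt)
      .compile
      .drain

  // After (the change described above): just evaluate the fetch; the
  // surrounding interruptWhen still stops the stream on shutdown.
  def fetchLoopAfter(
      shutdown: IO[Unit],
      fetchPartition: IO[Unit],
      stopReqs: Deferred[IO, Unit]
  ): IO[Unit] =
    fs2.Stream
      .repeatEval(fetchPartition)
      .interruptWhen(IO.race(shutdown, stopReqs.get).void.attempt)
      .compile
      .drain
}
```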
That's interesting... I'm busy this weekend, but I'll try to dig deeper during the next week. Thanks for taking the time to investigate and find a solution!
@abestel I wonder if this issue should be reported directly to CE. From your original message I understand that downgrading to 3.5.2 fixes the issue, am I wrong? If that's the case, the bug is on CE's side. OTOH, if downgrading doesn't work, then it's definitely us. But the thing is that I don't see anything inherently harmful or a misuse of the CE API here. If you can try that with your sample app, that would be awesome. If not, can you at least share the code to reproduce? 🙏🏽 Pinging @armanbilge just in case this rings a bell.
How is that? I understood that the memory leak has been occurring since 3.4.3:
That makes sense, because v3.4.3 is the release where we changed the implementation of `CallbackStack`: https://github.com/typelevel/cats-effect/releases/tag/v3.4.3

If the leak persists even with the latest Cats Effect v3.5.4, then I'm inclined to say that this is a Cats Effect bug that needs to be fixed.
As Arman said above, this has been happening since 3.4.3 (or at least before 3.5.x). I've been experiencing weird behaviours in my fs2-kafka applications at work for quite a while. The main issue is that even running in Kubernetes does not fully "fix" the problem by rebooting the app, because sometimes the GC just gets into a weird state where it consumes all the CPU without throwing an OOM, and the only way out is to manually kill the pod.
It does. My thinking while opening this PR is that the change should have no functional impact while fixing a weird behaviour that may very well be a bug in Cats Effect. If we can mitigate the memory leak in fs2-kafka while troubleshooting Cats Effect, that would be a first win, but I understand if you'd rather skip this first step and troubleshoot the root cause directly.
I was actually trying to minimize the problem by taking fs2-kafka out of the loop, but this is proving to be more complicated than I expected. I'll try to extract the reproducer I was using into a minimal repo and share it with you (most likely tomorrow). Thank you both for the prompt replies!
Here is a simple repository to reproduce the memory leak: https://github.com/abestel/fs2-kafka-memory-leak
Digging deeper into this, I managed to get a minimal reproduction that leaks pretty fast (easily millions of nodes in the CallbackStack in a few seconds):

```scala
import cats.effect.{IO, IOApp, Resource}

object MemoryLeak extends IOApp.Simple {
  override def run: IO[Unit] =
    Resource.make(IO.never[Unit].start)(_.cancel.start.void).use { fiber =>
      fs2.Stream
        .repeatEval(
          IO.race(fiber.joinWithUnit, IO.unit)
        )
        .compile
        .drain
    }
}
```

This is inspired by:

At this point I have no idea whether this is a misuse of the CE API or a bug in CE, however.
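For contrast, the description's observation that dropping the race stops the leak suggests the nodes come from repeatedly subscribing to the fiber's join. A variant of the reproducer without the race (my own sketch, not from the thread) should keep the `cats.effect.CallbackStack$Node` count flat when watched with the histogram one-liner above:

```scala
import cats.effect.{IO, IOApp, Resource}

object NoRaceVariant extends IOApp.Simple {
  override def run: IO[Unit] =
    Resource.make(IO.never[Unit].start)(_.cancel.start.void).use { _ =>
      fs2.Stream
        .repeatEval(IO.unit) // no per-iteration race against fiber.joinWithUnit
        .compile
        .drain
    }
}
```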
Awesome findings @abestel! 🙌🏽 Here's a slightly simpler version that also leaks 👇🏽

```scala
import cats.effect._

object Main extends IOApp.Simple {
  override def run: IO[Unit] = {
    def loop(fiber: FiberIO[Unit]): IO[Unit] = IO.race(fiber.joinWithUnit, IO.unit) >> loop(fiber)
    IO.never[Unit].start.flatMap(loop)
  }
}
```

With 3.5.2:

With >= 3.5.3:

@armanbilge Given that both leak, it's not related to the latest changes in
Yes, we consider this a misuse of the API.
So perhaps that code should be re-written to use

However, this still wouldn't explain why the memory leak appeared in v3.4.3, which only changed the `CallbackStack` implementation.
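The concrete rewrite suggested above is cut off, so purely as an illustration of one way to avoid re-subscribing to the fiber on every iteration (my own sketch, not necessarily what was being suggested): have the background fiber complete a `Deferred` and race against `deferred.get` instead of `fiber.joinWithUnit`, so nothing is added to the fiber's `CallbackStack` per iteration:

```scala
import cats.effect.{Deferred, IO, IOApp}

object NoLeakSketch extends IOApp.Simple {
  override def run: IO[Unit] =
    Deferred[IO, Unit].flatMap { done =>
      // The background fiber would complete the Deferred when it finishes
      // (here it never does, mirroring the IO.never in the reproducer).
      IO.never[Unit].guarantee(done.complete(()).void).start.flatMap { _ =>
        // The loop races against done.get rather than fiber.joinWithUnit,
        // so no callback is attached to the fiber on each iteration.
        def loop: IO[Unit] = IO.race(done.get, IO.unit) >> loop
        loop
      }
    }
}
```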
That may be a mistake on my side. I started noticing the memory leak quite a while ago and it seemed to fit with my upgrading of dependencies (in particular a big jump in fs2-kafka, hence a big jump in CE). Now, even when trying the minimal reproductions with 3.4.2, the memory leak still happens, so it has most likely been there all along.
Coming back to this, I'm not sure we need to do that change rather than what I'm suggesting in this PR. As far as I understand, the "culprit" is the repeated evaluation of

```scala
Stream
  .repeatEval {
    stopReqs
      .tryGet
      .flatMap {
        case None =>
          fetchPartition
        case Some(()) =>
          // Prevent issuing additional requests after partition is
          // revoked or shutdown happens, in case the stream isn't
          // interrupted fast enough
          F.unit
      }
  }
  .interruptWhen(F.race(shutdown, stopReqs.get).void.attempt)
  .compile
  .drain
```

So the

```scala
Resource.make(IO.sleep(10.seconds).void.start)(_.cancel.start.void).use { fiber =>
  fs2.Stream
    .repeatEval(
      IO.never.onCancel(IO(println("canceled")))
    )
    .interruptWhen(fiber.joinWithUnit.attempt)
    .compile
    .drain
}
```
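If I read that second snippet right, the fiber completes after about ten seconds, `interruptWhen` then interrupts the stream, "canceled" is printed and the program ends; that is the behaviour this PR relies on: interrupting the surrounding stream is enough to cancel the in-flight effect, without racing it against the shutdown signal on every evaluation.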
I agree. This can be simplified as you suggest. But I'd like to look at it in more detail, just to be sure I don't miss anything. If everything goes as expected, tomorrow I'll review and merge this PR. I won't make a release though, because I want to use the generated SNAPSHOT in our Staging env first.
@samspills You might be interested in testing the snapshot.
We want to try out the snapshot as well. Where can we find it? (I can't find it in Sonatype's snapshot repo.)
You can try the release directly: https://github.com/fd4s/fs2-kafka/releases/tag/v3.5.0
This sounds like a trap that is a bit too easy to fall into. For instance