diff --git a/src/EventLoop/Internal/AbstractDriver.php b/src/EventLoop/Internal/AbstractDriver.php index fc2e732..8a2e2cc 100644 --- a/src/EventLoop/Internal/AbstractDriver.php +++ b/src/EventLoop/Internal/AbstractDriver.php @@ -48,6 +48,7 @@ abstract class AbstractDriver implements Driver private readonly \Closure $interruptCallback; private readonly \Closure $queueCallback; + /** @var \Closure(): (?\Closure(): mixed) */ private readonly \Closure $runCallback; private readonly \stdClass $internalSuspensionMarker; @@ -87,13 +88,16 @@ public function __construct() /** @psalm-suppress InvalidArgument */ $this->interruptCallback = $this->setInterrupt(...); $this->queueCallback = $this->queue(...); - $this->runCallback = function () { - if ($this->fiber->isTerminated()) { - $this->createLoopFiber(); - } + $this->runCallback = + /** @return ?\Closure(): mixed */ + function (): ?\Closure { + if ($this->fiber->isTerminated()) { + $this->createLoopFiber(); + } - return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start(); - }; + // Returns a callback that returns the value of the {main} fiber, or null in case of deadlock. + return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start(); + }; } public function run(): void @@ -533,26 +537,32 @@ private function createLoopFiber(): void { $this->fiber = new \Fiber(function (): void { $this->stopped = false; + do { + // Invoke microtasks if we have some + $this->invokeCallbacks(); - // Invoke microtasks if we have some - $this->invokeCallbacks(); - - /** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */ - while (!$this->stopped) { - if ($this->interrupt) { - $this->invokeInterrupt(); - } + /** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */ + while (!$this->stopped) { + if ($this->interrupt) { + $this->invokeInterrupt(); + } - if ($this->isEmpty()) { - return; - } + if ($this->isEmpty()) { + while (\gc_collect_cycles()); + if (!$this->microtaskQueue->isEmpty() || !$this->callbackQueue->isEmpty() || $this->interrupt) { + continue 2; + } + return; + } - $previousIdle = $this->idle; - $this->idle = true; + $previousIdle = $this->idle; + $this->idle = true; - $this->tick($previousIdle); - $this->invokeCallbacks(); - } + $this->tick($previousIdle); + $this->invokeCallbacks(); + } + return; + } while (true); }); } diff --git a/src/EventLoop/Internal/DriverSuspension.php b/src/EventLoop/Internal/DriverSuspension.php index a3a7e29..6999638 100644 --- a/src/EventLoop/Internal/DriverSuspension.php +++ b/src/EventLoop/Internal/DriverSuspension.php @@ -28,6 +28,7 @@ final class DriverSuspension implements Suspension private bool $deadMain = false; /** + * @param \Closure(): (?\Closure(): mixed) $run * @param \WeakMap> $suspensions */ public function __construct( @@ -145,6 +146,7 @@ public function suspend(): mixed throw new \Error('Event loop terminated without resuming the current suspension (the cause is either a fiber deadlock, or an incorrectly unreferenced/canceled watcher):' . $info); } + assert($result !== null); return $result(); } diff --git a/test/EventLoopTest.php b/test/EventLoopTest.php index 8f334f2..9aacf59 100644 --- a/test/EventLoopTest.php +++ b/test/EventLoopTest.php @@ -9,6 +9,130 @@ class EventLoopTest extends TestCase { + public function testSuspensionResumptionWithQueueInGarbageCollection(): void + { + $suspension = EventLoop::getSuspension(); + + $class = new class ($suspension) { + public function __construct(public Suspension $suspension) + { + } + public function __destruct() + { + $this->suspension->resume(true); + } + }; + $cycle = [$class, &$cycle]; + unset($class, $cycle); + + $ended = $suspension->suspend(); + + $this->assertTrue($ended); + } + + public function testEventLoopResumptionWithQueueInGarbageCollection(): void + { + $suspension = EventLoop::getSuspension(); + + $class = new class ($suspension) { + public function __construct(public Suspension $suspension) + { + } + public function __destruct() + { + EventLoop::queue($this->suspension->resume(...), true); + } + }; + $cycle = [$class, &$cycle]; + unset($class, $cycle); + + $ended = $suspension->suspend(); + + $this->assertTrue($ended); + } + + + public function testSuspensionResumptionWithQueueInGarbageCollectionNested(): void + { + $suspension = EventLoop::getSuspension(); + + $resumer = new class ($suspension) { + public function __construct(public Suspension $suspension) + { + } + public function __destruct() + { + $this->suspension->resume(true); + } + }; + + $class = new class ($resumer) { + public static ?object $staticReference = null; + public function __construct(object $resumer) + { + self::$staticReference = $resumer; + } + public function __destruct() + { + EventLoop::queue(function () { + $class = self::$staticReference; + $cycle = [$class, &$cycle]; + unset($class, $cycle); + + self::$staticReference = null; + }); + } + }; + $cycle = [$class, &$cycle]; + unset($class, $resumer, $cycle); + + + $ended = $suspension->suspend(); + + $this->assertTrue($ended); + } + + public function testEventLoopResumptionWithQueueInGarbageCollectionNested(): void + { + $suspension = EventLoop::getSuspension(); + + $resumer = new class ($suspension) { + public function __construct(public Suspension $suspension) + { + } + public function __destruct() + { + EventLoop::queue($this->suspension->resume(...), true); + } + }; + + $class = new class ($resumer) { + public static ?object $staticReference = null; + public function __construct(object $resumer) + { + self::$staticReference = $resumer; + } + public function __destruct() + { + EventLoop::queue(function () { + $class = self::$staticReference; + $cycle = [$class, &$cycle]; + unset($class, $cycle); + + self::$staticReference = null; + }); + } + }; + $cycle = [$class, &$cycle]; + unset($class, $resumer, $cycle); + + + $ended = $suspension->suspend(); + + $this->assertTrue($ended); + } + + public function testDelayWithNegativeDelay(): void { $this->expectException(\Error::class);