Skip to content

Commit

Permalink
v5.0.0 Release (#8)
Browse files Browse the repository at this point in the history
* Add ProcessQueue feature (#7)
  • Loading branch information
sweikenb authored Sep 23, 2023
1 parent a71c98f commit 0c25885
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 10 deletions.
34 changes: 34 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Changelog

## Release [v5.0.0](https://github.com/sweikenb/pcntl/releases/tag/v5.0.0)

**Features**

- `ProcessQueue` added as more flexible alternative to `ProcessPool` but without pre-created pool workers

**Breaking Changes**

- The return value of the optional `ProcessManager::wait`-callback is now used to determine if the method should
continue to wait for further children to exit. If a value other than explicitly `false` is returned, it will continue
to wait.

* * *

## Release [v4.0.0](https://github.com/sweikenb/pcntl/releases/tag/v4.0.0)

**Plan for future releases**

- Introduction of a maintained changelog for each release
- From now on, only major version releases will introduce BC breaks
- Features that are about to be removed in the next major version will be marked with `@deprecated`

**Bugfixes**

- [#4 Bug: very fast/empty forks will exit before the pcntl_wait() can capture it](https://github.com/sweikenb/pcntl/issues/4)

**Breaking Changes**

- PHP v8.2 as minimum requirement
- Due to incompatibility, this library DOES NOT work if the **grpc** PHP-extension is installed
- Removal of the optional `EventDispatcher`
- Some methods of the `ProcessPool` and `ProcessManager` has been renamed
44 changes: 43 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@

Simple and easy to use process manager for PHP based on default PCNTL and POSIX functions.

Please also have a look into the [CHANGELOG](CHANGELOG.md) for latest updates and release information .

**License**: [MIT](LICENSE.txt)

**Topics**

- [Installation](#installation)
- [Basic Usage](#usage)
- [IPC Inter Process Communication](#inter-process-communication)
- [Pool Processing](#process-pool--worker-processes)
- [Queued Processing](#process-queue)
- [Examples](#basic-example)

* * *

## Installation

You can install this library by [composer](https://getcomposer.org/):
Expand Down Expand Up @@ -94,7 +109,34 @@ sleep(5);
$pool->closePool();
```

## Example
## Process Queue

In case you want to work with dynamic callbacks instead of a process-pool, which will require predefined worker, you can
use the `ProcessQueue`. This queue limits the number of threads that will be forked and handles the return-management
for you.

```php
<?php

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface;
use Sweikenb\Library\Pcntl\Api\ParentProcessInterface;
use Sweikenb\Library\Pcntl\ProcessQueue;

require __DIR__ . '/../vendor/autoload.php';

$maxNumberOfThreadsToRunParallel = 4;

$queue = new ProcessQueue($maxNumberOfThreadsToRunParallel);

for ($i = 1; $i <= 50; $i++) {
$queue->addToQueue(function (ChildProcessInterface $child, ParentProcessInterface $parent) use ($i) {
sleep(mt_rand(1, 3));
fwrite(STDOUT, sprintf("Queued thread %d processed message no. %d\n", $child->getId(), $i));
});
}
```

# Basic Example

```php
<?php
Expand Down
18 changes: 18 additions & 0 deletions example/200_process_queue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface;
use Sweikenb\Library\Pcntl\Api\ParentProcessInterface;
use Sweikenb\Library\Pcntl\ProcessQueue;

require __DIR__ . '/../vendor/autoload.php';

$maxNumberOfThreadsToRunParallel = 4;

$queue = new ProcessQueue($maxNumberOfThreadsToRunParallel);

for ($i = 1; $i <= 50; $i++) {
$queue->addToQueue(function (ChildProcessInterface $child, ParentProcessInterface $parent) use ($i) {
sleep(mt_rand(1, 3));
fwrite(STDOUT, sprintf("Queued thread %d processed message no. %d\n", $child->getId(), $i));
});
}
7 changes: 4 additions & 3 deletions src/Api/ProcessManagerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ public function getMainProcess(): ParentProcessInterface;
public function runProcess(callable $callback): ChildProcessInterface;

/**
* Waits until all child-processes are returned and calls the optional callback for each process that returns.
* The callback will receive the return-status and the pid of the child process.
*
* Only works in the parent-process.
* By default, it waits until all child-processes are returned and calls the optional callback for each of them.
* The callback will receive the return-status and the pid of the child process as arguments.
* If the callback returns FALSE (boolean), the method will not wait for further children to exit and returns early,
* any other or none value (including NULL) will continue to wait if remaining children are present.
*/
public function wait(?callable $callback = null): void;
}
11 changes: 11 additions & 0 deletions src/Api/ProcessQueueInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

namespace Sweikenb\Library\Pcntl\Api;

interface ProcessQueueInterface
{
/**
* Adds the given callback to the queue for execution.
*/
public function addToQueue(callable $callback): ChildProcessInterface;
}
20 changes: 14 additions & 6 deletions src/ProcessManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -160,26 +160,34 @@ public function wait(?callable $callback = null): void
if ($this->isChildProcess) {
return;
}
$handleChildExit = function (int $pid, int $status) use ($callback): void {
$handleChildExit = function (int $pid, int $status) use ($callback): bool {
if ($pid > 0) {
if (isset($this->childProcesses[$pid])) {
unset($this->childProcesses[$pid]);
}
if (isset($this->earlyExitChildQueue[$pid])) {
unset($this->earlyExitChildQueue[$pid]);
}
if (null !== $callback) {
call_user_func($callback, $status, $pid);
if (null !== $callback && call_user_func($callback, $status, $pid) === false) {
return false;
}
}
return true;
};

// run the callback for all early exit children no matter what
$waitForMoreToExit = true;
while (!empty($this->earlyExitChildQueue)) {
[$pid, $status] = current($this->earlyExitChildQueue);
$handleChildExit($pid, $status);
$waitForMoreToExit = $waitForMoreToExit && $handleChildExit($pid, $status);
}
while (!empty($this->childProcesses)) {

// only wait for the regular children if desired
while ($waitForMoreToExit && !empty($this->childProcesses)) {
$pid = pcntl_wait($status);
$handleChildExit($pid, $status);
if (!$handleChildExit($pid, $status)) {
return;
}
}
}
}
31 changes: 31 additions & 0 deletions src/ProcessQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

namespace Sweikenb\Library\Pcntl;

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface;
use Sweikenb\Library\Pcntl\Api\ProcessManagerInterface;
use Sweikenb\Library\Pcntl\Api\ProcessQueueInterface;

class ProcessQueue implements ProcessQueueInterface
{
private ProcessManagerInterface $processManager;
private int $maxThreads;
private int $threadCounter = 0;

public function __construct(
int $maxThreads,
?ProcessManagerInterface $processManager = null
) {
$this->processManager = $processManager ?? new ProcessManager();
$this->maxThreads = max(1, $maxThreads);
}

public function addToQueue(callable $callback): ChildProcessInterface
{
while ($this->threadCounter >= $this->maxThreads) {
$this->processManager->wait(fn() => --$this->threadCounter >= $this->maxThreads);
}
$this->threadCounter++;
return $this->processManager->runProcess($callback);
}
}

0 comments on commit 0c25885

Please sign in to comment.