Skip to content

Commit

Permalink
Release v4.0 (#6)
Browse files Browse the repository at this point in the history
* Add php 8.2 support

* Fix bug #4, remove EventDispatcher and general touch-ups (#5)
  • Loading branch information
sweikenb authored Sep 2, 2023
1 parent dbe55fd commit a71c98f
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 139 deletions.
32 changes: 2 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,6 @@ the `wait()` method automatically at the end of the script. If you do not want t
first argument to the constructor to disable the shutdown handler. If you disable this feature, the process manager will
force a child termination even if they aren't finished yet and exits with status-code `125`.

## Event Dispatcher Support

This library supports the event dispatcher component of Symfony. You can inject the event-listener to
the `ProcessManager` to dispatch events during program runtime.

**Please note that events will only be dispatched for the parent/main process.**

```php
$eventManager = new EventDispatcher();
$eventManager->addListener(ProcessManager::EVENT_CHILD_CREATED, function (ProcessManagerEvent $event) {
echo sprintf("Child created: %d\n", $event->getProcessId());
});

$pm = new ProcessManager();
$pm->setEventDispatcher($eventManager);

// ...
```

The following events are thrown:

| Event | Description |
|---------------------------------|--------------------------------------------|
| process.manager.fork.failed | Forking of the process failed. |
| process.manager.child.created | Fork was created successfully. |
| process.manager.child.exit | A child has exited. |
| process.manager.child.send.kill | A kill signal was sent to a child process. |

## Inter Process Communication

You can send data between the parent and child process using messages.
Expand Down Expand Up @@ -119,7 +91,7 @@ for($i = 0; $i < 100; $i++) {

// wait and cleanup
sleep(5);
$pool->killAll();
$pool->closePool();
```

## Example
Expand Down Expand Up @@ -184,4 +156,4 @@ D
--> All Work done!
```

More examples including the `EventDispatcher` can be found in the [example](./example) folder.
More examples can be found in the [example](./example) folder.
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
}
],
"require": {
"php": "^8.1 | ^8.2",
"php": ">=8.2",
"ext-pcntl": "*",
"ext-posix": "*",
"ext-sockets": "*"
},
"suggest": {
"symfony/event-dispatcher": "If you want to add events, you can use the symfony event dispatcher."
"conflict": {
"ext-grpc": "*"
},
"autoload": {
"psr-4": {
Expand Down
12 changes: 0 additions & 12 deletions example/000_simple.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,11 @@

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface;
use Sweikenb\Library\Pcntl\Api\ParentProcessInterface;
use Sweikenb\Library\Pcntl\Event\ProcessManagerEvent;
use Sweikenb\Library\Pcntl\ProcessManager;
use Symfony\Component\EventDispatcher\EventDispatcher;

require __DIR__ . '/../vendor/autoload.php';

$pm = new ProcessManager();
if (class_exists(EventDispatcher::class)) {
/*
* Register events in case the Symfony EventDispatcher is available
*/
$eventManager = new EventDispatcher();
$eventManager->addListener(ProcessManager::EVENT_CHILD_CREATED, function (ProcessManagerEvent $event) {
echo sprintf("[EVENT CALLBACK] Child created: %d\n", $event->getProcessId());
});
$pm->setEventDispatcher($eventManager);
}

$childA = $pm->runProcess(
function (ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess) {
Expand Down
2 changes: 1 addition & 1 deletion example/100_simple_ipc.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ function (ChildProcessInterface $process, ParentProcessInterface $parentProcess)
}

// stop the worker process
fwrite(STDOUT, sprintf('# Stopping worker (%d)', $worker->getId()));
fwrite(STDOUT, sprintf("# Stopping worker (%d)\n", $worker->getId()));
$worker->kill();
}
6 changes: 3 additions & 3 deletions example/110_process_pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
// Usually you would send messages in some kid of event/endless-loop and/or with some custom unblock logic.
sleep(3);

// Work done, kill all workers!
// HINT: if you skipp this kill, the main process and its worker will run infinitely
$pool->killAll();
// Work done, kill all workers and disable the pool. No more messages can be processed.
// This will be called automatically if the main process terminated.
$pool->closePool();
2 changes: 1 addition & 1 deletion src/Api/ProcessManagerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public function runProcess(callable $callback): ChildProcessInterface;
*
* Only works in the parent-process.
*/
public function wait(?callable $callback = null): ProcessManagerInterface;
public function wait(?callable $callback = null): void;
}
2 changes: 1 addition & 1 deletion src/Api/ProcessPoolInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ public function getInvocationBuilder(): callable;

public function sendMessage(WorkerMessageInterface $workerMessage): bool;

public function killAll(): void;
public function closePool(): void;
}
24 changes: 0 additions & 24 deletions src/Event/ProcessManagerEvent.php

This file was deleted.

2 changes: 1 addition & 1 deletion src/Model/ChildProcessModel.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ class ChildProcessModel extends AbstractProcessModel implements ChildProcessInte
{
public function kill(): void
{
@posix_kill($this->getId(), SIGKILL);
posix_kill($this->getId(), SIGKILL);
}
}
99 changes: 54 additions & 45 deletions src/ProcessManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,34 @@

namespace Sweikenb\Library\Pcntl;

use Exception;
use Sweikenb\Library\Pcntl\Api\ChildProcessInterface;
use Sweikenb\Library\Pcntl\Api\ParentProcessInterface;
use Sweikenb\Library\Pcntl\Api\ProcessFactoryInterface;
use Sweikenb\Library\Pcntl\Api\ProcessManagerInterface;
use Sweikenb\Library\Pcntl\Event\ProcessManagerEvent;
use Sweikenb\Library\Pcntl\Exception\ProcessException;
use Sweikenb\Library\Pcntl\Factory\ProcessFactory;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Throwable;

class ProcessManager implements ProcessManagerInterface
{
const EVENT_FORK_FAILED = 'process.manager.fork.failed';
const EVENT_CHILD_CREATED = 'process.manager.child.created';
const EVENT_CHILD_EXIT = 'process.manager.child.exit';
const EVENT_CHILD_SEND_KILL = 'process.manager.child.send.kill';

const PROPAGATE_SIGNALS = [
SIGTERM,
SIGHUP,
SIGUSR1,
SIGALRM,
SIGUSR1,
SIGUSR2
];

private ?EventDispatcherInterface $eventDispatcher = null;
private ProcessFactoryInterface $processFactory;
private ParentProcessInterface $mainProcess;

/**
* @var array<int, ChildProcessInterface>
*/
private array $childProcesses = [];
/**
* @var array<int, int>
*/
private array $earlyExitChildQueue = [];
private bool $isChildProcess = false;

public function __construct(
Expand All @@ -51,8 +48,11 @@ public function __construct(
? self::PROPAGATE_SIGNALS
: $propagateSignals;

// register a signale queue for early exit children
pcntl_async_signals(false);
pcntl_signal(SIGCHLD, [$this, "childEarlyExitQueue"]);

// register the signal-handler for each signal that should be handled
pcntl_async_signals(true);
foreach ($propagateSignals as $handleSignal) {
pcntl_signal(
$handleSignal,
Expand All @@ -72,11 +72,13 @@ function () use ($autoWait) {
} else {
if (!empty($this->childProcesses)) {
foreach ($this->childProcesses as $childProcess) {
echo sprintf(
"[PCNTL ProcessManager] Forcing child process exit for pid %s\n",
$childProcess->getId()
fwrite(
STDERR,
sprintf(
"[PCNTL ProcessManager] Forcing child process exit for pid %s\n",
$childProcess->getId()
)
);
$this->dispatchEvent(self::EVENT_CHILD_SEND_KILL, $childProcess->getId());
@posix_kill($childProcess->getId(), SIGKILL);
}
$this->wait();
Expand All @@ -89,16 +91,12 @@ function () use ($autoWait) {
);
}

public function setEventDispatcher(EventDispatcherInterface $eventDispatcher): void
{
$this->eventDispatcher = $eventDispatcher;
}

private function dispatchEvent(string $name, ?int $pid = null): void
public function childEarlyExitQueue(): void
{
if ($this->eventDispatcher) {
$event = new ProcessManagerEvent($name, $pid);
$this->eventDispatcher->dispatch($event, $name);
if (!$this->isChildProcess) {
while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
$this->earlyExitChildQueue[$pid] = [$pid, pcntl_wexitstatus($status)];
}
}
}

Expand All @@ -125,7 +123,6 @@ public function runProcess(callable $callback): ChildProcessInterface

// error
if ($pid < 0) {
$this->dispatchEvent(self::EVENT_FORK_FAILED);
throw new ProcessException('Forking failed.');
}

Expand All @@ -134,8 +131,6 @@ public function runProcess(callable $callback): ChildProcessInterface
@socket_close($ipc[0]);
$childProcess = $this->processFactory->createChildProcess($pid, $ipc[1]);
$this->childProcesses[$pid] = $childProcess;
$this->dispatchEvent(self::EVENT_CHILD_CREATED, $pid);

return $childProcess;
}

Expand All @@ -144,33 +139,47 @@ public function runProcess(callable $callback): ChildProcessInterface
@socket_close($ipc[1]);
$this->childProcesses = [];
$this->isChildProcess = true;
call_user_func(
$callback,
$this->processFactory->createChildProcess(posix_getpid(), null),
$this->getMainProcess()->setIpcSocket($ipc[0])
$success = false !== call_user_func(
$callback,
$this->processFactory->createChildProcess(posix_getpid(), null),
$this->getMainProcess()->setIpcSocket($ipc[0])
);
} catch (Exception | Throwable $e) {
$success = false;
fwrite(
STDERR,
sprintf("[PCNTL ProcessManager] Child process exception: %s\n", $e->getMessage())
);
} finally {
exit(0);
exit($success ? 0 : 1);
}
}

public function wait(?callable $callback = null): ProcessManagerInterface
public function wait(?callable $callback = null): void
{
if (!$this->isChildProcess) {
while (!empty($this->childProcesses)) {
$pid = pcntl_wait($status);
if ($this->isChildProcess) {
return;
}
$handleChildExit = function (int $pid, int $status) use ($callback): void {
if ($pid > 0) {
if (isset($this->childProcesses[$pid])) {
unset($this->childProcesses[$pid]);

// dispatch event and trigger callback if present
$this->dispatchEvent(self::EVENT_CHILD_EXIT, $pid);
if (null !== $callback) {
call_user_func($callback, $status, $pid);
}
}
if (isset($this->earlyExitChildQueue[$pid])) {
unset($this->earlyExitChildQueue[$pid]);
}
if (null !== $callback) {
call_user_func($callback, $status, $pid);
}
}
};
while (!empty($this->earlyExitChildQueue)) {
[$pid, $status] = current($this->earlyExitChildQueue);
$handleChildExit($pid, $status);
}
while (!empty($this->childProcesses)) {
$pid = pcntl_wait($status);
$handleChildExit($pid, $status);
}

return $this;
}
}
Loading

0 comments on commit a71c98f

Please sign in to comment.