Skip to content

Commit

Permalink
IPC implementation (#1) (#2)
Browse files Browse the repository at this point in the history
* IPC implementation
  • Loading branch information
sweikenb authored Sep 16, 2022
1 parent 43446a9 commit 9da96ed
Show file tree
Hide file tree
Showing 27 changed files with 759 additions and 43 deletions.
66 changes: 65 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ composer require sweikenb/pcntl
You can just create an instance of `\Sweikenb\Library\Pcntl\ProcessManager` and create a process-fork by
calling `runProcess()`.

The manager will handle the rest and makes sure all process will be terminated properly. It will also make shure that
The manager will handle the rest and makes sure all process will be terminated properly. It will also make sure that
the major system signals will be propagated to the child processes as well. In case you want to define your own set of
signals you want to propagate to the children, you can add an array with the signals as second argument to the
constructor.
Expand Down Expand Up @@ -58,6 +58,70 @@ The following events are thrown:
| process.manager.child.exit | A child has exited. |
| process.manager.child.send.kill | A kill signal was sent to a child process. |

## Inter Process Communication

You can send data between the parent and child process using messages.

The data gets send by sockets and can be anything that can be encoded using `serialize()`:

```php
<?php

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface;
use Sweikenb\Library\Pcntl\Api\ParentProcessInterface;
use Sweikenb\Library\Pcntl\ProcessManager;
use Sweikenb\Library\Pcntl\Model\Ipc\MessageModel;

require "./vendor/autoload.php";

$pm = new ProcessManager();

$child = $pm->runProcess(function(ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess){
$message = $parentProcess->getNextMessage(true);
if ($message) {
// Process message here ...
fwrite(
STDOUT,
fprintf('Got a message from the parent process: %s - %s', $message->getTopic(), $message->getPayload())
);
}
$parentProcess->sendMessage(new MessageModel('some_response', 'hello parent'));
});

$child->sendMessage(new MessageModel('some_topic', 'hello child'));

// wait and cleanup
sleep(3);
$child->kill();
```

## Process Pool & Worker Processes

You can also distribute workload across multiple worker to work in parallel. The actual work must be placed inside a
class that is invokable _(`__invoke`)_ and must not have a constructor.

```php
<?php

use ExampleWorkerService;
use Sweikenb\Library\Pcntl\Factory\WorkerMessageFactory;
use Sweikenb\Library\Pcntl\ProcessPool;

require "./vendor/autoload.php";
$messageFactory = new WorkerMessageFactory();

$numberOfWorkers = 4;
$pool = new ProcessPool($numberOfWorkers);

for($i = 0; $i < 100; $i++) {
$pool->sendMessage($messageFactory->create('some_topic', ExampleWorkerService::class));
}

// wait and cleanup
sleep(5);
$pool->killAll();
```

## Example

```php
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"require": {
"php": "^8.1",
"ext-pcntl": "*",
"ext-posix": "*"
"ext-posix": "*",
"ext-sockets": "*"
},
"suggest": {
"symfony/event-dispatcher": "If you want to add events, you can use the symfony event dispatcher."
Expand Down
8 changes: 4 additions & 4 deletions example/000_simple.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,25 @@

$childA = $pm->runProcess(
function (ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess) {
sleep(mt_rand(1, 10));
sleep(mt_rand(1, 3));
echo "Hallo from child A\n";
}
);
$childB = $pm->runProcess(
function (ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess) {
sleep(mt_rand(1, 10));
sleep(mt_rand(1, 3));
echo "Hallo from child B\n";
}
);
$childC = $pm->runProcess(
function (ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess) {
sleep(mt_rand(1, 10));
sleep(mt_rand(1, 3));
echo "Hallo from child C\n";
}
);
$childD = $pm->runProcess(
function (ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess) {
sleep(mt_rand(1, 10));
sleep(mt_rand(1, 3));
echo "Hallo from child D\n";
}
);
Expand Down
75 changes: 75 additions & 0 deletions example/100_simple_ipc.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface;
use Sweikenb\Library\Pcntl\Api\ParentProcessInterface;
use Sweikenb\Library\Pcntl\Factory\MessageFactory;
use Sweikenb\Library\Pcntl\ProcessManager;

require __DIR__ . '/../vendor/autoload.php';

$numWorker = 4;
$numMessages = 25;

$pm = new ProcessManager();
$factory = new MessageFactory();

$workers = [];
/* @var array<int, ChildProcessInterface> $workers */

for ($i = 0; $i < $numWorker; $i++) {
$workers[$i] = $pm->runProcess(
function (ChildProcessInterface $process, ParentProcessInterface $parentProcess) use ($i, $factory) {
fwrite(
STDOUT,
sprintf("> Worker #%d: started and ready to process messages\n", ($i + 1))
);
$count = 0;
while ($message = $parentProcess->getNextMessage()) {
$count++;
fwrite(
STDOUT,
sprintf(
">> Worker #%d: received a message: '%s' '%s' (no. msg.: %d)\n",
($i + 1),
$message->getTopic(),
$message->getPayload(),
$count
)
);
$parentProcess->sendMessage(
$factory->create(
sprintf('Answer from #%d', $process->getId()),
sprintf("msg %d", $count)
)
);
}
}
);
}

for ($i = 0; $i < $numWorker * $numMessages; $i++) {
$workerId = $i % $numWorker;
$message = $factory->create('some message for ' . ($workerId + 1), 'some payload for ' . ($workerId + 1));
$workers[$workerId]->sendMessage($message);
}

foreach ($workers as $i => $worker) {
$count = 0;
while ($count < $numMessages) {
$count++;
$message = $worker->getNextMessage();
fwrite(
STDOUT,
sprintf(
">> Worker #%d answered with message: '%s' '%s'\n",
$worker->getId(),
$message->getTopic(),
$message->getPayload()
)
);
}

// stop the worker process
fwrite(STDOUT, sprintf('# Stopping worker (%d)', $worker->getId()));
$worker->kill();
}
26 changes: 26 additions & 0 deletions example/110_process_pool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

use Sweikenb\Library\Pcntl\Factory\WorkerMessageFactory;
use Sweikenb\Library\Pcntl\ProcessPool;

require __DIR__ . '/../vendor/autoload.php';
require __DIR__ . '/ExampleWorkerService.php';

$numWorker = 4;
$numMessages = 25 * $numWorker;
$factory = new WorkerMessageFactory();

$pool = new ProcessPool($numWorker);
for ($i = 0; $i < $numMessages; $i++) {
$pool->sendMessage(
$factory->create('hello_world', ExampleWorkerService::class)
);
}

// Give the workers some time to work.
// Usually you would send messages in some kid of event/endless-loop and/or with some custom unblock logic.
sleep(3);

// Work done, kill all workers!
// HINT: if you skipp this kill, the main process and its worker will run infinitely
$pool->killAll();
18 changes: 18 additions & 0 deletions example/ExampleWorkerService.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface;
use Sweikenb\Library\Pcntl\Api\ParentProcessInterface;

class ExampleWorkerService
{
public function __invoke(ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess): void
{
fwrite(
STDOUT,
sprintf(
"Hello world message handled by worker #%s\n",
$childProcess->getId()
)
);
}
}
5 changes: 2 additions & 3 deletions src/Api/ChildProcessInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@

interface ChildProcessInterface extends ProcessInterface
{
// The library provides different classes for parent and child to make programming more expressive.
// Basically you could create just one model which implements all interfaces and achieve the same results.
}
public function kill(): void;
}
22 changes: 22 additions & 0 deletions src/Api/Ipc/MessageInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace Sweikenb\Library\Pcntl\Api\Ipc;

use Serializable;

interface MessageInterface extends Serializable
{
/**
* Must return a topic that might be used for later message routing.
*
* @return string
*/
public function getTopic(): string;

/**
* Payload of the message that can be anything that is "serializable()".
*
* @return mixed
*/
public function getPayload(): mixed;
}
12 changes: 12 additions & 0 deletions src/Api/Ipc/WorkerMessageInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace Sweikenb\Library\Pcntl\Api\Ipc;

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface;
use Sweikenb\Library\Pcntl\Api\ParentProcessInterface;
use Sweikenb\Library\Pcntl\Api\ProcessPoolInterface;

interface WorkerMessageInterface extends MessageInterface
{
public function execute(ProcessPoolInterface $processPool, ChildProcessInterface $childProcess, ParentProcessInterface $parentProcess): void;
}
10 changes: 10 additions & 0 deletions src/Api/Ipc/WorkerSelectionStrategyInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace Sweikenb\Library\Pcntl\Api\Ipc;

interface WorkerSelectionStrategyInterface
{
public function configure(array $processIds): void;

public function getNextWorkerPid(): int;
}
8 changes: 5 additions & 3 deletions src/Api/ProcessFactoryInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

namespace Sweikenb\Library\Pcntl\Api;

use Socket;

interface ProcessFactoryInterface
{
public function createParentProcess(int $pid): ParentProcessInterface;
public function createParentProcess(int $pid, ?Socket $ipcSocket): ParentProcessInterface;

public function createChildProcess(int $pid): ChildProcessInterface;
}
public function createChildProcess(int $pid, ?Socket $ipcSocket): ChildProcessInterface;
}
13 changes: 12 additions & 1 deletion src/Api/ProcessInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,18 @@

namespace Sweikenb\Library\Pcntl\Api;

use Socket;
use Sweikenb\Library\Pcntl\Api\Ipc\MessageInterface;

interface ProcessInterface
{
public function getId(): int;
}

public function setIpcSocket(?Socket $socket): self;

public function getIpcSocket(): ?Socket;

public function sendMessage(MessageInterface $message): bool;

public function getNextMessage(bool $wait = true): ?MessageInterface;
}
4 changes: 3 additions & 1 deletion src/Api/ProcessManagerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Sweikenb\Library\Pcntl\Api;

use Sweikenb\Library\Pcntl\Exception\ProcessException;

interface ProcessManagerInterface
{
/**
Expand All @@ -12,7 +14,7 @@ public function getMainProcess(): ParentProcessInterface;
/**
* Executes the provided callback within a child process and does not wait for it to finish.
*
* @throws \Sweikenb\Library\Pcntl\Exception\ProcessException
* @throws ProcessException
*/
public function runProcess(callable $callback): ChildProcessInterface;

Expand Down
14 changes: 14 additions & 0 deletions src/Api/ProcessPoolInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Sweikenb\Library\Pcntl\Api;

use Sweikenb\Library\Pcntl\Api\Ipc\WorkerMessageInterface;

interface ProcessPoolInterface
{
public function getInvocationBuilder(): callable;

public function sendMessage(WorkerMessageInterface $workerMessage): bool;

public function killAll(): void;
}
9 changes: 5 additions & 4 deletions src/Event/ProcessManagerEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
class ProcessManagerEvent extends Event
{
public function __construct(
private string $eventName,
private ?int $processId = null
) {}
private readonly string $eventName,
private readonly ?int $processId = null
) {
}

public function getEventName(): string
{
Expand All @@ -20,4 +21,4 @@ public function getProcessId(): ?int
{
return $this->processId;
}
}
}
Loading

0 comments on commit 9da96ed

Please sign in to comment.