Skip to content

Commit

Permalink
- code cleanup and refactor
Browse files Browse the repository at this point in the history
 - moved to php 7.1
 - libs update
  • Loading branch information
krowinski committed Dec 18, 2019
1 parent 1fe41a2 commit 42e0266
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 162 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ I serialise callable function and sent to child process by exec. To get callback
### Example ?
Sure take a look - https://github.com/krowinski/async/blob/master/example/example.php

### User cases (mostly some code on website that user don't need to wait for)
- send callback
- publish to queue amqp
- send external analytic data
- remove files
- process payments

### Supports M$ Windows?
NO.

Expand Down
6 changes: 4 additions & 2 deletions bin/console
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env php
<?php

declare(strict_types=1);

namespace bin;

array_map(
function ($autoloadFile) {
static function ($autoloadFile) {
if (is_file($autoloadFile)) {
/** @noinspection PhpIncludeInspection */
include $autoloadFile;
}
},
Expand Down
11 changes: 6 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
"name": "krowinski/async",
"description": "Run php closure asynchronously",
"keywords": [
"php", "async"
"php",
"async"
],
"type": "library",
"require": {
"php": ">=5.6",
"php": ">=7.1",
"ext-pcntl": "*",
"symfony/process": "^2.7|^3.3|^4.0",
"symfony/console": "^2.7|^3.3|^4.0",
"jeremeamia/superclosure": "^2.3"
"symfony/process": "^4.0|^5.0",
"symfony/console": "^4.0|^5.0",
"jeremeamia/superclosure": "^2.4"
},
"license": "MIT",
"authors": [
Expand Down
33 changes: 18 additions & 15 deletions example/example.php
Original file line number Diff line number Diff line change
@@ -1,54 +1,56 @@
<?php

/** @noinspection PhpComposerExtensionStubsInspection */
declare(strict_types=1);

namespace bin;


include __DIR__ . '/../vendor/autoload.php';

use Async\AsyncCall;
use Exception;
use RuntimeException;

$s = microtime(true);

AsyncCall::run(
function () {
static function () {
sleep(2);
// there is no callback so echo will not be showed and parent process will not w8
echo 'sleep 2s' . PHP_EOL;
}
);

AsyncCall::run(
function () {
static function () {
sleep(4);
// echo will be catched in child process and returned as second parameter $stdOut
echo 'sleep 4s' . PHP_EOL;
},
function ($results, $stdOut) {
static function ($results, $stdOut) {
echo $stdOut;
}
);

AsyncCall::run(
function () {
throw new \RuntimeException('bar');
static function () {
throw new RuntimeException('bar');
}
);

AsyncCall::run(
function () {
throw new \RuntimeException('foo');
static function () {
throw new RuntimeException('foo');
},
function () {
static function () {
},
function (\Exception $error) {
static function (Exception $error) {
// we will get error
assert($error->getMessage() === 'foo');
}
);

AsyncCall::run(
function () {
static function () {
// if this is in parent, child will not see this
function getPage($url)
{
Expand All @@ -64,7 +66,7 @@ function getPage($url)
// this will be returned to callback as first parameter
return getPage('example.com');
},
function ($results) {
static function ($results) {
echo $results;
}
);
Expand All @@ -77,10 +79,10 @@ function ($results) {
while ($i--) {
// this will start 2 process and then wait them to finish before starting second one
AsyncCall::run(
function () {
static function () {
sleep(1);
},
function () {
static function () {
}
);
}
Expand All @@ -89,5 +91,6 @@ function () {


echo PHP_EOL;
echo 'Script ended: ';
echo microtime(true) - $s;
echo PHP_EOL;
73 changes: 29 additions & 44 deletions src/Async/AsyncCall.php
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
<?php

declare(strict_types=1);

namespace Async;

use SuperClosure\Serializer;
use Symfony\Component\Console\Exception\InvalidArgumentException;

/**
* Class AsyncCall
* @package Async
*/
class AsyncCall
{
const CONSOLE_EXECUTE = 'php ' . __DIR__ . '/../../bin/console app:run-child-process ';
private const CONSOLE_LOCATION = __DIR__ . '/../../bin/console';

/**
* @var bool
Expand All @@ -30,87 +26,76 @@ class AsyncCall
* @var int
*/
private static $processesLimit = 0;
private static $processAmount = 0;

/**
* @param $processesLimit
* @throws \Symfony\Component\Console\Exception\InvalidArgumentException
*/
public static function setProcessLimit($processesLimit)
public static function setProcessLimit(int $processesLimit): void
{
if ($processesLimit < 0) {
throw new InvalidArgumentException('Processes limit Must be possitive itiger');
throw new InvalidArgumentException('Processes limit Must be positive integer');
}
self::$processesLimit = (int)$processesLimit;
self::$processesLimit = $processesLimit;
}

/**
* @param callable $job
* @param callable $callback
* @param callable $onError
* @param float $timeout
* @param float $idleTimeout
* @throws \Symfony\Component\Process\Exception\InvalidArgumentException
* @throws \Symfony\Component\Process\Exception\RuntimeException
* @throws \Symfony\Component\Process\Exception\LogicException
* @throws \RuntimeException
*/
public static function run(
callable $job,
callable $callback = null,
callable $onError = null,
$timeout = null,
$idleTimeout = null
) {
float $timeout = null,
float $idleTimeout = null
): void {
self::registerShutdownFunction();

if (!self::$serializer) {
self::$serializer = new Serializer();
}

// we got process limit so wait for them to finish
if (0 !== self::$processesLimit && self::$processesLimit >= count(self::$processList)) {
if (0 !== self::$processesLimit && self::$processesLimit >= self::$processAmount) {
self::waitForProcessesToFinish(self::$processesLimit);
}

$process = new AsyncProcess(self::CONSOLE_EXECUTE . base64_encode(self::$serializer->serialize($job)));
$process = new AsyncProcess(
[
self::CONSOLE_LOCATION,
AsyncChildCommand::COMMAND_NAME,
base64_encode(self::$serializer->serialize($job))
]
);
$process->setTimeout($timeout);
$process->setIdleTimeout($idleTimeout);
$process->startJob($callback, $onError);

//echo $process->getCommandLine() . PHP_EOL;
self::$processList[] = $process;
self::$processAmount++;
}

private static function registerShutdownFunction()
private static function registerShutdownFunction(): void
{
if (!self::$shutdownFunctionRegistered) {
register_shutdown_function(
function () {
static function () {
self::waitForProcessesToFinish();
}
);
self::$shutdownFunctionRegistered = true;
}
}

/**
* @param int $maxProcessToWait
*/
private static function waitForProcessesToFinish($maxProcessToWait = 0)
private static function waitForProcessesToFinish(int $maxProcessToWait = 0): void
{
while (true) {
$processAmount = count(self::$processList);

if (0 === $processAmount) {
break;
}
if ($maxProcessToWait > $processAmount) {
for (; ;) {
if (0 === self::$processAmount || $maxProcessToWait > self::$processAmount) {
break;
}

foreach (self::$processList as $i => $process) {
if ($process->getStatus() === AsyncProcess::STATUS_TERMINATED || (!$process->hasCallbackSet() && !$process->hasOnErrorSet())) {
if (
$process->getStatus() === AsyncProcess::STATUS_TERMINATED ||
(!$process->hasCallbackSet() && !$process->hasOnErrorSet())
) {
unset(self::$processList[$i]);
self::$processAmount--;

continue;
}
}
Expand Down
52 changes: 15 additions & 37 deletions src/Async/AsyncChildCommand.php
Original file line number Diff line number Diff line change
@@ -1,59 +1,29 @@
<?php

declare(strict_types=1);

namespace Async;


use Exception;
use SuperClosure\Serializer;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
* Class AsyncChildCommand
* @package Async
*/
class AsyncChildCommand extends Command
{
const PARAM_NAME_JOB = 'job';

/**
* @var Serializer
*/
public const COMMAND_NAME = 'app:run-child-process';
private const PARAM_NAME_JOB = 'job';
private $serializer;

/**
* AsyncChildCommand constructor.
* @param null $name
* @throws \Symfony\Component\Console\Exception\LogicException
*/
public function __construct($name = null)
public function __construct(?string $name = null)
{
parent::__construct($name);

$this->serializer = new Serializer();
}

/**
* @throws \Symfony\Component\Console\Exception\InvalidArgumentException
*/
protected function configure()
{
$this
->setName('app:run-child-process')
->setDescription('Runs a child process.')
->addArgument(self::PARAM_NAME_JOB, InputArgument::REQUIRED, 'Serialized callback job param.');
}

/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int
* @throws \Symfony\Component\Console\Exception\InvalidArgumentException
* @throws \SuperClosure\Exception\ClosureUnserializationException
*/
public function execute(InputInterface $input, OutputInterface $output)
public function execute(InputInterface $input, OutputInterface $output): int
{
try {
$job = $this->serializer->unserialize(base64_decode($input->getArgument(self::PARAM_NAME_JOB)));
Expand All @@ -62,7 +32,7 @@ public function execute(InputInterface $input, OutputInterface $output)
$jobResults = $job();
$ob = ob_get_clean();
$error = null;
} catch (\Exception $exception) {
} catch (Exception $exception) {
$jobResults = null;
$ob = null;
$error = $exception;
Expand All @@ -72,4 +42,12 @@ public function execute(InputInterface $input, OutputInterface $output)

return 0;
}

protected function configure(): void
{
$this
->setName(self::COMMAND_NAME)
->setDescription('Runs a child process.')
->addArgument(self::PARAM_NAME_JOB, InputArgument::REQUIRED, 'Serialized callback job param.');
}
}
Loading

0 comments on commit 42e0266

Please sign in to comment.