Skip to content

Commit

Permalink
Merge pull request #7 from utopia-php/feat-di-in-worker-start
Browse files Browse the repository at this point in the history
injections in worker start
  • Loading branch information
eldadfux authored Oct 31, 2022
2 parents 42b132c + 3fe9866 commit 0cad4cf
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 31 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ $server
});

$server
->workerStart(function () {
->workerStart()
->action(function () {
echo "Worker Started" . PHP_EOL;
})
->start();
});

$server->start();


// Enqueue messages to the worker using the Redis adapter
Expand Down
45 changes: 23 additions & 22 deletions src/Queue/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ class Server
*/
protected array $shutdownHooks = [];

/**
* Hook that is called when worker starts
*
* @var Hook
*/
protected Hook $workerStartHook;

/**
* @var array
*/
Expand Down Expand Up @@ -178,28 +185,10 @@ public function init(): Hook
public function start(): self
{
try {
$this->adapter->start();
} catch (Throwable $error) {
self::setResource('error', fn () => $error);
foreach ($this->errorHooks as $hook) {
call_user_func_array($hook->getAction(), $this->getArguments($hook));
}
}
return $this;
}

/**
* Is called when a Worker starts.
* @param callable $callback
* @return self
*/
public function workerStart(callable $callback = null): self
{
try {
$this->adapter->workerStart(function (string $workerId) use ($callback) {
$this->adapter->workerStart(function (string $workerId) {
Console::success("[Worker] Worker {$workerId} is ready!");
if (!is_null($callback)) {
call_user_func($callback);
if (!is_null($this->workerStartHook)) {
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
}
while (true) {
/**
Expand Down Expand Up @@ -311,16 +300,28 @@ public function workerStart(callable $callback = null): self
}
}
});

$this->adapter->start();
} catch (Throwable $error) {
self::setResource('error', fn () => $error);
foreach ($this->errorHooks as $hook) {
call_user_func_array($hook->getAction(), $this->getArguments($hook));
}
}

return $this;
}

/**
* Is called when a Worker starts.
* @return Hook
*/
public function workerStart(): Hook
{
$hook = new Hook();
$this->workerStartHook = $hook;
return $hook;
}

/**
* Is called when a Worker stops.
* @param callable $callback
Expand Down
8 changes: 5 additions & 3 deletions tests/Queue/servers/Swoole/worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
});

$server
->workerStart(function () {
->workerStart()
->action(function () {
echo "Worker Started" . PHP_EOL;
})
->start();
});

$server->start();
8 changes: 5 additions & 3 deletions tests/Queue/servers/Workerman/worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
});

$server
->workerStart(function () {
->workerStart()
->action(function () {
echo "Worker Started" . PHP_EOL;
})
->start();
});

$server->start();

0 comments on commit 0cad4cf

Please sign in to comment.