Skip to content

Commit

Permalink
feat: save progress on initial implementation. Almost done.
Browse files Browse the repository at this point in the history
  • Loading branch information
lots0logs committed Jan 19, 2022
1 parent 400ed62 commit 7d87da3
Show file tree
Hide file tree
Showing 6 changed files with 913 additions and 79 deletions.
766 changes: 766 additions & 0 deletions .editorconfig

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
.idea
*.iml
node_modules
vendor
.directory
*.bak
*.gho
*.ori
*.orig
*.tmp
.*.kate-swp
.swp.*

# all files that end in ~ since those are typically temp files
*~

# temporary files which can be created if a process still has a handle open of a deleted file
.fuse_hidden*

# KDE directory preferences
.directory

# Linux trash folder which might appear on any partition or disk
.Trash-*

# .nfs files are created when an open file is removed but is still being accessed
.nfs*

.vscode/*
*.code-workspace

# Local History for Visual Studio Code
.history/

# General
.DS_Store
.AppleDouble
.LSOverride
34 changes: 34 additions & 0 deletions src/Base.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

namespace ET\Hustle;

use Redis, RedisCluster;


abstract class Base {

/**
* @var Redis|RedisCluster
*/
protected static object $_DB;

protected static function _key( string ...$args ): string {
$key = implode( ':', $args );

return "hustle:{$key}";
}

protected static function _uuid4(): string {
return sprintf(
'%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff ),
mt_rand( 0, 0x0fff ) | 0x4000,
mt_rand( 0, 0x3fff ) | 0x8000,
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff )
);
}
}
14 changes: 4 additions & 10 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,9 @@

// External Dependencies
use Redis, RedisCluster;
use function et_;


class Client {

/**
* @var Redis|RedisCluster
*/
protected object $_db;
class Client extends Base {

/**
* @var Queue[]
Expand All @@ -30,7 +24,7 @@ protected function __construct( string $name = 'default', array $redis_nodes = [

if ( count( $redis_nodes ) > 1 || is_string( reset( $redis_nodes ) ) ) {
// Redis Cluster
$this->_db = new RedisCluster( null, $redis_nodes );
self::$_DB = new RedisCluster( null, $redis_nodes );
} else {
// Redis Standalone
throw new \ErrorException( 'Support for using redis in standalone mode is not implemented.' );
Expand All @@ -41,11 +35,11 @@ protected function __construct( string $name = 'default', array $redis_nodes = [
* @return Redis|RedisCluster
*/
public function DB(): object {
return $this->_db;
return self::$_DB;
}

public function QUEUE( string $name = 'default' ): Queue {
$this->_queues[ $name ] ??= new Queue( $name, $this );
$this->_queues[ $name ] ??= new Queue( $name );

return $this->_queues[ $name ];
}
Expand Down
67 changes: 40 additions & 27 deletions src/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,57 @@
namespace ET\Hustle;

// Built-in Dependencies
use JsonSerializable;
use JsonSerializable, ErrorException;


class Job implements JsonSerializable {
class Job extends Base implements JsonSerializable {

protected Queue $_queue;
public array $callbacks = [
'run' => null,
'done' => null,
'error' => null,
];

public array $data;

public string $id;

/**
* @var callable
*/
public string $class;
public string $status;

public array $data;

public function __construct( Queue $queue, string $class, array $data ) {
$this->_queue = $queue;
$this->class = $class;
$this->data = $data;
$this->id = $this->_uuid4();
public function __construct( string $id, array $details = [] ) {
$this->callbacks = $details['callbacks'];
$this->data = $details['data'];
$this->id = $details['id'];
$this->status = $details['status'];
}

protected function _uuid4(): string {
return sprintf(
'%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff ),
mt_rand( 0, 0x0fff ) | 0x4000,
mt_rand( 0, 0x3fff ) | 0x8000,
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff )
);
public static function instance( string $queue, ?string $id, ?array $details = null ): self {
if ( $id ) {
// Get existing job details from database
$json = self::$_DB->get( self::_key( 'queues', $queue, 'jobs', $id ) );
$details = json_decode( $json, true );

} else if ( $details ) {
// Create new job in database
$id = $details['id'] = self::_uuid4();

$details['status'] = 'pending';

self::$_DB->set( self::_key( 'queues', $queue, 'jobs', $id ), json_encode( $details ) );

} else {
throw new ErrorException( 'At least one of $id, $details is required.' );
}

return new self( $id, $details );
}

public function jsonSerialize() {
// TODO: Implement jsonSerialize() method.
return [
'id' => $this->id,
'callbacks' => $this->callbacks,
'data' => $this->data,
'status' => $this->status,
];
}
}
73 changes: 31 additions & 42 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,75 +6,64 @@
use Redis, RedisCluster;


class Queue {

protected Client $_client;
class Queue extends Base {

public string $name;

public function __construct( Client $client, string $name = 'default' ) {
$this->_client = $client;
public function __construct( string $name = 'default' ) {
$this->name = $name;
}

protected function __completed(): string {
return $this->_key( 'completed' );
return $this->__key( 'completed' );
}

protected function __failed(): string {
return $this->_key( 'failed' );
return $this->__key( 'failed' );
}

protected function __pending(): string {
return $this->_key( 'pending' );
return $this->__key( 'pending' );
}

protected function __running(): string {
return $this->_key( 'running' );
return $this->__key( 'running' );
}

protected function _key( string ...$args ): string {
$key = implode( ':', $args );

return "hustle:queue:{$this->name}:{$key}";
protected function __key( string ...$args ): string {
return self::_key( 'queues', $this->name, ...$args );
}

protected function _uuid4(): string {
return sprintf(
'%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff ),
mt_rand( 0, 0x0fff ) | 0x4000,
mt_rand( 0, 0x3fff ) | 0x8000,
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff ),
mt_rand( 0, 0xffff )
);
public function JOB( string $job_id ): Job {
return Job::instance( $this->name, $job_id );
}

public function job( string $jid ): array {
$key = $this->_key( 'job', $jid );
}

public function put( callable $job, array $data ): string {
$jid = $this->_uuid4();
$key = $this->_key( 'job', $jid );

$data = [
'jid' => $jid,
'class' => $job,
'data' => $data,
public function put( callable $run, array $data ): string {
$details = [
'data' => $data,
'callbacks' => [
'run' => $run,
'done' => __CLASS__ . '::done',
'error' => __CLASS__ . '::error',
],
];

$this->_client->DB()->set( $key, json_encode( $data ) );
$this->_client->DB()->rpush( $this->__pending(), $jid );
$job = Job::instance( $this->name, null, $details );

return $jid;
self::$_DB->lpush( $this->__pending(), $job->id );

return $job->id;
}

public function take(): string {

public function take(): Job {
$job_id = self::$_DB->brpoplpush( $this->__pending(), $this->__running(), 600 );
$job = Job::instance( $this->name, $job_id );

$job->status = 'running';

self::$_DB->set( $this->__key( 'jobs', $job_id ), json_encode( $job ) );

return $job;
}

}

0 comments on commit 7d87da3

Please sign in to comment.