-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathStream.php
95 lines (76 loc) · 1.84 KB
/
Stream.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
<?php
namespace RockSSE;
use ProcessWire\RockSSE;
use ProcessWire\WireData;
use function ProcessWire\rocksse;
class Stream extends WireData
{
const done = 'ROCKSSE:STREAM-DONE';
public $iterator;
public $init;
public $loop;
/**
* Sleep after each iteration (in ms)
*/
public float $sleep = 0;
/**
* Reference to RockSSE module
* @var RockSSE
*/
public $sse;
public function __construct(
public string $url,
) {
$this->sse = rocksse();
$this->iterator = $this->sse->newIterator();
$this->loop = function () {};
$this->init = function () {};
}
public function __debugInfo()
{
return [
'url' => $this->url,
'iterator' => $this->iterator,
];
}
public function done(): void
{
$this->send(self::done);
die();
}
/**
* Send SSE message to client
*/
public function send(mixed $msg): void
{
if (!is_string($msg)) $msg = json_encode($msg);
echo "data: $msg\n\n";
echo str_pad('', 8186) . "\n";
flush();
}
public function serve(): void
{
set_time_limit(0);
header("Cache-Control: no-cache");
header("Content-Type: text/event-stream");
// if we have an init callback we call it now
($this->init)($this);
// start endless loop for the stream
$iterator = $this->iterator;
while (true) {
// stop loop when connection is aborted
if (connection_aborted()) break;
// execute the callback and get result
$result = ($this->loop)($this);
// if the callback returned FALSE we break out of the endless loop
// and die() via done() method
if ($result === false) $this->done();
// die() when done
if ($iterator->isDone()) $this->done();
// increment iterator
$iterator->next();
// sleep?
if ($this->sleep > 0) usleep($this->sleep * 1000);
}
}
}