diff --git a/src/Rfc6455Client.php b/src/Rfc6455Client.php index cff3a20..3fa2711 100644 --- a/src/Rfc6455Client.php +++ b/src/Rfc6455Client.php @@ -2,6 +2,7 @@ namespace Amp\Websocket; +use Amp\ByteStream\InMemoryStream; use Amp\ByteStream\InputStream; use Amp\ByteStream\IteratorStream; use Amp\ByteStream\StreamException; @@ -54,9 +55,6 @@ final class Rfc6455Client implements Client /** @var Promise|null */ private $lastEmit; - /** @var string */ - private $emitBuffer = ''; - /** @var bool */ private $masked; @@ -320,8 +318,14 @@ private function onData(int $opcode, string $data, bool $terminated): void return; } - $this->currentMessageEmitter = new Emitter; - $stream = new IteratorStream($this->currentMessageEmitter->iterate()); + if ($terminated) { + $stream = new InMemoryStream($data); + ++$this->metadata->messagesRead; + } else { + $this->currentMessageEmitter = new Emitter; + $stream = new IteratorStream($this->currentMessageEmitter->iterate()); + } + if ($opcode === Opcode::BIN) { $message = Message::fromBinary($stream); } else { @@ -335,6 +339,10 @@ private function onData(int $opcode, string $data, bool $terminated): void } else { $this->messages[] = $message; } + + if ($terminated) { + return; + } } elseif ($opcode !== Opcode::CONT) { $this->onError( Code::PROTOCOL_ERROR, @@ -343,13 +351,8 @@ private function onData(int $opcode, string $data, bool $terminated): void return; } - $this->emitBuffer .= $data; - - if ($terminated || \strlen($this->emitBuffer) >= $this->options->getStreamThreshold()) { - $promise = $this->currentMessageEmitter->emit($this->emitBuffer); - $this->lastEmit = $this->nextMessageDeferred ? null : $promise; - $this->emitBuffer = ''; - } + $promise = $this->currentMessageEmitter->emit($data); + $this->lastEmit = $this->nextMessageDeferred ? null : $promise; if ($terminated) { $emitter = $this->currentMessageEmitter;