Skip to content

Commit

Permalink
Only create emitter for split messages
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Oct 26, 2021
1 parent 8162d71 commit d0e61e3
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions src/Rfc6455Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Websocket;

use Amp\ByteStream\InMemoryStream;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\IteratorStream;
use Amp\ByteStream\StreamException;
Expand Down Expand Up @@ -54,9 +55,6 @@ final class Rfc6455Client implements Client
/** @var Promise|null */
private $lastEmit;

/** @var string */
private $emitBuffer = '';

/** @var bool */
private $masked;

Expand Down Expand Up @@ -320,8 +318,14 @@ private function onData(int $opcode, string $data, bool $terminated): void
return;
}

$this->currentMessageEmitter = new Emitter;
$stream = new IteratorStream($this->currentMessageEmitter->iterate());
if ($terminated) {
$stream = new InMemoryStream($data);
++$this->metadata->messagesRead;
} else {
$this->currentMessageEmitter = new Emitter;
$stream = new IteratorStream($this->currentMessageEmitter->iterate());
}

if ($opcode === Opcode::BIN) {
$message = Message::fromBinary($stream);
} else {
Expand All @@ -335,6 +339,10 @@ private function onData(int $opcode, string $data, bool $terminated): void
} else {
$this->messages[] = $message;
}

if ($terminated) {
return;
}
} elseif ($opcode !== Opcode::CONT) {
$this->onError(
Code::PROTOCOL_ERROR,
Expand All @@ -343,13 +351,8 @@ private function onData(int $opcode, string $data, bool $terminated): void
return;
}

$this->emitBuffer .= $data;

if ($terminated || \strlen($this->emitBuffer) >= $this->options->getStreamThreshold()) {
$promise = $this->currentMessageEmitter->emit($this->emitBuffer);
$this->lastEmit = $this->nextMessageDeferred ? null : $promise;
$this->emitBuffer = '';
}
$promise = $this->currentMessageEmitter->emit($data);
$this->lastEmit = $this->nextMessageDeferred ? null : $promise;

if ($terminated) {
$emitter = $this->currentMessageEmitter;
Expand Down

0 comments on commit d0e61e3

Please sign in to comment.