Skip to content

Commit

Permalink
Merge pull request #49352 from nextcloud/s3-disable-multipart
Browse files Browse the repository at this point in the history
improve handling of large single-part s3 uploads
  • Loading branch information
juliusknorr authored Dec 6, 2024
2 parents 9bd7304 + 6cf66f9 commit 3328cea
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 21 deletions.
22 changes: 14 additions & 8 deletions apps/dav/lib/Upload/AssemblyStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public function stream_seek($offset, $whence = SEEK_SET) {
$offset = $this->size + $offset;
}

if ($offset === $this->pos) {
return true;
}

if ($offset > $this->size) {
return false;
}
Expand All @@ -95,7 +99,7 @@ public function stream_seek($offset, $whence = SEEK_SET) {

$stream = $this->getStream($this->nodes[$nodeIndex]);
$nodeOffset = $offset - $nodeStart;
if (fseek($stream, $nodeOffset) === -1) {
if ($nodeOffset > 0 && fseek($stream, $nodeOffset) === -1) {
return false;
}
$this->currentNode = $nodeIndex;
Expand Down Expand Up @@ -126,9 +130,14 @@ public function stream_read($count) {
}
}

do {
$collectedData = '';
// read data until we either got all the data requested or there is no more stream left
while ($count > 0 && !is_null($this->currentStream)) {
$data = fread($this->currentStream, $count);
$read = strlen($data);

$count -= $read;
$collectedData .= $data;
$this->currentNodeRead += $read;

if (feof($this->currentStream)) {
Expand All @@ -145,14 +154,11 @@ public function stream_read($count) {
$this->currentStream = null;
}
}
// if no data read, try again with the next node because
// returning empty data can make the caller think there is no more
// data left to read
} while ($read === 0 && !is_null($this->currentStream));
}

// update position
$this->pos += $read;
return $data;
$this->pos += strlen($collectedData);
return $collectedData;
}

/**
Expand Down
22 changes: 19 additions & 3 deletions apps/dav/tests/unit/Upload/AssemblyStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ public function testGetContents($expected, $nodes): void {
/**
* @dataProvider providesNodes()
*/
public function testGetContentsFread($expected, $nodes): void {
public function testGetContentsFread($expected, $nodes, $chunkLength = 3): void {
$stream = AssemblyStream::wrap($nodes);

$content = '';
while (!feof($stream)) {
$content .= fread($stream, 3);
$chunk = fread($stream, $chunkLength);
$content .= $chunk;
if ($chunkLength !== 3) {
$this->assertEquals($chunkLength, strlen($chunk));
}
}

$this->assertEquals($expected, $content);
Expand Down Expand Up @@ -103,7 +107,19 @@ public function providesNodes() {
]],
'a ton of nodes' => [
$tonofdata, $tonofnodes
]
],
'one read over multiple nodes' => [
'1234567890', [
$this->buildNode('0', '1234'),
$this->buildNode('1', '5678'),
$this->buildNode('2', '90'),
], 10],
'two reads over multiple nodes' => [
'1234567890', [
$this->buildNode('0', '1234'),
$this->buildNode('1', '5678'),
$this->buildNode('2', '90'),
], 5],
];
}

Expand Down
8 changes: 8 additions & 0 deletions lib/private/Files/ObjectStore/ObjectStoreStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,14 @@ public function file_put_contents(string $path, mixed $data): int {
}

public function writeStream(string $path, $stream, ?int $size = null): int {
if ($size === null) {
$stats = fstat($stream);
if (is_array($stats) && isset($stats['size'])) {
$size = $stats['size'];
$this->logger->warning("stream size $size");
}
}

$stat = $this->stat($path);
if (empty($stat)) {
// create new file
Expand Down
33 changes: 23 additions & 10 deletions lib/private/Files/ObjectStore/S3ObjectTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,33 @@ protected function writeMultiPart(string $urn, StreamInterface $stream, ?string
* @since 7.0.0
*/
public function writeObject($urn, $stream, ?string $mimetype = null) {
$canSeek = fseek($stream, 0, SEEK_CUR) === 0;
$psrStream = Utils::streamFor($stream);

// ($psrStream->isSeekable() && $psrStream->getSize() !== null) evaluates to true for a On-Seekable stream
// so the optimisation does not apply
$buffer = new Psr7\Stream(fopen('php://memory', 'rwb+'));
Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit);
$buffer->seek(0);
if ($buffer->getSize() < $this->putSizeLimit) {
// buffer is fully seekable, so use it directly for the small upload
$this->writeSingle($urn, $buffer, $mimetype);

$size = $psrStream->getSize();
if ($size === null || !$canSeek) {
// The s3 single-part upload requires the size to be known for the stream.
// So for input streams that don't have a known size, we need to copy (part of)
// the input into a temporary stream so the size can be determined
$buffer = new Psr7\Stream(fopen('php://temp', 'rw+'));
Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit);
$buffer->seek(0);
if ($buffer->getSize() < $this->putSizeLimit) {
// buffer is fully seekable, so use it directly for the small upload
$this->writeSingle($urn, $buffer, $mimetype);
} else {
$loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
$this->writeMultiPart($urn, $loadStream, $mimetype);
}
} else {
$loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
$this->writeMultiPart($urn, $loadStream, $mimetype);
if ($size < $this->putSizeLimit) {
$this->writeSingle($urn, $psrStream, $mimetype);
} else {
$this->writeMultiPart($urn, $psrStream, $mimetype);
}
}
$psrStream->close();
}

/**
Expand Down

0 comments on commit 3328cea

Please sign in to comment.