Skip to content

Commit

Permalink
Made chunk reading explicit when using read or pread
Browse files Browse the repository at this point in the history
  • Loading branch information
rpecka committed Jul 9, 2024
1 parent e947421 commit 7baadcc
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 15 deletions.
19 changes: 6 additions & 13 deletions Sources/NIOFileSystem/FileChunks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import NIOPosix
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct FileChunks: AsyncSequence {
enum ChunkRange {
case entireFile
case partial(Range<Int64>)
case filePointerToEnd
case range(Range<Int64>)
}

public typealias Element = ByteBuffer
Expand All @@ -39,22 +39,15 @@ public struct FileChunks: AsyncSequence {
internal init(
handle: SystemFileHandle,
chunkLength: ByteCount,
range: Range<Int64>
range: ChunkRange
) {
let chunkRange: ChunkRange
if range.lowerBound == 0, range.upperBound == .max {
chunkRange = .entireFile
} else {
chunkRange = .partial(range)
}

// TODO: choose reasonable watermarks; this should likely be at least somewhat dependent
// on the chunk size.
let stream = BufferedStream.makeFileChunksStream(
of: ByteBuffer.self,
handle: handle,
chunkLength: chunkLength.bytes,
range: chunkRange,
range: range,
lowWatermark: 4,
highWatermark: 8
)
Expand Down Expand Up @@ -96,9 +89,9 @@ extension BufferedStream where Element == ByteBuffer {
) -> BufferedStream<ByteBuffer> {
let state: ProducerState
switch range {
case .entireFile:
case .filePointerToEnd:
state = ProducerState(handle: handle, range: nil)
case .partial(let partialRange):
case .range(let partialRange):
state = ProducerState(handle: handle, range: partialRange)
}
let protectedState = NIOLockedValueBox(state)
Expand Down
8 changes: 8 additions & 0 deletions Sources/NIOFileSystem/FileHandle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ public struct ReadFileHandle: ReadableFileHandleProtocol, _HasFileHandle {
self.fileHandle.systemFileHandle.readChunks(in: range, chunkLength: chunkLength)
}

public func readChunksFromFilePointer(chunkLength size: ByteCount) -> FileChunks {
self.fileHandle.systemFileHandle.readChunksFromFilePointer(chunkLength: size)
}

public func setTimes(
lastAccess: FileInfo.Timespec?,
lastDataModification: FileInfo.Timespec?
Expand Down Expand Up @@ -269,6 +273,10 @@ public struct ReadWriteFileHandle: ReadableAndWritableFileHandleProtocol, _HasFi
self.fileHandle.systemFileHandle.readChunks(in: offset, chunkLength: chunkLength)
}

public func readChunksFromFilePointer(chunkLength size: ByteCount) -> FileChunks {
self.fileHandle.systemFileHandle.readChunksFromFilePointer(chunkLength: size)
}

@discardableResult
public func write(
contentsOf bytes: some (Sequence<UInt8> & Sendable),
Expand Down
9 changes: 8 additions & 1 deletion Sources/NIOFileSystem/FileHandleProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ public protocol ReadableFileHandleProtocol: FileHandleProtocol {
/// - chunkLength: The maximum length of the chunk to read as a ``ByteCount``.
/// - Returns: A sequence of chunks read from the file.
func readChunks(in range: Range<Int64>, chunkLength: ByteCount) -> FileChunks

/// Returns an asynchronous sequence of chunks read from the file starting from the current file pointer.
///
/// - Parameters:
/// - size: The maximum length of the chunk to read as a ``ByteCount``.
/// - Returns: A sequence of chunks read from the file.
func readChunksFromFilePointer(chunkLength size: ByteCount) -> FileChunks
}

// MARK: - Read chunks with default chunk length
Expand Down Expand Up @@ -415,7 +422,7 @@ extension ReadableFileHandleProtocol {
var accumulator = ByteBuffer()
accumulator.reserveCapacity(readSize)

for try await chunk in self.readChunks(in: ..., chunkLength: .mebibytes(8)) {
for try await chunk in self.readChunksFromFilePointer(chunkLength: .mebibytes(8)) {
accumulator.writeImmutableBuffer(chunk)
if accumulator.readableBytes > maximumSizeAllowed.bytes {
throw FileSystemError(
Expand Down
8 changes: 7 additions & 1 deletion Sources/NIOFileSystem/Internal/SystemFileHandle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,13 @@ extension SystemFileHandle: ReadableFileHandleProtocol {
in range: Range<Int64>,
chunkLength size: ByteCount
) -> FileChunks {
return FileChunks(handle: self, chunkLength: size, range: range)
return FileChunks(handle: self, chunkLength: size, range: .range(range))
}

public func readChunksFromFilePointer(
chunkLength size: ByteCount
) -> FileChunks {
return FileChunks(handle: self, chunkLength: size, range: .filePointerToEnd)
}
}

Expand Down
26 changes: 26 additions & 0 deletions Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,32 @@ final class FileHandleTests: XCTestCase {
}
}

func testUnboundedRangeAfterRead() async throws {
// Reading chunks from an UnboundedRange after the file position has been moved to non-zero.
try await self.withHandle(forFileAtPath: Self.thisFile) { handle in
// trigger an initial read of the entire file to attempt to move the file pointer
var firstRead = ByteBuffer()
for try await chunk in handle.readChunks(in: ..., chunkLength: .bytes(128)) {
XCTAssertLessThanOrEqual(chunk.readableBytes, 128)
firstRead.writeImmutableBuffer(chunk)
}
var secondRead = ByteBuffer()
for try await chunk in handle.readChunks(in: ..., chunkLength: .bytes(128)) {
XCTAssertLessThanOrEqual(chunk.readableBytes, 128)
secondRead.writeImmutableBuffer(chunk)
}
// We should read bytes until EOF.
XCTAssertEqual(
secondRead.readableBytes,
firstRead.readableBytes,
"""
Read \(secondRead.readableBytes) which were different to the \(firstRead.readableBytes) \
expected bytes.
"""
)
}
}

func testReadPartialRange() async throws {
// Reading chunks of bytes from a PartialRangeThrough with the upper bound inside the file.
try await self.withHandle(forFileAtPath: Self.thisFile) { handle in
Expand Down

0 comments on commit 7baadcc

Please sign in to comment.