From f08f07785b1932e1cf6169e604eb3f8e2ac06837 Mon Sep 17 00:00:00 2001 From: Ludwig DUBOS Date: Tue, 1 Oct 2024 05:33:45 +0200 Subject: [PATCH] Replace `AsyncSeek` trait by `AsyncSeekForward` for `Reader` to address #12880 (#14194) # Objective The primary motivation behind this PR is to (partially?) address the limitations imposed by the recently added `AsyncSeek` trait bound discussed in issue #12880. While the `AsyncSeek` trait add some flexibility to the reader, it inadvertently restricts the ability to write asset readers that can truly stream bytes, particularly in scenarios like HTTP requests where backward seeking is not supported. It is also challenging in contexts where assets are stored in compressed formats or require other kinds of transformations. The logic behind this change is that currently, with `AsyncSeek`, an asset Reader based on streamed data will either 1) fail silently, 2) return an error, or 3) use a buffer to satisfy the trait constraint. I believe that being able to advance in the file without having to "read" it is a good thing. The only issue here is the ability to seek backward. It is highly likely that in this context, we only need to seek forward in the file because we would have already read an entry table upstream and just want to access one or more resources further in the file. I understand that in some cases, this may not be applicable, but I think it is more beneficial not to constrain `Reader`s that want to stream than to allow "Assets" to read files in a completely arbitrary order. ## Solution Replace the current `AsyncSeek` trait with `AsyncSeekForward` on asset `Reader` ## Changelog - Introduced a new custom trait, `AsyncSeekForward`, for the asset Reader. - Replaced the current `AsyncSeek` trait with `AsyncSeekForward` for all asset `Reader` implementations. ## Migration Guide Replace all instances of `AsyncSeek` with `AsyncSeekForward` in your asset reader implementations. --- crates/bevy_asset/src/io/file/file_asset.rs | 25 +++- .../bevy_asset/src/io/file/sync_file_asset.rs | 16 +-- crates/bevy_asset/src/io/memory.rs | 44 +++---- crates/bevy_asset/src/io/mod.rs | 108 ++++++++++-------- crates/bevy_asset/src/io/processor_gated.rs | 16 +-- 5 files changed, 113 insertions(+), 96 deletions(-) diff --git a/crates/bevy_asset/src/io/file/file_asset.rs b/crates/bevy_asset/src/io/file/file_asset.rs index 56a566c219127..3c20702167af7 100644 --- a/crates/bevy_asset/src/io/file/file_asset.rs +++ b/crates/bevy_asset/src/io/file/file_asset.rs @@ -1,14 +1,35 @@ use crate::io::{ - get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, PathStream, - Reader, Writer, + get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, AsyncSeekForward, + PathStream, Reader, Writer, }; use async_fs::{read_dir, File}; +use futures_io::AsyncSeek; use futures_lite::StreamExt; +use core::{pin::Pin, task, task::Poll}; use std::path::Path; use super::{FileAssetReader, FileAssetWriter}; +impl AsyncSeekForward for File { + fn poll_seek_forward( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + offset: u64, + ) -> Poll> { + let offset: Result = offset.try_into(); + + if let Ok(offset) = offset { + Pin::new(&mut self).poll_seek(cx, futures_io::SeekFrom::Current(offset)) + } else { + Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seek position is out of range", + ))) + } + } +} + impl Reader for File {} impl AssetReader for FileAssetReader { diff --git a/crates/bevy_asset/src/io/file/sync_file_asset.rs b/crates/bevy_asset/src/io/file/sync_file_asset.rs index 188257ddc1006..5887724736b47 100644 --- a/crates/bevy_asset/src/io/file/sync_file_asset.rs +++ b/crates/bevy_asset/src/io/file/sync_file_asset.rs @@ -1,9 +1,9 @@ -use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; +use futures_io::{AsyncRead, AsyncWrite}; use futures_lite::Stream; use crate::io::{ - get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, PathStream, - Reader, Writer, + get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, AsyncSeekForward, + PathStream, Reader, Writer, }; use core::{pin::Pin, task::Poll}; @@ -29,14 +29,16 @@ impl AsyncRead for FileReader { } } -impl AsyncSeek for FileReader { - fn poll_seek( +impl AsyncSeekForward for FileReader { + fn poll_seek_forward( self: Pin<&mut Self>, _cx: &mut core::task::Context<'_>, - pos: std::io::SeekFrom, + offset: u64, ) -> Poll> { let this = self.get_mut(); - let seek = this.0.seek(pos); + let current = this.0.stream_position()?; + let seek = this.0.seek(std::io::SeekFrom::Start(current + offset)); + Poll::Ready(seek) } } diff --git a/crates/bevy_asset/src/io/memory.rs b/crates/bevy_asset/src/io/memory.rs index 4164d9bbe906b..2fa2579c581a9 100644 --- a/crates/bevy_asset/src/io/memory.rs +++ b/crates/bevy_asset/src/io/memory.rs @@ -2,13 +2,12 @@ use crate::io::{AssetReader, AssetReaderError, PathStream, Reader}; use alloc::sync::Arc; use bevy_utils::HashMap; use core::{pin::Pin, task::Poll}; -use futures_io::{AsyncRead, AsyncSeek}; +use futures_io::AsyncRead; use futures_lite::{ready, Stream}; use parking_lot::RwLock; -use std::{ - io::SeekFrom, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; + +use super::AsyncSeekForward; #[derive(Default, Debug)] struct DirInternal { @@ -247,37 +246,20 @@ impl AsyncRead for DataReader { } } -impl AsyncSeek for DataReader { - fn poll_seek( +impl AsyncSeekForward for DataReader { + fn poll_seek_forward( mut self: Pin<&mut Self>, _cx: &mut core::task::Context<'_>, - pos: SeekFrom, + offset: u64, ) -> Poll> { - let result = match pos { - SeekFrom::Start(offset) => offset.try_into(), - SeekFrom::End(offset) => self - .data - .value() - .len() - .try_into() - .map(|len: i64| len - offset), - SeekFrom::Current(offset) => self - .bytes_read - .try_into() - .map(|bytes_read: i64| bytes_read + offset), - }; + let result = self + .bytes_read + .try_into() + .map(|bytes_read: u64| bytes_read + offset); if let Ok(new_pos) = result { - if new_pos < 0 { - Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "seek position is out of range", - ))) - } else { - self.bytes_read = new_pos as _; - - Poll::Ready(Ok(new_pos as _)) - } + self.bytes_read = new_pos as _; + Poll::Ready(Ok(new_pos as _)) } else { Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, diff --git a/crates/bevy_asset/src/io/mod.rs b/crates/bevy_asset/src/io/mod.rs index aafa4f1f04990..47ca38b6851f7 100644 --- a/crates/bevy_asset/src/io/mod.rs +++ b/crates/bevy_asset/src/io/mod.rs @@ -28,12 +28,9 @@ use core::{ pin::Pin, task::{Context, Poll}, }; -use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; +use futures_io::{AsyncRead, AsyncWrite}; use futures_lite::{ready, Stream}; -use std::{ - io::SeekFrom, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; use thiserror::Error; /// Errors that occur while loading assets. @@ -83,13 +80,51 @@ pub const STACK_FUTURE_SIZE: usize = 10 * size_of::<&()>(); pub use stackfuture::StackFuture; +/// Asynchronously advances the cursor position by a specified number of bytes. +/// +/// This trait is a simplified version of the [`futures_io::AsyncSeek`] trait, providing +/// support exclusively for the [`futures_io::SeekFrom::Current`] variant. It allows for relative +/// seeking from the current cursor position. +pub trait AsyncSeekForward { + /// Attempts to asynchronously seek forward by a specified number of bytes from the current cursor position. + /// + /// Seeking beyond the end of the stream is allowed and the behavior for this case is defined by the implementation. + /// The new position, relative to the beginning of the stream, should be returned upon successful completion + /// of the seek operation. + /// + /// If the seek operation completes successfully, + /// the new position relative to the beginning of the stream should be returned. + /// + /// # Implementation + /// + /// Implementations of this trait should handle [`Poll::Pending`] correctly, converting + /// [`std::io::ErrorKind::WouldBlock`] errors into [`Poll::Pending`] to indicate that the operation is not + /// yet complete and should be retried, and either internally retry or convert + /// [`std::io::ErrorKind::Interrupted`] into another error kind. + fn poll_seek_forward( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + offset: u64, + ) -> Poll>; +} + +impl AsyncSeekForward for Box { + fn poll_seek_forward( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + offset: u64, + ) -> Poll> { + Pin::new(&mut **self).poll_seek_forward(cx, offset) + } +} + /// A type returned from [`AssetReader::read`], which is used to read the contents of a file /// (or virtual file) corresponding to an asset. /// -/// This is essentially a trait alias for types implementing [`AsyncRead`] and [`AsyncSeek`]. +/// This is essentially a trait alias for types implementing [`AsyncRead`] and [`AsyncSeekForward`]. /// The only reason a blanket implementation is not provided for applicable types is to allow /// implementors to override the provided implementation of [`Reader::read_to_end`]. -pub trait Reader: AsyncRead + AsyncSeek + Unpin + Send + Sync { +pub trait Reader: AsyncRead + AsyncSeekForward + Unpin + Send + Sync { /// Reads the entire contents of this reader and appends them to a vec. /// /// # Note for implementors @@ -559,32 +594,20 @@ impl AsyncRead for VecReader { } } -impl AsyncSeek for VecReader { - fn poll_seek( +impl AsyncSeekForward for VecReader { + fn poll_seek_forward( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, - pos: SeekFrom, + offset: u64, ) -> Poll> { - let result = match pos { - SeekFrom::Start(offset) => offset.try_into(), - SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset), - SeekFrom::Current(offset) => self - .bytes_read - .try_into() - .map(|bytes_read: i64| bytes_read + offset), - }; + let result = self + .bytes_read + .try_into() + .map(|bytes_read: u64| bytes_read + offset); if let Ok(new_pos) = result { - if new_pos < 0 { - Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "seek position is out of range", - ))) - } else { - self.bytes_read = new_pos as _; - - Poll::Ready(Ok(new_pos as _)) - } + self.bytes_read = new_pos as _; + Poll::Ready(Ok(new_pos as _)) } else { Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, @@ -644,32 +667,21 @@ impl<'a> AsyncRead for SliceReader<'a> { } } -impl<'a> AsyncSeek for SliceReader<'a> { - fn poll_seek( +impl<'a> AsyncSeekForward for SliceReader<'a> { + fn poll_seek_forward( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, - pos: SeekFrom, + offset: u64, ) -> Poll> { - let result = match pos { - SeekFrom::Start(offset) => offset.try_into(), - SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset), - SeekFrom::Current(offset) => self - .bytes_read - .try_into() - .map(|bytes_read: i64| bytes_read + offset), - }; + let result = self + .bytes_read + .try_into() + .map(|bytes_read: u64| bytes_read + offset); if let Ok(new_pos) = result { - if new_pos < 0 { - Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "seek position is out of range", - ))) - } else { - self.bytes_read = new_pos as _; + self.bytes_read = new_pos as _; - Poll::Ready(Ok(new_pos as _)) - } + Poll::Ready(Ok(new_pos as _)) } else { Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, diff --git a/crates/bevy_asset/src/io/processor_gated.rs b/crates/bevy_asset/src/io/processor_gated.rs index 963af9fd51556..2179379070ce1 100644 --- a/crates/bevy_asset/src/io/processor_gated.rs +++ b/crates/bevy_asset/src/io/processor_gated.rs @@ -7,15 +7,15 @@ use alloc::sync::Arc; use async_lock::RwLockReadGuardArc; use bevy_utils::tracing::trace; use core::{pin::Pin, task::Poll}; -use futures_io::{AsyncRead, AsyncSeek}; -use std::{io::SeekFrom, path::Path}; +use futures_io::AsyncRead; +use std::path::Path; -use super::ErasedAssetReader; +use super::{AsyncSeekForward, ErasedAssetReader}; /// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a /// given path until that path has been processed by [`AssetProcessor`]. /// -/// [`AssetProcessor`]: crate::processor::AssetProcessor +/// [`AssetProcessor`]: crate::processor::AssetProcessor pub struct ProcessorGatedReader { reader: Box, source: AssetSourceId<'static>, @@ -142,13 +142,13 @@ impl AsyncRead for TransactionLockedReader<'_> { } } -impl AsyncSeek for TransactionLockedReader<'_> { - fn poll_seek( +impl AsyncSeekForward for TransactionLockedReader<'_> { + fn poll_seek_forward( mut self: Pin<&mut Self>, cx: &mut core::task::Context<'_>, - pos: SeekFrom, + offset: u64, ) -> Poll> { - Pin::new(&mut self.reader).poll_seek(cx, pos) + Pin::new(&mut self.reader).poll_seek_forward(cx, offset) } }