Skip to content

Commit

Permalink
remove F
Browse files Browse the repository at this point in the history
  • Loading branch information
kingwingfly committed Jun 6, 2024
1 parent 77406d4 commit bc8ffa2
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 46 deletions.
7 changes: 3 additions & 4 deletions fav_core/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,16 @@ impl<T> ProtoLocal for T where T: PathInfo + MessageFull {}
/// Make it able to save the resource to local
pub trait SaveLocal: Net {
/// Save the resource to local.
/// `F: Fn() -> Future<...>`, if Future is ready, one can cleanup and
/// `cancelled: Future<...>`, if Future is ready, one can cleanup and
/// shutdown gracefully, then return `FavCoreError::Cancel`.
fn download<R, F, Fut, Any>(
fn download<R, Fut, Any>(
&self,
res: &mut R,
urls: Vec<Url>,
cancelled: F,
cancelled: Fut,
) -> impl Future<Output = FavCoreResult<()>>
where
R: Res,
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send;
}
37 changes: 16 additions & 21 deletions fav_core/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,10 @@ pub trait LocalSetOps: Net + HttpConfig {
/// The set type the operations on
type Set: Set;
/// Fetch one resource set,
/// `F: Fn() -> Future<...>`, if Future is ready, one can cleanup and
/// `cancelled: Future<...>`, if Future is ready, one can cleanup and
/// shutdown gracefully, then return `FavCoreError::Cancel`.
async fn fetch_set<F, Fut, Any>(&self, set: &mut Self::Set, cancelled: F) -> FavCoreResult<()>
async fn fetch_set<Fut, Any>(&self, set: &mut Self::Set, cancelled: Fut) -> FavCoreResult<()>
where
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send;
}
Expand All @@ -101,27 +100,25 @@ pub trait LocalResOps: Net + HttpConfig {
/// The resource type the operations on
type Res: Res;
/// Fetch one resource,
/// `F: Fn() -> Future<...>`, if Future is ready, one can cleanup and
/// `cancelled: Future<...>`, if Future is ready, one can cleanup and
/// shutdown gracefully, then return `FavCoreError::Cancel`.
fn fetch_res<F, Fut, Any>(
async fn fetch_res<Fut, Any>(
&self,
resource: &mut Self::Res,
cancelled: F,
) -> impl Future<Output = FavCoreResult<()>>
cancelled: Fut,
) -> FavCoreResult<()>
where
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send;
/// Pull one resource,
/// `F: Fn() -> Future<...>`, if Future is ready, one can cleanup and
/// `cancelled: Future<...>`, if Future is ready, one can cleanup and
/// shutdown gracefully, then return `FavCoreError::Cancel`.
async fn pull_res<F, Fut, Any>(
async fn pull_res<Fut, Any>(
&self,
resource: &mut Self::Res,
cancelled: F,
cancelled: Fut,
) -> FavCoreResult<()>
where
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send;
}
Expand All @@ -146,7 +143,7 @@ pub trait SetOpsExt: SetOps {
where
SS: Sets<Set = Self::Set>,
{
batch_op_set(sets, |r, f| self.fetch_set(r, f))
batch_op_set(sets, |s, fut| self.fetch_set(s, fut))
}
}

Expand All @@ -157,14 +154,13 @@ pub trait SetOpsExt: SetOps {
pub async fn batch_op_set<'a, SS, F, T>(sets: &'a mut SS, mut f: F) -> FavCoreResult<()>
where
SS: Sets + 'a,
F: FnMut(&'a mut SS::Set, Box<dyn FnOnce() -> WaitForCancellationFutureOwned + Send>) -> T,
F: FnMut(&'a mut SS::Set, WaitForCancellationFutureOwned) -> T,
T: Future<Output = FavCoreResult<()>>,
{
let token = CancellationToken::new();
let mut stream = tokio_stream::iter(sets.iter_mut())
.map(|s| {
let token1 = token.clone();
let shutdown = Box::new(move || token1.cancelled_owned());
let shutdown = token.clone().cancelled_owned();
let fut = f(s, shutdown);
let token = token.clone();
async move {
Expand Down Expand Up @@ -221,15 +217,15 @@ pub trait ResOpsExt: ResOps {
where
S: Set<Res = Self::Res>,
{
batch_op_res(set, |r, f| self.fetch_res(r, f))
batch_op_res(set, |r, fut| self.fetch_res(r, fut))
}

/// **Asynchronously** pull resourses in set using [`ResOps::pull_res`].
fn batch_pull_res<'a, S>(&self, set: &'a mut S) -> impl Future<Output = FavCoreResult<()>>
where
S: Set<Res = Self::Res>,
{
batch_op_res(set, |r, f| self.pull_res(r, f))
batch_op_res(set, |r, fut| self.pull_res(r, fut))
}
}

Expand Down Expand Up @@ -270,14 +266,13 @@ pub trait ResOpsExt: ResOps {
pub async fn batch_op_res<'a, S, F, T>(set: &'a mut S, mut f: F) -> FavCoreResult<()>
where
S: Set + 'a,
F: FnMut(&'a mut S::Res, Box<dyn FnOnce() -> WaitForCancellationFutureOwned + Send>) -> T,
F: FnMut(&'a mut S::Res, WaitForCancellationFutureOwned) -> T,
T: Future<Output = FavCoreResult<()>>,
{
let token = CancellationToken::new();
let mut stream = tokio_stream::iter(set.iter_mut())
.map(|s| {
let token1 = token.clone();
let shutdown = Box::new(move || token1.cancelled_owned());
let shutdown = token.clone().cancelled_owned();
let fut = f(s, shutdown);
let token = token.clone();
async move {
Expand Down
9 changes: 3 additions & 6 deletions fav_core/src/test_utils/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,16 @@ impl AuthOps for App {
impl ResOps for App {
type Res = TestRes;

async fn fetch_res<F, Fut, Any>(&self, _: &mut Self::Res, _: F) -> FavCoreResult<()>
async fn fetch_res<Fut, Any>(&self, _: &mut Self::Res, _: Fut) -> FavCoreResult<()>
where
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send,
{
todo!()
}

async fn pull_res<F, Fut, Any>(&self, _: &mut Self::Res, _: F) -> FavCoreResult<()>
async fn pull_res<Fut, Any>(&self, _: &mut Self::Res, _: Fut) -> FavCoreResult<()>
where
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send,
{
Expand All @@ -63,9 +61,8 @@ impl ResOps for App {
impl SetOps for App {
type Set = TestSet;

async fn fetch_set<F, Fut, Any>(&self, _: &mut Self::Set, _: F) -> FavCoreResult<()>
async fn fetch_set<Fut, Any>(&self, _: &mut Self::Set, _: Fut) -> FavCoreResult<()>
where
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send,
{
Expand Down
7 changes: 3 additions & 4 deletions fav_utils/src/bili/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ impl PathInfo for BiliSets {
}

impl SaveLocal for Bili {
async fn download<R, F, Fut, Any>(
async fn download<R, Fut, Any>(
&self,
res: &mut R,
urls: Vec<reqwest::Url>,
f: F,
cancelled: Fut,
) -> FavCoreResult<()>
where
R: Res,
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send,
{
Expand Down Expand Up @@ -76,7 +75,7 @@ impl SaveLocal for Bili {
res.on_status(StatusFlags::SAVED);
Ok(())
},
_ = f() => {
_ = cancelled => {
file_v.into_inner().unwrap().close()?;
file_a.into_inner().unwrap().close()?;
Err(FavCoreError::Cancel)
Expand Down
28 changes: 17 additions & 11 deletions fav_utils/src/bili/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ impl SetsOps for Bili {
impl SetOps for Bili {
type Set = BiliSet;

async fn fetch_set<F, Fut, Any>(&self, set: &mut Self::Set, cancelled: F) -> FavCoreResult<()>
async fn fetch_set<Fut, Any>(&self, set: &mut Self::Set, cancelled: Fut) -> FavCoreResult<()>
where
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send,
{
Expand All @@ -94,17 +93,20 @@ impl SetOps for Bili {
} => {
res
}
_ = cancelled() => Err(FavCoreError::Cancel)
_ = cancelled => Err(FavCoreError::Cancel)
}
}
}

impl ResOps for Bili {
type Res = BiliRes;

async fn fetch_res<F, Fut, Any>(&self, resource: &mut Self::Res, f: F) -> FavCoreResult<()>
async fn fetch_res<Fut, Any>(
&self,
resource: &mut Self::Res,
cancelled: Fut,
) -> FavCoreResult<()>
where
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send,
{
Expand All @@ -119,15 +121,18 @@ impl ResOps for Bili {
resource.on_status(StatusFlags::FETCHED);
Ok(())
},
_ = f() => {
_ = cancelled => {
Err(FavCoreError::Cancel)
}
}
}

async fn pull_res<F, Fut, Any>(&self, resource: &mut Self::Res, f: F) -> FavCoreResult<()>
async fn pull_res<Fut, Any>(
&self,
resource: &mut Self::Res,
cancelled: Fut,
) -> FavCoreResult<()>
where
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send,
{
Expand Down Expand Up @@ -157,7 +162,8 @@ impl ResOps for Bili {
}
Err(e) => return Err(e),
};
self.download(resource, vec![audio, video], f).await?;
self.download(resource, vec![audio, video], cancelled)
.await?;
Ok(())
}
}
Expand Down Expand Up @@ -267,7 +273,7 @@ mod tests {
let mut sets = BiliSets::default();
bili.fetch_sets(&mut sets).await.unwrap();
let set = sets.iter_mut().min_by_key(|s| s.media_count).unwrap();
bili.fetch_set(set, tokio::signal::ctrl_c).await.unwrap();
bili.fetch_set(set, tokio::signal::ctrl_c()).await.unwrap();
bili.batch_fetch_res(set).await.unwrap();
bili.batch_pull_res(set).await.unwrap();
sets.write().unwrap();
Expand All @@ -280,7 +286,7 @@ mod tests {
let mut sets = BiliSets::read().unwrap();
bili.fetch_sets(&mut sets).await.unwrap();
let set = sets.iter_mut().min_by_key(|s| s.media_count).unwrap();
bili.fetch_set(set, tokio::signal::ctrl_c).await.unwrap();
bili.fetch_set(set, tokio::signal::ctrl_c()).await.unwrap();
set.on_res_status(StatusFlags::TRACK);
let mut sub = set.subset(|r| r.check_status(StatusFlags::TRACK));
bili.batch_fetch_res(&mut sub).await.unwrap();
Expand Down

0 comments on commit bc8ffa2

Please sign in to comment.