Skip to content

Commit

Permalink
New caching methods
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Jul 16, 2024
1 parent 9068581 commit a4736ca
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions src/operator/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,21 @@ impl<I: Data + Send> StreamCache<I> {
cache.data.clone()
}

pub fn into_inner(self) -> HashMap<CoordUInt, Vec<I>> {
assert!(self.data.lock().is_complete(), "Reading cache before it was complete. execution from a cached stream must start after the previous StreamContext has completed!");
Arc::into_inner(self.data).expect("multiple Arc ref in StreamCache").into_inner().data
}

pub fn values_cloned(&self) -> impl Iterator<Item=I> {
let clone = self.inner_cloned();
clone.into_values().flatten()
}

pub fn into_values(self) -> impl Iterator<Item=I> {
let inner = self.into_inner();
inner.into_values().flatten()
}

/// Consume the cache creating a new [Stream] in a [StreamContext].
///
/// The [StreamCache] will behave as a source with the same parallelism (and distribution of data)
Expand All @@ -155,6 +170,21 @@ impl<I: Data + Send> StreamCache<I> {
let source = CacheSource::new(self.replication, self.data);
ctx.stream(source)
}

/// Consume the cache creating a new [Stream] in a [StreamContext] with the same [RuntimeConfig].
///
/// The [StreamCache] will behave as a source with the same parallelism (and distribution of data)
/// as the original [Stream] it was cached from.
///
/// **Returns**: a tuple containing the new [StreamContext] and the [Stream] with the [CacheSource].
///
/// + **ATTENTION** ⚠️: The cache can be resumed **only after** the execution of its origin
/// `StreamContext` has terminated.
pub fn stream(self) -> (StreamContext, Stream<CacheSource<I>>) {
let ctx = StreamContext::new(self.config.clone());
let stream = self.stream_in(&ctx);
(ctx, stream)
}
}

impl<I: Data + Send> Clone for StreamCache<I> {
Expand Down

0 comments on commit a4736ca

Please sign in to comment.