diff --git a/src/operator/cache.rs b/src/operator/cache.rs index 5dff436..c3e1e91 100644 --- a/src/operator/cache.rs +++ b/src/operator/cache.rs @@ -136,6 +136,21 @@ impl StreamCache { cache.data.clone() } + pub fn into_inner(self) -> HashMap> { + assert!(self.data.lock().is_complete(), "Reading cache before it was complete. execution from a cached stream must start after the previous StreamContext has completed!"); + Arc::into_inner(self.data).expect("multiple Arc ref in StreamCache").into_inner().data + } + + pub fn values_cloned(&self) -> impl Iterator { + let clone = self.inner_cloned(); + clone.into_values().flatten() + } + + pub fn into_values(self) -> impl Iterator { + let inner = self.into_inner(); + inner.into_values().flatten() + } + /// Consume the cache creating a new [Stream] in a [StreamContext]. /// /// The [StreamCache] will behave as a source with the same parallelism (and distribution of data) @@ -155,6 +170,21 @@ impl StreamCache { let source = CacheSource::new(self.replication, self.data); ctx.stream(source) } + + /// Consume the cache creating a new [Stream] in a [StreamContext] with the same [RuntimeConfig]. + /// + /// The [StreamCache] will behave as a source with the same parallelism (and distribution of data) + /// as the original [Stream] it was cached from. + /// + /// **Returns**: a tuple containing the new [StreamContext] and the [Stream] with the [CacheSource]. + /// + /// + **ATTENTION** ⚠️: The cache can be resumed **only after** the execution of its origin + /// `StreamContext` has terminated. + pub fn stream(self) -> (StreamContext, Stream>) { + let ctx = StreamContext::new(self.config.clone()); + let stream = self.stream_in(&ctx); + (ctx, stream) + } } impl Clone for StreamCache {