From e4e460abffe7a7c70d8c3aa2420cfda38397e3e2 Mon Sep 17 00:00:00 2001 From: bowzee Date: Mon, 10 Jun 2024 19:48:39 +0300 Subject: [PATCH] feat(Stream): switchMap --- .changeset/young-hats-roll.md | 5 ++++ packages/effect/src/Stream.ts | 39 ++++++++++++++++++++++++++ packages/effect/src/internal/stream.ts | 19 +++++++++++++ 3 files changed, 63 insertions(+) create mode 100644 .changeset/young-hats-roll.md diff --git a/.changeset/young-hats-roll.md b/.changeset/young-hats-roll.md new file mode 100644 index 00000000000..1a231766ff0 --- /dev/null +++ b/.changeset/young-hats-roll.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +feat(Stream): switchMap diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index d6b954b2ec8..d53217ae2c3 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -3503,6 +3503,45 @@ export const sync: (evaluate: LazyArg) => Stream = internal.sync */ export const suspend: (stream: LazyArg>) => Stream = internal.suspend +/** + * Projects each source value to a Stream which is merged in the output Stream, + * emitting values only from the most recently projected Stream. + * @param project A function that, when applied to an item emitted by the source Stream, returns a Stream. + * @returns A function that returns a Stream that emits the result of applying the projection function to each item emitted by the source Stream and taking only the values from the most recently projected inner Stream. + * @example + * import { Console, Effect, Schedule, Stream } from 'effect'; + * + * const upstream = Stream.fromSchedule(Schedule.spaced('4 second')).pipe( + * Stream.switchMap( + * (i) => Stream.fromSchedule(Schedule.spaced('1 second')).pipe(Stream.as(i)) + * ), + * Stream.tap(Console.log), + * Stream.take(9), + * ); + * + * Effect.runFork(upstream.pipe(Stream.runDrain)); + * // 0 + * // 0 + * // 0 + * // 1 + * // 1 + * // 1 + * // 2 + * // 2 + * // 2 + * @since 3.4.0 + * @category sequencing + */ +export const switchMap: { + ( + project: (a: A) => Stream + ): (self: Stream) => Stream + ( + self: Stream, + project: (a: A) => Stream + ): Stream +} = internal.switchMap + /** * Takes the specified number of elements from this stream. * diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 9e772515f7c..b2703708760 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -5850,6 +5850,25 @@ export const splitOnChunk = dual< export const splitLines = (self: Stream.Stream): Stream.Stream => pipeThroughChannel(self, channel.splitLines()) +/** @internal */ +export const switchMap = dual< + ( + project: (a: A) => Stream.Stream + ) => (self: Stream.Stream) => Stream.Stream, + ( + self: Stream.Stream, + project: (a: A) => Stream.Stream + ) => Stream.Stream +>( + (args) => isStream(args[0]), + ( + self: Stream.Stream, + project: (a: A) => Stream.Stream + ): Stream.Stream => { + return flatMap(self, project, { switch: true, concurrency: 1 }) + } +) + /** @internal */ export const succeed = (value: A): Stream.Stream => fromChunk(Chunk.of(value))