Skip to content

Commit

Permalink
feat(Stream): switchMap
Browse files Browse the repository at this point in the history
  • Loading branch information
dilame authored and tim-smart committed Jun 11, 2024
1 parent 3572b58 commit e843763
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/young-hats-roll.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

feat(Stream): switchMap
39 changes: 39 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3503,6 +3503,45 @@ export const sync: <A>(evaluate: LazyArg<A>) => Stream<A> = internal.sync
*/
export const suspend: <A, E, R>(stream: LazyArg<Stream<A, E, R>>) => Stream<A, E, R> = internal.suspend

/**
* Projects each source value to a Stream which is merged in the output Stream,
* emitting values only from the most recently projected Stream.
* @param project A function that, when applied to an item emitted by the source Stream, returns a Stream.
* @returns A function that returns a Stream that emits the result of applying the projection function to each item emitted by the source Stream and taking only the values from the most recently projected inner Stream.
* @example
* import { Console, Effect, Schedule, Stream } from 'effect';
*
* const upstream = Stream.fromSchedule(Schedule.spaced('4 second')).pipe(
* Stream.switchMap(
* (i) => Stream.fromSchedule(Schedule.spaced('1 second')).pipe(Stream.as(i))
* ),
* Stream.tap(Console.log),
* Stream.take(9),
* );
*
* Effect.runFork(upstream.pipe(Stream.runDrain));
* // 0
* // 0
* // 0
* // 1
* // 1
* // 1
* // 2
* // 2
* // 2
* @since 3.4.0
* @category sequencing
*/
export const switchMap: {
<A, A2, E2, R2>(
project: (a: A) => Stream<A2, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
project: (a: A) => Stream<A2, E2, R2>
): Stream<A2, E | E2, R | R2>
} = internal.switchMap

/**
* Takes the specified number of elements from this stream.
*
Expand Down
19 changes: 19 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5850,6 +5850,25 @@ export const splitOnChunk = dual<
export const splitLines = <E, R>(self: Stream.Stream<string, E, R>): Stream.Stream<string, E, R> =>
pipeThroughChannel(self, channel.splitLines())

/** @internal */
export const switchMap = dual<
<A, A2, E2, R2>(
project: (a: A) => Stream.Stream<A2, E2, R2>
) => <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A2, E2 | E, R2 | R>,
<A, E, R, A2, E2, R2>(
self: Stream.Stream<A, E, R>,
project: (a: A) => Stream.Stream<A2, E2, R2>
) => Stream.Stream<A2, E2 | E, R2 | R>
>(
(args) => isStream(args[0]),
<A, E, R, A2, E2, R2>(
self: Stream.Stream<A, E, R>,
project: (a: A) => Stream.Stream<A2, E2, R2>
): Stream.Stream<A2, E | E2, R | R2> => {
return flatMap(self, project, { switch: true, concurrency: 1 })
}
)

/** @internal */
export const succeed = <A>(value: A): Stream.Stream<A> => fromChunk(Chunk.of(value))

Expand Down

0 comments on commit e843763

Please sign in to comment.