From 1e27c03d255f508b6f1f1adacf0fa2d6305171e7 Mon Sep 17 00:00:00 2001 From: bowzee Date: Sun, 30 Jun 2024 15:51:51 +0300 Subject: [PATCH] feat(Stream): raceAll --- .changeset/stream-race-all.md | 5 ++++ packages/effect/src/Stream.ts | 30 +++++++++++++++++++ packages/effect/src/internal/stream.ts | 35 ++++++++++++++++++++++ packages/effect/test/Stream/racing.test.ts | 21 +++++++++++++ 4 files changed, 91 insertions(+) create mode 100644 .changeset/stream-race-all.md create mode 100644 packages/effect/test/Stream/racing.test.ts diff --git a/.changeset/stream-race-all.md b/.changeset/stream-race-all.md new file mode 100644 index 00000000000..2a37d552029 --- /dev/null +++ b/.changeset/stream-race-all.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +feat(Stream): raceAll \ No newline at end of file diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index be3c6d9537f..778aabea721 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2664,6 +2664,36 @@ export const provideSomeLayer: { ): Stream> } = internal.provideSomeLayer +/** + * Returns a stream that mirrors the first source stream to emit an item. + * @example + * import { Stream, Schedule, Console, Effect } from "effect" + * + * const stream = Stream.raceAll( + * Stream.fromSchedule(Schedule.spaced('1 millis')), + * Stream.fromSchedule(Schedule.spaced('2 millis')), + * Stream.fromSchedule(Schedule.spaced('4 millis')), + * ).pipe(Stream.take(6), Stream.tap(Console.log)) + * + * Effect.runPromise(Stream.runDrain(stream)) + * // Output each millisecond from the first stream, the rest streams are interrupted + * // 0 + * // 1 + * // 2 + * // 3 + * // 4 + * // 5 + * @since 3.5.0 + * @category utils + */ +export const raceAll: >>( + ...streams: T +) => Stream< + [T[number]] extends [never] ? never : T[number] extends Stream ? A : never, + [T[number]] extends [never] ? never : T[number] extends Stream ? E : never, + [T[number]] extends [never] ? never : T[number] extends Stream ? R : never +> = internal.raceAll + /** * Constructs a stream from a range of integers, including both endpoints. * diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 9fd8aa29cf1..fcac7602d20 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -4686,6 +4686,41 @@ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): S return new StreamImpl(go(min, max, chunkSize)) }) +export const raceAll = >>( + ...streams: T +): Stream.Stream< + [T[number]] extends [never] ? never : T[number] extends Stream.Stream ? A : never, + [T[number]] extends [never] ? never : T[number] extends Stream.Stream ? E : never, + [T[number]] extends [never] ? never : T[number] extends Stream.Stream ? R : never +> => { + let finished = false + return unwrap( + Effect.gen(function*() { + const halts = yield* Effect.all(streams.map(() => Deferred.make())) + let finishes = streams.map(() => false) + return mergeAll( + streams.map((s, index) => + s.pipe( + tap(() => { + if (finished) { + return Effect.void + } + finished = true + finishes = finishes.map((_, i) => !(i === index)) + return Effect.all( + halts.map((def, i) => i === index ? Effect.void : Deferred.succeed(def, void 0)) + ) + }), + takeWhile(() => !finishes[index]), + interruptWhenDeferred(halts[index]!) + ) + ), + { concurrency: "unbounded" } + ) + }) + ) +} + /** @internal */ export const rechunk = dual< (n: number) => (self: Stream.Stream) => Stream.Stream, diff --git a/packages/effect/test/Stream/racing.test.ts b/packages/effect/test/Stream/racing.test.ts new file mode 100644 index 00000000000..18cd6488f8b --- /dev/null +++ b/packages/effect/test/Stream/racing.test.ts @@ -0,0 +1,21 @@ +import * as Chunk from "effect/Chunk" +import * as Effect from "effect/Effect" +import * as Stream from "effect/Stream" +import * as it from "effect/test/utils/extend" +import { assert, describe } from "vitest" + +describe("Stream", () => { + it.effect("raceAll", () => + Effect.gen(function*($) { + const result = yield* $( + Stream.raceAll( + Stream.make(0, 1, 2, 3), + Stream.make(4, 5, 6, 7), + Stream.make(7, 8, 9, 10) + ), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray) + ) + assert.deepStrictEqual(result, [0, 1, 2, 3]) + })) +})