diff --git a/.changeset/stream-race-all.md b/.changeset/stream-race-all.md new file mode 100644 index 0000000000..ae50dfabfe --- /dev/null +++ b/.changeset/stream-race-all.md @@ -0,0 +1,24 @@ +--- +"effect": minor +--- + +feat(Stream): implement "raceAll" operator, which returns a stream that mirrors the first source stream to emit an item. + +```ts +import { Stream, Schedule, Console, Effect } from "effect"; + +const stream = Stream.raceAll( + Stream.fromSchedule(Schedule.spaced("1 millis")), + Stream.fromSchedule(Schedule.spaced("2 millis")), + Stream.fromSchedule(Schedule.spaced("4 millis")), +).pipe(Stream.take(6), Stream.tap(Console.log)); + +Effect.runPromise(Stream.runDrain(stream)); +// Output only from the first stream, the rest streams are interrupted +// 0 +// 1 +// 2 +// 3 +// 4 +// 5 +``` diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 798321d434..a494f0b9b5 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2694,6 +2694,40 @@ export const provideSomeLayer: { ): Stream> } = internal.provideSomeLayer +/** + * Returns a stream that mirrors the first upstream to emit an item. + * As soon as one of the upstream emits a first value, all the others are interrupted. + * The resulting stream will forward all items from the "winning" source stream. + * Any upstream failures will cause the returned stream to fail. + * + * @example + * import { Stream, Schedule, Console, Effect } from "effect" + * + * const stream = Stream.raceAll( + * Stream.fromSchedule(Schedule.spaced('1 millis')), + * Stream.fromSchedule(Schedule.spaced('2 millis')), + * Stream.fromSchedule(Schedule.spaced('4 millis')), + * ).pipe(Stream.take(6), Stream.tap(Console.log)) + * + * Effect.runPromise(Stream.runDrain(stream)) + * // Output each millisecond from the first stream, the rest streams are interrupted + * // 0 + * // 1 + * // 2 + * // 3 + * // 4 + * // 5 + * @since 3.5.0 + * @category racing + */ +export const raceAll: >>( + ...streams: S +) => Stream< + Stream.Success, + Stream.Error, + Stream.Context +> = internal.raceAll + /** * Constructs a stream from a range of integers, including both endpoints. * diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 96bb5ce5fc..6b39555748 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -4686,6 +4686,40 @@ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): S return new StreamImpl(go(min, max, chunkSize)) }) +export const raceAll = >>( + ...streams: S +): Stream.Stream< + Stream.Stream.Success, + Stream.Stream.Error, + Stream.Stream.Context +> => + Deferred.make().pipe( + Effect.map((halt) => { + let winner: number | null = null + return mergeAll( + streams.map((stream, index) => + stream.pipe( + takeWhile(() => { + if (winner === null) { + winner = index + Deferred.unsafeDone(halt, Exit.void) + return true + } + return winner === index + }), + interruptWhen( + Deferred.await(halt).pipe( + Effect.flatMap(() => winner === index ? Effect.never : Effect.void) + ) + ) + ) + ), + { concurrency: streams.length } + ) + }), + unwrap + ) + /** @internal */ export const rechunk = dual< (n: number) => (self: Stream.Stream) => Stream.Stream, diff --git a/packages/effect/test/Stream/racing.test.ts b/packages/effect/test/Stream/racing.test.ts new file mode 100644 index 0000000000..dbd7b58a42 --- /dev/null +++ b/packages/effect/test/Stream/racing.test.ts @@ -0,0 +1,67 @@ +import * as Chunk from "effect/Chunk" +import * as Effect from "effect/Effect" +import * as Fiber from "effect/Fiber" +import * as Schedule from "effect/Schedule" +import * as Stream from "effect/Stream" +import * as it from "effect/test/utils/extend" +import * as TestClock from "effect/TestClock" +import { assert, describe } from "vitest" + +describe("Stream", () => { + it.effect("raceAll sync", () => + Effect.gen(function*($) { + const result = yield* $( + Stream.raceAll( + Stream.make(0, 1, 2, 3), + Stream.make(4, 5, 6, 7), + Stream.make(7, 8, 9, 10) + ), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray) + ) + assert.deepStrictEqual(result, [0, 1, 2, 3]) + })) + + it.effect("raceAll async", () => + Effect.gen(function*($) { + const fiber = yield* $( + Stream.raceAll( + Stream.fromSchedule(Schedule.spaced("1 second")), + Stream.fromSchedule(Schedule.spaced("2 second")) + ), + Stream.take(5), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray), + Effect.fork + ) + yield* TestClock.adjust("5 second") + const result = yield* Fiber.join(fiber) + assert.deepStrictEqual(result, [0, 1, 2, 3, 4]) + })) + + it.effect("raceAll combined async + sync", () => + Effect.gen(function*($) { + const result = yield* $( + Stream.raceAll( + Stream.fromSchedule(Schedule.spaced("1 second")), + Stream.make(0, 1, 2, 3) + ), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray) + ) + assert.deepStrictEqual(result, [0, 1, 2, 3]) + })) + + it.effect("raceAll combined sync + async", () => + Effect.gen(function*($) { + const result = yield* $( + Stream.raceAll( + Stream.make(0, 1, 2, 3), + Stream.fromSchedule(Schedule.spaced("1 second")) + ), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray) + ) + assert.deepStrictEqual(result, [0, 1, 2, 3]) + })) +})