From 02bcb964ea94b5784a5668878707b3b1f92cd2a7 Mon Sep 17 00:00:00 2001 From: bowzee Date: Sun, 30 Jun 2024 15:51:51 +0300 Subject: [PATCH 1/9] feat(Stream): raceAll --- .changeset/stream-race-all.md | 5 ++++ packages/effect/src/Stream.ts | 30 +++++++++++++++++++ packages/effect/src/internal/stream.ts | 35 ++++++++++++++++++++++ packages/effect/test/Stream/racing.test.ts | 21 +++++++++++++ 4 files changed, 91 insertions(+) create mode 100644 .changeset/stream-race-all.md create mode 100644 packages/effect/test/Stream/racing.test.ts diff --git a/.changeset/stream-race-all.md b/.changeset/stream-race-all.md new file mode 100644 index 0000000000..2a37d55202 --- /dev/null +++ b/.changeset/stream-race-all.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +feat(Stream): raceAll \ No newline at end of file diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 798321d434..9cda50ba2d 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2694,6 +2694,36 @@ export const provideSomeLayer: { ): Stream> } = internal.provideSomeLayer +/** + * Returns a stream that mirrors the first source stream to emit an item. + * @example + * import { Stream, Schedule, Console, Effect } from "effect" + * + * const stream = Stream.raceAll( + * Stream.fromSchedule(Schedule.spaced('1 millis')), + * Stream.fromSchedule(Schedule.spaced('2 millis')), + * Stream.fromSchedule(Schedule.spaced('4 millis')), + * ).pipe(Stream.take(6), Stream.tap(Console.log)) + * + * Effect.runPromise(Stream.runDrain(stream)) + * // Output each millisecond from the first stream, the rest streams are interrupted + * // 0 + * // 1 + * // 2 + * // 3 + * // 4 + * // 5 + * @since 3.5.0 + * @category utils + */ +export const raceAll: >>( + ...streams: T +) => Stream< + [T[number]] extends [never] ? never : T[number] extends Stream ? A : never, + [T[number]] extends [never] ? never : T[number] extends Stream ? E : never, + [T[number]] extends [never] ? never : T[number] extends Stream ? R : never +> = internal.raceAll + /** * Constructs a stream from a range of integers, including both endpoints. * diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 96bb5ce5fc..3d8a32ad45 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -4686,6 +4686,41 @@ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): S return new StreamImpl(go(min, max, chunkSize)) }) +export const raceAll = >>( + ...streams: T +): Stream.Stream< + [T[number]] extends [never] ? never : T[number] extends Stream.Stream ? A : never, + [T[number]] extends [never] ? never : T[number] extends Stream.Stream ? E : never, + [T[number]] extends [never] ? never : T[number] extends Stream.Stream ? R : never +> => { + let finished = false + return unwrap( + Effect.gen(function*() { + const halts = yield* Effect.all(streams.map(() => Deferred.make())) + let finishes = streams.map(() => false) + return mergeAll( + streams.map((s, index) => + s.pipe( + tap(() => { + if (finished) { + return Effect.void + } + finished = true + finishes = finishes.map((_, i) => !(i === index)) + return Effect.all( + halts.map((def, i) => i === index ? Effect.void : Deferred.succeed(def, void 0)) + ) + }), + takeWhile(() => !finishes[index]), + interruptWhenDeferred(halts[index]!) + ) + ), + { concurrency: "unbounded" } + ) + }) + ) +} + /** @internal */ export const rechunk = dual< (n: number) => (self: Stream.Stream) => Stream.Stream, diff --git a/packages/effect/test/Stream/racing.test.ts b/packages/effect/test/Stream/racing.test.ts new file mode 100644 index 0000000000..18cd6488f8 --- /dev/null +++ b/packages/effect/test/Stream/racing.test.ts @@ -0,0 +1,21 @@ +import * as Chunk from "effect/Chunk" +import * as Effect from "effect/Effect" +import * as Stream from "effect/Stream" +import * as it from "effect/test/utils/extend" +import { assert, describe } from "vitest" + +describe("Stream", () => { + it.effect("raceAll", () => + Effect.gen(function*($) { + const result = yield* $( + Stream.raceAll( + Stream.make(0, 1, 2, 3), + Stream.make(4, 5, 6, 7), + Stream.make(7, 8, 9, 10) + ), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray) + ) + assert.deepStrictEqual(result, [0, 1, 2, 3]) + })) +}) From cd8d8186463c2863bf2f875328d29bb8a6dda9dd Mon Sep 17 00:00:00 2001 From: bowzee Date: Tue, 2 Jul 2024 14:27:25 +0300 Subject: [PATCH 2/9] fix(Stream/raceAll): use haltWhenDeferred instead of takeWhile --- packages/effect/src/internal/stream.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 3d8a32ad45..58426ea3b9 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -4697,7 +4697,6 @@ export const raceAll = >>( return unwrap( Effect.gen(function*() { const halts = yield* Effect.all(streams.map(() => Deferred.make())) - let finishes = streams.map(() => false) return mergeAll( streams.map((s, index) => s.pipe( @@ -4706,13 +4705,12 @@ export const raceAll = >>( return Effect.void } finished = true - finishes = finishes.map((_, i) => !(i === index)) return Effect.all( halts.map((def, i) => i === index ? Effect.void : Deferred.succeed(def, void 0)) ) }), - takeWhile(() => !finishes[index]), - interruptWhenDeferred(halts[index]!) + interruptWhenDeferred(halts[index]!), + haltWhenDeferred(halts[index]!) ) ), { concurrency: "unbounded" } From bc3d1167a5ff443fef5c8ad5f476fcec2d863928 Mon Sep 17 00:00:00 2001 From: bowzee Date: Thu, 4 Jul 2024 07:04:56 +0300 Subject: [PATCH 3/9] fix(Stream/raceAll): after-review --- packages/effect/src/Stream.ts | 10 ++--- packages/effect/src/internal/stream.ts | 59 ++++++++++++++------------ 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 9cda50ba2d..991b15d1a4 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2716,12 +2716,12 @@ export const provideSomeLayer: { * @since 3.5.0 * @category utils */ -export const raceAll: >>( - ...streams: T +export const raceAll: >>( + ...streams: S ) => Stream< - [T[number]] extends [never] ? never : T[number] extends Stream ? A : never, - [T[number]] extends [never] ? never : T[number] extends Stream ? E : never, - [T[number]] extends [never] ? never : T[number] extends Stream ? R : never + Stream.Success, + Stream.Error, + Stream.Context > = internal.raceAll /** diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 58426ea3b9..83f2d7d6f5 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -4686,38 +4686,43 @@ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): S return new StreamImpl(go(min, max, chunkSize)) }) -export const raceAll = >>( - ...streams: T +export const raceAll = >>( + ...streams: S ): Stream.Stream< - [T[number]] extends [never] ? never : T[number] extends Stream.Stream ? A : never, - [T[number]] extends [never] ? never : T[number] extends Stream.Stream ? E : never, - [T[number]] extends [never] ? never : T[number] extends Stream.Stream ? R : never -> => { - let finished = false - return unwrap( - Effect.gen(function*() { - const halts = yield* Effect.all(streams.map(() => Deferred.make())) + Stream.Stream.Success, + Stream.Stream.Error, + Stream.Stream.Context +> => + Deferred.make().pipe( + Effect.map((halt) => { + let winner: number | null = null; return mergeAll( - streams.map((s, index) => - s.pipe( - tap(() => { - if (finished) { - return Effect.void + streams.map((stream, index) => + stream.pipe( + takeWhile(() => { + if (winner === index) { + return true; + } else if (winner === null) { + winner = index; + Deferred.unsafeDone(halt, Exit.void); + return true; } - finished = true - return Effect.all( - halts.map((def, i) => i === index ? Effect.void : Deferred.succeed(def, void 0)) - ) + return false; }), - interruptWhenDeferred(halts[index]!), - haltWhenDeferred(halts[index]!) - ) + interruptWhen( + Deferred.await(halt).pipe( + Effect.flatMap(() => + winner === index ? Effect.never : Effect.void, + ), + ), + ), + ), ), - { concurrency: "unbounded" } - ) - }) - ) -} + { concurrency: streams.length }, + ); + }), + unwrap, + ); /** @internal */ export const rechunk = dual< From 801a13345c5c89d940a89bee858661312daf4930 Mon Sep 17 00:00:00 2001 From: bowzee Date: Thu, 4 Jul 2024 07:14:46 +0300 Subject: [PATCH 4/9] test(Stream/raceAll): add async tests --- packages/effect/test/Stream/racing.test.ts | 38 +++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/packages/effect/test/Stream/racing.test.ts b/packages/effect/test/Stream/racing.test.ts index 18cd6488f8..ed0e0e5a6f 100644 --- a/packages/effect/test/Stream/racing.test.ts +++ b/packages/effect/test/Stream/racing.test.ts @@ -1,11 +1,14 @@ import * as Chunk from "effect/Chunk" import * as Effect from "effect/Effect" +import * as Fiber from "effect/Fiber" +import * as Schedule from "effect/Schedule" import * as Stream from "effect/Stream" import * as it from "effect/test/utils/extend" +import * as TestClock from "effect/TestClock" import { assert, describe } from "vitest" describe("Stream", () => { - it.effect("raceAll", () => + it.effect("raceAll sync", () => Effect.gen(function*($) { const result = yield* $( Stream.raceAll( @@ -18,4 +21,37 @@ describe("Stream", () => { ) assert.deepStrictEqual(result, [0, 1, 2, 3]) })) + + it.effect("raceAll async", () => + Effect.gen(function*($) { + const fiber = yield* $( + Stream.raceAll( + Stream.fromSchedule(Schedule.spaced("1 second")), + Stream.fromSchedule(Schedule.spaced("2 second")) + ), + Stream.take(5), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray), + Effect.fork + ) + yield* TestClock.adjust("5 second") + const result = yield* Fiber.join(fiber) + assert.deepStrictEqual(result, [0, 1, 2, 3, 4]) + })) + + it.effect("raceAll combined sync + async", () => + Effect.gen(function*($) { + const fiber = yield* $( + Stream.raceAll( + Stream.fromSchedule(Schedule.spaced("1 second")), + Stream.make(1, 2, 3) + ), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray), + Effect.fork + ) + yield* TestClock.adjust("0 millis") + const result = yield* Fiber.join(fiber) + assert.deepStrictEqual(result, [1, 2, 3]) + })) }) From f2b536edfeb2d7666319e53b0434ffefec096d88 Mon Sep 17 00:00:00 2001 From: bowzee Date: Thu, 4 Jul 2024 07:18:28 +0300 Subject: [PATCH 5/9] test(Stream/raceAll): simplify combined sync+async test --- packages/effect/test/Stream/racing.test.ts | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/packages/effect/test/Stream/racing.test.ts b/packages/effect/test/Stream/racing.test.ts index ed0e0e5a6f..6c3f848ee4 100644 --- a/packages/effect/test/Stream/racing.test.ts +++ b/packages/effect/test/Stream/racing.test.ts @@ -39,19 +39,16 @@ describe("Stream", () => { assert.deepStrictEqual(result, [0, 1, 2, 3, 4]) })) - it.effect("raceAll combined sync + async", () => + it.effect("raceAll combined async + sync", () => Effect.gen(function*($) { - const fiber = yield* $( + const result = yield* $( Stream.raceAll( Stream.fromSchedule(Schedule.spaced("1 second")), - Stream.make(1, 2, 3) + Stream.make(0, 1, 2, 3) ), Stream.runCollect, - Effect.map(Chunk.toReadonlyArray), - Effect.fork + Effect.map(Chunk.toReadonlyArray) ) - yield* TestClock.adjust("0 millis") - const result = yield* Fiber.join(fiber) - assert.deepStrictEqual(result, [1, 2, 3]) + assert.deepStrictEqual(result, [0, 1, 2, 3]) })) }) From 9e4f134109d5921db7173458392aaac2ac649e47 Mon Sep 17 00:00:00 2001 From: bowzee Date: Thu, 4 Jul 2024 07:19:12 +0300 Subject: [PATCH 6/9] test(Stream/raceAll): add "combined sync + async" test --- packages/effect/test/Stream/racing.test.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/effect/test/Stream/racing.test.ts b/packages/effect/test/Stream/racing.test.ts index 6c3f848ee4..dbd7b58a42 100644 --- a/packages/effect/test/Stream/racing.test.ts +++ b/packages/effect/test/Stream/racing.test.ts @@ -51,4 +51,17 @@ describe("Stream", () => { ) assert.deepStrictEqual(result, [0, 1, 2, 3]) })) + + it.effect("raceAll combined sync + async", () => + Effect.gen(function*($) { + const result = yield* $( + Stream.raceAll( + Stream.make(0, 1, 2, 3), + Stream.fromSchedule(Schedule.spaced("1 second")) + ), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray) + ) + assert.deepStrictEqual(result, [0, 1, 2, 3]) + })) }) From ff58c534bfc8b38cb6230e7314caa286eeb115af Mon Sep 17 00:00:00 2001 From: bowzee Date: Thu, 4 Jul 2024 07:51:04 +0300 Subject: [PATCH 7/9] refactor(Stream/raceAll): improve readability --- packages/effect/src/internal/stream.ts | 32 +++++++++++--------------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 83f2d7d6f5..6b39555748 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -4695,34 +4695,30 @@ export const raceAll = >>( > => Deferred.make().pipe( Effect.map((halt) => { - let winner: number | null = null; + let winner: number | null = null return mergeAll( streams.map((stream, index) => stream.pipe( takeWhile(() => { - if (winner === index) { - return true; - } else if (winner === null) { - winner = index; - Deferred.unsafeDone(halt, Exit.void); - return true; + if (winner === null) { + winner = index + Deferred.unsafeDone(halt, Exit.void) + return true } - return false; + return winner === index }), interruptWhen( Deferred.await(halt).pipe( - Effect.flatMap(() => - winner === index ? Effect.never : Effect.void, - ), - ), - ), - ), + Effect.flatMap(() => winner === index ? Effect.never : Effect.void) + ) + ) + ) ), - { concurrency: streams.length }, - ); + { concurrency: streams.length } + ) }), - unwrap, - ); + unwrap + ) /** @internal */ export const rechunk = dual< From 51a44fc1c6aa93d6787bb022b3fc7054a8036a08 Mon Sep 17 00:00:00 2001 From: bowzee Date: Thu, 4 Jul 2024 07:51:16 +0300 Subject: [PATCH 8/9] refactor(Stream/raceAll): improve documentation --- .changeset/stream-race-all.md | 25 ++++++++++++++++++++++++- packages/effect/src/Stream.ts | 6 +++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/.changeset/stream-race-all.md b/.changeset/stream-race-all.md index 2a37d55202..f31c5920d0 100644 --- a/.changeset/stream-race-all.md +++ b/.changeset/stream-race-all.md @@ -2,4 +2,27 @@ "effect": minor --- -feat(Stream): raceAll \ No newline at end of file +feat(Stream): implement "raceAll" operator, which returns a stream that mirrors the first source stream to emit an item. + + +```ts +import { Stream, Schedule, Console, Effect } from "effect" + +const stream = Stream.raceAll( + Stream.fromSchedule(Schedule.spaced('1 millis')), + Stream.fromSchedule(Schedule.spaced('2 millis')), + Stream.fromSchedule(Schedule.spaced('4 millis')) +).pipe( + Stream.take(6), + Stream.tap(Console.log) +) + +Effect.runPromise(Stream.runDrain(stream)) +// Output only from the first stream, the rest streams are interrupted +// 0 +// 1 +// 2 +// 3 +// 4 +// 5 +``` \ No newline at end of file diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 991b15d1a4..81334ba15f 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2695,7 +2695,11 @@ export const provideSomeLayer: { } = internal.provideSomeLayer /** - * Returns a stream that mirrors the first source stream to emit an item. + * Returns a stream that mirrors the first upstream to emit an item. + * As soon as one of the upstream emits a first value, all the others are being interrupted. + * The resulting stream will forward all items from the "winning" source stream. + * Any upstream failures will cause the returned stream to fail. + * * @example * import { Stream, Schedule, Console, Effect } from "effect" * From 45c3fadbe799fd12afab09b6d2d4f9c795d0b245 Mon Sep 17 00:00:00 2001 From: Tim Date: Fri, 5 Jul 2024 13:03:47 +1200 Subject: [PATCH 9/9] doc tweaks --- .changeset/stream-race-all.md | 28 ++++++++++++---------------- packages/effect/src/Stream.ts | 10 +++++----- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/.changeset/stream-race-all.md b/.changeset/stream-race-all.md index f31c5920d0..ae50dfabfe 100644 --- a/.changeset/stream-race-all.md +++ b/.changeset/stream-race-all.md @@ -4,25 +4,21 @@ feat(Stream): implement "raceAll" operator, which returns a stream that mirrors the first source stream to emit an item. - ```ts -import { Stream, Schedule, Console, Effect } from "effect" +import { Stream, Schedule, Console, Effect } from "effect"; const stream = Stream.raceAll( - Stream.fromSchedule(Schedule.spaced('1 millis')), - Stream.fromSchedule(Schedule.spaced('2 millis')), - Stream.fromSchedule(Schedule.spaced('4 millis')) -).pipe( - Stream.take(6), - Stream.tap(Console.log) -) + Stream.fromSchedule(Schedule.spaced("1 millis")), + Stream.fromSchedule(Schedule.spaced("2 millis")), + Stream.fromSchedule(Schedule.spaced("4 millis")), +).pipe(Stream.take(6), Stream.tap(Console.log)); -Effect.runPromise(Stream.runDrain(stream)) +Effect.runPromise(Stream.runDrain(stream)); // Output only from the first stream, the rest streams are interrupted -// 0 -// 1 -// 2 -// 3 -// 4 +// 0 +// 1 +// 2 +// 3 +// 4 // 5 -``` \ No newline at end of file +``` diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 81334ba15f..a494f0b9b5 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2696,7 +2696,7 @@ export const provideSomeLayer: { /** * Returns a stream that mirrors the first upstream to emit an item. - * As soon as one of the upstream emits a first value, all the others are being interrupted. + * As soon as one of the upstream emits a first value, all the others are interrupted. * The resulting stream will forward all items from the "winning" source stream. * Any upstream failures will cause the returned stream to fail. * @@ -2704,9 +2704,9 @@ export const provideSomeLayer: { * import { Stream, Schedule, Console, Effect } from "effect" * * const stream = Stream.raceAll( - * Stream.fromSchedule(Schedule.spaced('1 millis')), - * Stream.fromSchedule(Schedule.spaced('2 millis')), - * Stream.fromSchedule(Schedule.spaced('4 millis')), + * Stream.fromSchedule(Schedule.spaced('1 millis')), + * Stream.fromSchedule(Schedule.spaced('2 millis')), + * Stream.fromSchedule(Schedule.spaced('4 millis')), * ).pipe(Stream.take(6), Stream.tap(Console.log)) * * Effect.runPromise(Stream.runDrain(stream)) @@ -2718,7 +2718,7 @@ export const provideSomeLayer: { * // 4 * // 5 * @since 3.5.0 - * @category utils + * @category racing */ export const raceAll: >>( ...streams: S