Skip to content

Commit

Permalink
feat(Stream): raceAll
Browse files Browse the repository at this point in the history
  • Loading branch information
dilame committed Jun 30, 2024
1 parent 95bab19 commit a8abfa4
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/stream-race-all.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

feat(Stream): raceAll
30 changes: 30 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2664,6 +2664,36 @@ export const provideSomeLayer: {
): Stream<A, E | E2, RIn | Exclude<R, ROut>>
} = internal.provideSomeLayer

/**
* Returns a stream that mirrors the first source stream to emit an item.
* @example
* import { Stream, Schedule, Console, Effect } from "effect"
*
* const stream = Stream.raceAll(
* Stream.fromSchedule(Schedule.spaced('1 millis')),
* Stream.fromSchedule(Schedule.spaced('2 millis')),
* Stream.fromSchedule(Schedule.spaced('4 millis')),
* ).pipe(Stream.take(6), Stream.tap(Console.log))
*
* Effect.runPromise(Stream.runDrain(stream))
* // Output each millisecond from the first stream, the rest streams are interrupted
* // 0
* // 1
* // 2
* // 3
* // 4
* // 5
* @since 3.5.0
* @category utils
*/
export const raceAll: <T extends ReadonlyArray<Stream<any, any, any>>>(
...streams: T
) => Stream<
[T[number]] extends [never] ? never : T[number] extends Stream<infer A, any, any> ? A : never,
[T[number]] extends [never] ? never : T[number] extends Stream<any, infer E, any> ? E : never,
[T[number]] extends [never] ? never : T[number] extends Stream<any, any, infer R> ? R : never
> = internal.raceAll

/**
* Constructs a stream from a range of integers, including both endpoints.
*
Expand Down
35 changes: 35 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4686,6 +4686,41 @@ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): S
return new StreamImpl(go(min, max, chunkSize))
})

export const raceAll = <T extends ReadonlyArray<Stream.Stream<any, any, any>>>(
...streams: T
): Stream.Stream<
[T[number]] extends [never] ? never : T[number] extends Stream.Stream<infer A, any, any> ? A : never,
[T[number]] extends [never] ? never : T[number] extends Stream.Stream<any, infer E, any> ? E : never,
[T[number]] extends [never] ? never : T[number] extends Stream.Stream<any, any, infer R> ? R : never
> => {
let finished = false
return unwrap(
Effect.gen(function*() {
const halts = yield* Effect.all(streams.map(() => Deferred.make<void>()))
let finishes = streams.map(() => false)
return mergeAll(
streams.map((s, index) =>
s.pipe(
tap(() => {
if (finished) {
return Effect.void
}
finished = true
finishes = finishes.map((_, i) => !(i === index))
return Effect.all(
halts.map((def, i) => i === index ? Effect.void : Deferred.succeed(def, void 0))
)
}),
takeWhile(() => !finishes[index]),
interruptWhenDeferred(halts[index]!)
)
),
{ concurrency: "unbounded" }
)
})
)
}

/** @internal */
export const rechunk = dual<
(n: number) => <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A, E, R>,
Expand Down
21 changes: 21 additions & 0 deletions packages/effect/test/Stream/racing.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Stream from "effect/Stream"
import * as it from "effect/test/utils/extend"
import { assert, describe } from "vitest"

describe("Stream", () => {
it.effect("raceAll", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.raceAll(
Stream.make(0, 1, 2, 3),
Stream.make(4, 5, 6, 7),
Stream.make(7, 8, 9, 10)
),
Stream.runCollect,
Effect.map(Chunk.toReadonlyArray)
)
assert.deepStrictEqual(result, [0, 1, 2, 3])
}))
})

0 comments on commit a8abfa4

Please sign in to comment.