Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Stream): raceAll implementation #3131

Merged
merged 9 commits into from
Jul 5, 2024
24 changes: 24 additions & 0 deletions .changeset/stream-race-all.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"effect": minor
---

feat(Stream): implement "raceAll" operator, which returns a stream that mirrors the first source stream to emit an item.

```ts
import { Stream, Schedule, Console, Effect } from "effect";

const stream = Stream.raceAll(
Stream.fromSchedule(Schedule.spaced("1 millis")),
Stream.fromSchedule(Schedule.spaced("2 millis")),
Stream.fromSchedule(Schedule.spaced("4 millis")),
).pipe(Stream.take(6), Stream.tap(Console.log));

Effect.runPromise(Stream.runDrain(stream));
// Output only from the first stream, the rest streams are interrupted
// 0
// 1
// 2
// 3
// 4
// 5
```
34 changes: 34 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2694,6 +2694,40 @@ export const provideSomeLayer: {
): Stream<A, E | E2, RIn | Exclude<R, ROut>>
} = internal.provideSomeLayer

/**
* Returns a stream that mirrors the first upstream to emit an item.
* As soon as one of the upstream emits a first value, all the others are interrupted.
* The resulting stream will forward all items from the "winning" source stream.
* Any upstream failures will cause the returned stream to fail.
*
* @example
* import { Stream, Schedule, Console, Effect } from "effect"
*
* const stream = Stream.raceAll(
* Stream.fromSchedule(Schedule.spaced('1 millis')),
* Stream.fromSchedule(Schedule.spaced('2 millis')),
* Stream.fromSchedule(Schedule.spaced('4 millis')),
* ).pipe(Stream.take(6), Stream.tap(Console.log))
*
* Effect.runPromise(Stream.runDrain(stream))
* // Output each millisecond from the first stream, the rest streams are interrupted
* // 0
* // 1
* // 2
* // 3
* // 4
* // 5
* @since 3.5.0
* @category racing
*/
export const raceAll: <S extends ReadonlyArray<Stream<any, any, any>>>(
...streams: S
) => Stream<
Stream.Success<S[number]>,
Stream.Error<S[number]>,
Stream.Context<S[number]>
> = internal.raceAll

/**
* Constructs a stream from a range of integers, including both endpoints.
*
Expand Down
34 changes: 34 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4686,6 +4686,40 @@ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): S
return new StreamImpl(go(min, max, chunkSize))
})

export const raceAll = <S extends ReadonlyArray<Stream.Stream<any, any, any>>>(
...streams: S
): Stream.Stream<
Stream.Stream.Success<S[number]>,
Stream.Stream.Error<S[number]>,
Stream.Stream.Context<S[number]>
> =>
Deferred.make<void>().pipe(
Effect.map((halt) => {
let winner: number | null = null
return mergeAll(
streams.map((stream, index) =>
stream.pipe(
takeWhile(() => {
if (winner === null) {
winner = index
Deferred.unsafeDone(halt, Exit.void)
return true
}
return winner === index
}),
interruptWhen(
Deferred.await(halt).pipe(
Effect.flatMap(() => winner === index ? Effect.never : Effect.void)
)
)
)
),
{ concurrency: streams.length }
)
}),
unwrap
)

/** @internal */
export const rechunk = dual<
(n: number) => <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A, E, R>,
Expand Down
67 changes: 67 additions & 0 deletions packages/effect/test/Stream/racing.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Fiber from "effect/Fiber"
import * as Schedule from "effect/Schedule"
import * as Stream from "effect/Stream"
import * as it from "effect/test/utils/extend"
import * as TestClock from "effect/TestClock"
import { assert, describe } from "vitest"

describe("Stream", () => {
it.effect("raceAll sync", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.raceAll(
Stream.make(0, 1, 2, 3),
Stream.make(4, 5, 6, 7),
Stream.make(7, 8, 9, 10)
),
Stream.runCollect,
Effect.map(Chunk.toReadonlyArray)
)
assert.deepStrictEqual(result, [0, 1, 2, 3])
}))

it.effect("raceAll async", () =>
Effect.gen(function*($) {
const fiber = yield* $(
Stream.raceAll(
Stream.fromSchedule(Schedule.spaced("1 second")),
Stream.fromSchedule(Schedule.spaced("2 second"))
),
Stream.take(5),
Stream.runCollect,
Effect.map(Chunk.toReadonlyArray),
Effect.fork
)
yield* TestClock.adjust("5 second")
const result = yield* Fiber.join(fiber)
assert.deepStrictEqual(result, [0, 1, 2, 3, 4])
}))

it.effect("raceAll combined async + sync", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.raceAll(
Stream.fromSchedule(Schedule.spaced("1 second")),
Stream.make(0, 1, 2, 3)
),
Stream.runCollect,
Effect.map(Chunk.toReadonlyArray)
)
assert.deepStrictEqual(result, [0, 1, 2, 3])
}))

it.effect("raceAll combined sync + async", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.raceAll(
Stream.make(0, 1, 2, 3),
Stream.fromSchedule(Schedule.spaced("1 second"))
),
Stream.runCollect,
Effect.map(Chunk.toReadonlyArray)
)
assert.deepStrictEqual(result, [0, 1, 2, 3])
}))
})