Skip to content

Commit

Permalink
feat(Stream): raceAll implementation (#3131)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim <hello@timsmart.co>
  • Loading branch information
2 people authored and gcanti committed Jul 5, 2024
1 parent 3f99844 commit 8ab2b2c
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 0 deletions.
24 changes: 24 additions & 0 deletions .changeset/stream-race-all.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"effect": minor
---

feat(Stream): implement "raceAll" operator, which returns a stream that mirrors the first source stream to emit an item.

```ts
import { Stream, Schedule, Console, Effect } from "effect";

const stream = Stream.raceAll(
Stream.fromSchedule(Schedule.spaced("1 millis")),
Stream.fromSchedule(Schedule.spaced("2 millis")),
Stream.fromSchedule(Schedule.spaced("4 millis")),
).pipe(Stream.take(6), Stream.tap(Console.log));

Effect.runPromise(Stream.runDrain(stream));
// Output only from the first stream, the rest streams are interrupted
// 0
// 1
// 2
// 3
// 4
// 5
```
34 changes: 34 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2694,6 +2694,40 @@ export const provideSomeLayer: {
): Stream<A, E | E2, RIn | Exclude<R, ROut>>
} = internal.provideSomeLayer

/**
* Returns a stream that mirrors the first upstream to emit an item.
* As soon as one of the upstream emits a first value, all the others are interrupted.
* The resulting stream will forward all items from the "winning" source stream.
* Any upstream failures will cause the returned stream to fail.
*
* @example
* import { Stream, Schedule, Console, Effect } from "effect"
*
* const stream = Stream.raceAll(
* Stream.fromSchedule(Schedule.spaced('1 millis')),
* Stream.fromSchedule(Schedule.spaced('2 millis')),
* Stream.fromSchedule(Schedule.spaced('4 millis')),
* ).pipe(Stream.take(6), Stream.tap(Console.log))
*
* Effect.runPromise(Stream.runDrain(stream))
* // Output each millisecond from the first stream, the rest streams are interrupted
* // 0
* // 1
* // 2
* // 3
* // 4
* // 5
* @since 3.5.0
* @category racing
*/
export const raceAll: <S extends ReadonlyArray<Stream<any, any, any>>>(
...streams: S
) => Stream<
Stream.Success<S[number]>,
Stream.Error<S[number]>,
Stream.Context<S[number]>
> = internal.raceAll

/**
* Constructs a stream from a range of integers, including both endpoints.
*
Expand Down
34 changes: 34 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4686,6 +4686,40 @@ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): S
return new StreamImpl(go(min, max, chunkSize))
})

export const raceAll = <S extends ReadonlyArray<Stream.Stream<any, any, any>>>(
...streams: S
): Stream.Stream<
Stream.Stream.Success<S[number]>,
Stream.Stream.Error<S[number]>,
Stream.Stream.Context<S[number]>
> =>
Deferred.make<void>().pipe(
Effect.map((halt) => {
let winner: number | null = null
return mergeAll(
streams.map((stream, index) =>
stream.pipe(
takeWhile(() => {
if (winner === null) {
winner = index
Deferred.unsafeDone(halt, Exit.void)
return true
}
return winner === index
}),
interruptWhen(
Deferred.await(halt).pipe(
Effect.flatMap(() => winner === index ? Effect.never : Effect.void)
)
)
)
),
{ concurrency: streams.length }
)
}),
unwrap
)

/** @internal */
export const rechunk = dual<
(n: number) => <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A, E, R>,
Expand Down
67 changes: 67 additions & 0 deletions packages/effect/test/Stream/racing.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Fiber from "effect/Fiber"
import * as Schedule from "effect/Schedule"
import * as Stream from "effect/Stream"
import * as it from "effect/test/utils/extend"
import * as TestClock from "effect/TestClock"
import { assert, describe } from "vitest"

describe("Stream", () => {
it.effect("raceAll sync", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.raceAll(
Stream.make(0, 1, 2, 3),
Stream.make(4, 5, 6, 7),
Stream.make(7, 8, 9, 10)
),
Stream.runCollect,
Effect.map(Chunk.toReadonlyArray)
)
assert.deepStrictEqual(result, [0, 1, 2, 3])
}))

it.effect("raceAll async", () =>
Effect.gen(function*($) {
const fiber = yield* $(
Stream.raceAll(
Stream.fromSchedule(Schedule.spaced("1 second")),
Stream.fromSchedule(Schedule.spaced("2 second"))
),
Stream.take(5),
Stream.runCollect,
Effect.map(Chunk.toReadonlyArray),
Effect.fork
)
yield* TestClock.adjust("5 second")
const result = yield* Fiber.join(fiber)
assert.deepStrictEqual(result, [0, 1, 2, 3, 4])
}))

it.effect("raceAll combined async + sync", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.raceAll(
Stream.fromSchedule(Schedule.spaced("1 second")),
Stream.make(0, 1, 2, 3)
),
Stream.runCollect,
Effect.map(Chunk.toReadonlyArray)
)
assert.deepStrictEqual(result, [0, 1, 2, 3])
}))

it.effect("raceAll combined sync + async", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.raceAll(
Stream.make(0, 1, 2, 3),
Stream.fromSchedule(Schedule.spaced("1 second"))
),
Stream.runCollect,
Effect.map(Chunk.toReadonlyArray)
)
assert.deepStrictEqual(result, [0, 1, 2, 3])
}))
})

0 comments on commit 8ab2b2c

Please sign in to comment.