Skip to content

Commit

Permalink
fix(Stream/raceAll): use haltWhenDeferred instead of interruptWithDef…
Browse files Browse the repository at this point in the history
…erred + takeWhile
  • Loading branch information
dilame authored and gcanti committed Jul 2, 2024
1 parent dddb378 commit fa4caae
Showing 1 changed file with 1 addition and 4 deletions.
5 changes: 1 addition & 4 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4697,7 +4697,6 @@ export const raceAll = <T extends ReadonlyArray<Stream.Stream<any, any, any>>>(
return unwrap(
Effect.gen(function*() {
const halts = yield* Effect.all(streams.map(() => Deferred.make<void>()))
let finishes = streams.map(() => false)
return mergeAll(
streams.map((s, index) =>
s.pipe(
Expand All @@ -4706,13 +4705,11 @@ export const raceAll = <T extends ReadonlyArray<Stream.Stream<any, any, any>>>(
return Effect.void
}
finished = true
finishes = finishes.map((_, i) => !(i === index))
return Effect.all(
halts.map((def, i) => i === index ? Effect.void : Deferred.succeed(def, void 0))
)
}),
takeWhile(() => !finishes[index]),
interruptWhenDeferred(halts[index]!)
haltWhenDeferred(halts[index]!)
)
),
{ concurrency: "unbounded" }
Expand Down

0 comments on commit fa4caae

Please sign in to comment.