This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

broadcastDynamic broken #56

Open
mikearnaldi opened this issue Feb 16, 2023 · 4 comments
Labels
bug Something isn't working

Comments

@mikearnaldi
Member

The following doesn't return:

import * as Effect from "@effect/io/Effect"
import * as Stream from "@effect/stream/Stream"
import { pipe } from "@fp-ts/core/Function"

pipe(
  Stream.fail("fail"),
  Stream.broadcastDynamic(1),
  Effect.flatMap((_) => Effect.zipPar(Stream.runDrain(_), Stream.runDrain(_))),
  Effect.scoped,
  Effect.catchAllCause(Effect.logErrorCause),
  Effect.runFork
)

The following, which uses Effect.zip instead of Effect.zipPar, propagates the failure:

import * as Effect from "@effect/io/Effect"
import * as Stream from "@effect/stream/Stream"
import { pipe } from "@fp-ts/core/Function"

pipe(
  Stream.fail("fail"),
  Stream.broadcastDynamic(1),
  Effect.flatMap((_) => Effect.zip(Stream.runDrain(_), Stream.runDrain(_))),
  Effect.scoped,
  Effect.catchAllCause(Effect.logErrorCause),
  Effect.runFork
)
@mikearnaldi
Member Author

import * as Duration from "@effect/data/Duration"
import * as Effect from "@effect/io/Effect"
import * as Stream from "@effect/stream/Stream"
import { pipe } from "@fp-ts/core/Function"

pipe(
  Stream.fromEffect(Effect.delay(Duration.seconds(5))(Effect.fail("fail"))),
  Stream.broadcastDynamic(1),
  Effect.flatMap((_) => Effect.zipPar(Stream.runDrain(_), Stream.runDrain(_))),
  Effect.scoped,
  Effect.catchAllCause(Effect.logErrorCause),
  Effect.runFork
)

This correctly prints out two errors. It seems we are not handling the case of a new subscriber subscribing to an already-failed stream, which should fail automatically.
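
To make the expected behaviour concrete, here is a minimal toy sketch in plain TypeScript. It is not how @effect/stream (or ZIO) implements broadcastDynamic; ToyHub, Signal and Subscriber are hypothetical names invented for illustration. It only models the idea that a hub remembers its terminal signal and replays it to late subscribers, so they fail immediately instead of waiting forever.

```typescript
// Toy model only: NOT the @effect/stream implementation of broadcastDynamic.
// All names below (Signal, Subscriber, ToyHub) are hypothetical.
type Signal<E, A> =
  | { readonly _tag: "Emit"; readonly value: A }
  | { readonly _tag: "Fail"; readonly error: E }
  | { readonly _tag: "End" }

type Subscriber<E, A> = (signal: Signal<E, A>) => void

class ToyHub<E, A> {
  private subscribers: Array<Subscriber<E, A>> = []
  // Remembered terminal signal (Fail or End) once the upstream has finished.
  private terminal: Signal<E, A> | undefined = undefined

  publish(signal: Signal<E, A>): void {
    // Ignore anything published after termination.
    if (this.terminal !== undefined) return
    if (signal._tag !== "Emit") this.terminal = signal
    for (const subscriber of this.subscribers) subscriber(signal)
  }

  subscribe(subscriber: Subscriber<E, A>): void {
    if (this.terminal !== undefined) {
      // Late subscriber: replay the terminal signal instead of hanging.
      subscriber(this.terminal)
      return
    }
    this.subscribers.push(subscriber)
  }
}

// The upstream fails before anyone has subscribed.
const hub = new ToyHub<string, number>()
hub.publish({ _tag: "Fail", error: "fail" })

// Both late subscribers observe the failure right away.
const seen: Array<string> = []
hub.subscribe((signal) => { seen.push(signal._tag) })
hub.subscribe((signal) => { seen.push(signal._tag) })
console.log(seen) // prints [ 'Fail', 'Fail' ]
```

In this sketch both subscribers attach after the failure and still see it, which is the behaviour the zipPar program above would need in order to terminate.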

@mikearnaldi mikearnaldi added the bug Something isn't working label Feb 16, 2023
@IMax153
Member

IMax153 commented Feb 16, 2023

@mikearnaldi - have you tested this with ZIO yet to see if it repros there?

@mikearnaldi
Member Author

mikearnaldi commented Feb 16, 2023

> @mikearnaldi - have you tested this with ZIO yet to see if it repros there?

No, not yet. It would be good to make a ZIO project with a Nix setup so we can repro this kind of thing quickly.

@IMax153
Member

IMax153 commented Jun 26, 2023

@mikearnaldi - I know it's been a while on this issue, but I've run the following two programs in the scala-playground repo using the latest ZIO via scala-cli and they both hang (i.e. neither terminates).

//> using dep dev.zio::zio:2.0.15
//> using dep dev.zio::zio-streams:2.0.15

package com.effect.playground

import zio._
import zio.stream._

object Main extends ZIOAppDefault {

  val program1 = ZIO
    .scoped {
      ZStream
        .fail("fail")
        .broadcastDynamic(1)
        .flatMap((stream) => stream.runDrain.zipPar(stream.runDrain))
    }
    .catchAllCause(ZIO.logErrorCause(_))

  val program2 = ZIO
    .scoped {
      ZStream
        .fail("fail")
        .broadcastDynamic(1)
        .flatMap((stream) => stream.runDrain.zip(stream.runDrain))
    }
    .catchAllCause(ZIO.logErrorCause(_))

  val run = program1

}
