Stream.interruptWhenDeferred doesn't instantly interrupt the stream #3141

Closed
dilame opened this issue Jul 2, 2024 · 3 comments

dilame commented Jul 2, 2024

What version of Effect is running?

3.4.6

What steps can reproduce the bug?

When Stream.interruptWhenDeferred is used together with Deferred.succeed, the stream is not interrupted as expected: instead of stopping after the first element, it continues to process all elements.

import { Deferred, Effect, Stream } from 'effect';
import * as console from 'node:console';

const s1 = Stream.make(1, 2, 3)

const program = Effect.gen(function* () {
  const interrupt = yield* Deferred.make<void>();
  return yield* s1.pipe(
    Stream.tap(() => Deferred.succeed(interrupt, void 0)),
    Stream.interruptWhenDeferred(interrupt),
    Stream.runCollect,
  );
});

Effect.runPromise(program).then(
  (r) => console.log(r),
  (e) => console.error(e),
);

What is the expected behavior?

{ _id: 'Chunk', values: [ 1 ] }

What do you see instead?

{ _id: 'Chunk', values: [ 1, 2, 3 ] }

Additional information

This affects the ability to interrupt streams with a Deferred and can lead to performance problems and logic errors in applications that rely on this behavior.

dilame added the bug label Jul 2, 2024
mikearnaldi added the working as intended label and removed the bug label Jul 3, 2024

mikearnaldi commented

The stream is fully synchronous, and there is no chance of interrupting a synchronous operation. If you add a delay, it works as expected:

import { Deferred, Effect, Stream } from "effect"
import * as console from "node:console"

const s1 = Stream.make(1, 2, 3).pipe(
  Stream.tap(() => Effect.sleep("100 millis"))
)

const program = Effect.gen(function*() {
  const interrupt = yield* Deferred.make<void>()
  return yield* s1.pipe(
    Stream.tap(() => Deferred.succeed(interrupt, void 0)),
    Stream.interruptWhenDeferred(interrupt),
    Stream.runCollect
  )
})

Effect.runPromise(program).then(
  (r) => console.log(r),
  (e) => console.error(e)
)

mikearnaldi commented

Similarly, it works if you add a yield with a lower priority than 0:

import { Deferred, Effect, Stream } from "effect"
import * as console from "node:console"

const s1 = Stream.make(1, 2, 3).pipe(
  Stream.tap(() => Effect.yieldNow({ priority: 1 }))
)

const program = Effect.gen(function*() {
  const interrupt = yield* Deferred.make<void>()
  return yield* s1.pipe(
    Stream.tap(() => Deferred.succeed(interrupt, void 0)),
    Stream.interruptWhenDeferred(interrupt),
    Stream.runCollect
  )
})

Effect.runPromise(program).then(
  (r) => console.log(r),
  (e) => console.error(e)
)
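
To illustrate why a delay or a yield makes the difference, here is a simplified model in plain TypeScript. It is not Effect's runtime or API: `collect`, `yieldBetween`, and the Promise-based signal are hypothetical stand-ins. The assumption is that whatever watches the Deferred only gets to run when the producer gives up control, which a fully synchronous loop never does.

```typescript
// Simplified model, NOT Effect's implementation: a "watcher" that reacts to a
// signal can only run once the producing loop hands control back.
async function collect(items: number[], yieldBetween: boolean): Promise<number[]> {
  let interrupted = false

  // Stand-in for the Deferred: `fire` completes it.
  let fire: () => void = () => {}
  const signal = new Promise<void>((resolve) => {
    fire = resolve
  })

  // Stand-in for the watcher: it is scheduled when the signal completes,
  // but it only executes when the current synchronous run ends.
  void signal.then(() => {
    interrupted = true
  })

  const out: number[] = []
  for (const item of items) {
    if (interrupted) break
    out.push(item)
    fire() // plays the role of Deferred.succeed inside the tap

    if (yieldBetween) {
      // Plays the role of Effect.sleep / Effect.yieldNow: give other work a turn.
      await new Promise<void>((resume) => setTimeout(resume, 0))
    }
  }
  return out
}

collect([1, 2, 3], false).then((r) => console.log("no yield:", r))
collect([1, 2, 3], true).then((r) => console.log("with yield:", r))
```

In this model the run without a yield collects all three elements, because the watcher never gets a turn before the loop finishes. The run with a yield stops after the first element.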

IMax153 commented Jul 10, 2024

Going to close this as it's working as intended.

IMax153 closed this as completed Jul 10, 2024