MultiStreamController.isPaused remains true after resume() #56915

Open
sgrekhov opened this issue Oct 18, 2024 · 4 comments
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. library-async type-documentation A request to add or improve documentation

Comments

@sgrekhov
Contributor

The following test fails. Why?

import "dart:async";
import "../../../Utils/expect.dart";

var theController;

main() {
  asyncStart();
  var stream = Stream<int>.multi((controller) {
    theController = controller;
    Expect.isFalse(controller.isPaused);
    controller.add(1);
    controller.add(2);
    controller.add(3);
  });
  listen(stream);
}

void listen(Stream<int> stream) {
  late StreamSubscription ss;
  ss = stream.listen((v) {
    if (v == 1) {
      ss.pause(Future.delayed(Duration(milliseconds: 100)));
    } else {
      Expect.isFalse(theController.isPaused); //Expect.isFalse(true) fails.
    }
  }, onDone: asyncEnd);
}

cc @lrhn

@dart-github-bot
Collaborator

Summary: The MultiStreamController.isPaused property remains true after calling resume() on a paused stream subscription, even though the stream itself is no longer paused. This leads to incorrect behavior in the provided test case.

@dart-github-bot dart-github-bot added area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. triage-automation See https://github.com/dart-lang/ecosystem/tree/main/pkgs/sdk_triage_bot. type-bug Incorrect behavior (everything from a crash to more subtle misbehavior) labels Oct 18, 2024
@lrhn
Member

lrhn commented Oct 18, 2024

TL;DR: Because stream-controllers lie about pause. It's deliberate.

In the optimal case, there are never any enqueued events, or at least never more than one. Events are delivered as soon as possible (first available microtask slot), and an event producer shouldn't produce events faster than they can be consumed. Doing so leads to buffering, and consumers are expected to pause if they can't keep up with events.

When a stream subscription is paused, the consumer has said that it can't keep up with events.
The producer shouldn't add any events while it's paused.
If it does so anyway (or over-delivered before the pause), then when the subscription is resumed it keeps telling the controller that it's paused ("don't send more events yet") until it has cleared its pending events, that is, until it has caught up.

That's not special to multi-stream controllers. You get the same effect using:

main() {
  asyncStart();
  var controller = StreamController<int>();
  controller.onListen = () {
    Expect.isFalse(controller.isPaused);
    controller.add(1);
    controller.add(2);
    controller.add(3);
  };
  theController = controller;
  var stream = controller.stream;
  listen(stream);
}

(It's not surprising that they behave the same way, since they all use the same code. The StreamSubscription code doesn't care what kind of Stream it comes from; it just receives events and sends flow-control requests (pause/resume/cancel) the other way.)

If you add

    Timer(Duration(milliseconds: 200), () {
      controller.add(4);
    });

after the controller.add(3), you can see that that event is sent after the pending event queue has been processed and the controller has been told that it's OK to send more events.
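To watch this happen without the test utilities, the scenario can be put into a standalone program. This is only a sketch: `observeIsPaused` is a made-up name, and the map it returns just records what `controller.isPaused` reports each time the listener receives an event.

```dart
import 'dart:async';

/// Hypothetical helper: runs the scenario from this thread and records what
/// the controller reports as `isPaused` when each event reaches the listener.
Future<Map<int, bool>> observeIsPaused() {
  final observed = <int, bool>{};
  final done = Completer<Map<int, bool>>();
  final controller = StreamController<int>();
  controller.onListen = () {
    // Over-deliver: all three events are queued before the listener runs.
    controller.add(1);
    controller.add(2);
    controller.add(3);
    // Added well after the 100 ms pause has ended and the queue has drained.
    Timer(const Duration(milliseconds: 200), () {
      controller.add(4);
      controller.close();
    });
  };
  late StreamSubscription<int> subscription;
  subscription = controller.stream.listen((value) {
    // Record the state first, before this callback pauses anything itself.
    observed[value] = controller.isPaused;
    if (value == 1) {
      subscription
          .pause(Future<void>.delayed(const Duration(milliseconds: 100)));
    }
  }, onDone: () => done.complete(observed));
  return done.future;
}

Future<void> main() async {
  final observed = await observeIsPaused();
  observed.forEach((value, paused) => print('event $value: isPaused=$paused'));
}
```

Based on the behavior described in this thread, events 2 and 3 should be recorded with `isPaused` true, because they are delivered from the pending queue while input is still paused. Events 1 and 4 should be recorded with `isPaused` false.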

A stream subscription has two flags internally: "isPaused" (a counter, actually) and "input-paused", which is what it reports to the controller. When the subscription user's pause is resumed and the queue is not empty, the input-paused flag isn't cleared; the pending events are just scheduled. When the last pending event has been delivered and the subscription isn't paused, the input-paused flag is cleared.
The actual code is:

void resume() {
  if (_isCanceled) return;
  if (_isPaused) {
    _decrementPauseCount();
    if (!_isPaused) {
      if (_hasPending && !_pending!.isEmpty) {
        // Input is still paused.
        _pending!.schedule(this);
      } else {
        assert(_mayResumeInput);
        _state &= ~_STATE_INPUT_PAUSED;
        if (!_inCallback) _guardCallback(_onResume);
      }
    }
  }
}

Testing a controller by filling up its pending event queue with everything during the onListen call doesn't cover all its behaviors. That's one kind of behavior. Another would be events occurring at fixed intervals, like incoming network packets, whether the subscription is listening or not, or a stream which only produces new events while the subscription isn't paused.

(Also remember to close the controller.)
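As an illustration of the last kind of producer, here is a minimal sketch of a stream that only produces events while the subscription isn't paused, and that closes its controller when it is finished. The name `pausableCounter` and its parameters are invented for this example; it uses only the `onListen`, `onPause`, `onResume` and `onCancel` callbacks of `StreamController`.

```dart
import 'dart:async';

/// Hypothetical helper (not from the SDK): emits 0 .. count - 1, one value per
/// [interval], and only runs its timer while the listener accepts input.
/// Assumes count > 0.
Stream<int> pausableCounter(int count, Duration interval) {
  late final StreamController<int> controller;
  Timer? timer;
  var next = 0;

  void stop() {
    timer?.cancel();
    timer = null;
  }

  void start() {
    // Never start a second timer, and never add to a closed controller.
    if (timer != null || controller.isClosed) return;
    timer = Timer.periodic(interval, (_) {
      controller.add(next++);
      if (next >= count) {
        stop();
        controller.close(); // Remember to close the controller.
      }
    });
  }

  controller = StreamController<int>(
    onListen: start,
    onPause: stop, // The consumer can't keep up: stop producing.
    onResume: start, // Called once the subscription accepts input again.
    onCancel: stop,
  );
  return controller.stream;
}

Future<void> main() async {
  late StreamSubscription<int> subscription;
  final done = Completer<void>();
  subscription = pausableCounter(4, const Duration(milliseconds: 10)).listen(
    (value) {
      print('got $value');
      if (value == 1) {
        subscription
            .pause(Future<void>.delayed(const Duration(milliseconds: 50)));
      }
    },
    onDone: done.complete,
  );
  await done.future;
}
```

Because this producer stops its timer in `onPause`, nothing piles up in the pending queue during the pause, so `isPaused` on the controller should track the subscription's own pause state much more closely than in the over-delivering test.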

@lrhn lrhn added type-question A question about expected behavior or functionality and removed type-bug Incorrect behavior (everything from a crash to more subtle misbehavior) triage-automation See https://github.com/dart-lang/ecosystem/tree/main/pkgs/sdk_triage_bot. labels Oct 18, 2024
@sgrekhov
Contributor Author

Thank you for the explanation. Then, I'm going to change the test to:

...
    if (v == 1) {
      ss.pause(Future.delayed(Duration(milliseconds: 100)));
    } else {
      Expect.isTrue(theController.isPaused); // It's Ok. See https://github.com/dart-lang/sdk/issues/56915 for more details.
    }
...

@lrhn
Member

lrhn commented Oct 18, 2024

That should be correct. If we ever change behavior (and we haven't so far, so we probably won't) then this test will catch it.

@lrhn lrhn added type-documentation A request to add or improve documentation and removed type-question A question about expected behavior or functionality labels Oct 22, 2024