libsubprocess: support stdin flow control #6291

Open
chu11 opened this issue Sep 19, 2024 · 3 comments

Comments

@chu11
Member

chu11 commented Sep 19, 2024

From #4572

Rather than adding write responses, some kind of credit based flow control scheme seems appropriate here. For example (just brainstorming):

  • change the sender so that it only sends up to credits bytes per channel before pausing (stopping, for example, the stdin fd watcher in flux-exec). This probably requires a callback to expose credits to the user, like on_credit (int bytes)?
  • add a new exec response type for issuing credits for a given stream
  • each stream's receive buffer sends a credit type response initially for the full buffer size
  • then each time some data is taken out of the buffer, send some credits
  • enhancement: only send credit when a low water mark is reached
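
A minimal sketch of the accounting behind the scheme brainstormed above, covering only the credit bookkeeping and not the exec protocol or the reactor watchers. Every name here (`credit_sender`, `credit_receiver`, `receiver_drain`, and so on) is hypothetical and not part of libsubprocess. The low water mark is assumed to be a buffer fill level at or below which accumulated credits are released.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical types: none of this is libsubprocess API. */

/* Sender side: bytes that may still be sent on one stream. */
struct credit_sender {
    size_t credits;
};

/* Receiver side: a fixed-size buffer that grants credits as it drains. */
struct credit_receiver {
    size_t capacity;   /* total buffer size */
    size_t used;       /* bytes currently buffered */
    size_t pending;    /* freed bytes not yet granted back to the sender */
    size_t low_water;  /* grant only once used <= low_water */
};

/* Initialize the receiver; the return value is the initial grant,
 * which is the full buffer size. */
static size_t receiver_init (struct credit_receiver *r,
                             size_t capacity,
                             size_t low_water)
{
    r->capacity = capacity;
    r->used = 0;
    r->pending = 0;
    r->low_water = low_water;
    return capacity;
}

/* The sender received a credit response for this stream. */
static void sender_add_credit (struct credit_sender *s, size_t bytes)
{
    s->credits += bytes;
}

/* The sender has len bytes available. Returns how many it may send now.
 * A return of 0 means the sender should pause, e.g. stop its stdin
 * fd watcher until more credit arrives. */
static size_t sender_take (struct credit_sender *s, size_t len)
{
    size_t n = len < s->credits ? len : s->credits;
    s->credits -= n;
    return n;
}

/* Data arrived at the receiver. A well-behaved sender never exceeds
 * its credits, so the data always fits. */
static void receiver_store (struct credit_receiver *r, size_t len)
{
    assert (len <= r->capacity - r->used);
    r->used += len;
}

/* The subprocess consumed len bytes from the buffer. Returns the number
 * of credits to send back, or 0 while the buffer is above low_water. */
static size_t receiver_drain (struct credit_receiver *r, size_t len)
{
    size_t grant = 0;

    if (len > r->used)
        len = r->used;
    r->used -= len;
    r->pending += len;
    if (r->used <= r->low_water) {
        grant = r->pending;
        r->pending = 0;
    }
    return grant;
}
```

With this bookkeeping, the sender's credits plus bytes in flight plus `used` plus `pending` always equals `capacity`, so the receive buffer cannot overflow. Setting `low_water` to `capacity` degenerates to sending a credit on every read.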
@chu11 chu11 changed the title libsubprocess; support stdin flow control libsubprocess: support stdin flow control Sep 19, 2024
@chu11
Member Author

chu11 commented Sep 20, 2024

hmmmm,

Hiccup 1: we can't guarantee how fast each subprocess launched by flux-exec reads data out of its buffer. Some may read from their stdin buffer faster than others. A simple solution is to determine the minimum amount that any of the subprocesses can take and send only that much to each one. But then we're bottlenecked by the slowest subprocess, and flux-exec could hang if one of them is stuck.

        p = zlist_first (subprocesses);
        while (p) {
            if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
                || flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
                if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0)
                    log_err_exit ("flux_subprocess_write");
            }
            p = zlist_next (subprocesses);
        }

Does that sound acceptable? Thinking it over, the simplicity of this approach seems to outweigh the cost of being limited by the slowest subprocess.
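
To make the "minimum amount" idea concrete, here is a rough sketch, assuming each subprocess has a per-stream credit count tracked on the flux-exec side. `struct stdin_target`, `min_credits`, and `broadcast_take` are hypothetical names for illustration and do not exist in flux-core.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-subprocess state kept by the sender. */
struct stdin_target {
    int active;        /* nonzero while the subprocess is INIT or RUNNING */
    size_t credits;    /* bytes this subprocess can currently accept */
};

/* Largest byte count that every active target can accept, i.e. the
 * minimum of their credits. Returns 0 if there are no active targets. */
static size_t min_credits (const struct stdin_target *t, size_t count)
{
    size_t min = 0;
    int found = 0;

    for (size_t i = 0; i < count; i++) {
        if (!t[i].active)
            continue;
        if (!found || t[i].credits < min) {
            min = t[i].credits;
            found = 1;
        }
    }
    return found ? min : 0;
}

/* Decide how many of len bytes to write to every active target, and
 * deduct that amount from each. A return of 0 means the caller should
 * stop reading stdin until some target receives more credit. */
static size_t broadcast_take (struct stdin_target *t, size_t count, size_t len)
{
    size_t n;

    assert (t != NULL || count == 0);
    n = min_credits (t, count);
    if (len < n)
        n = len;
    for (size_t i = 0; i < count; i++) {
        if (t[i].active)
            t[i].credits -= n;
    }
    return n;
}
```

In this sketch, a stuck subprocess with zero credits holds `broadcast_take` at 0 for everyone, which is exactly the hang described above. Marking a target inactive once it exits removes it from the minimum.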

@garlick
Member

garlick commented Sep 20, 2024

That seems expected to me, since the consumer is in charge of the rate of data transfer.

@garlick
Member

garlick commented Sep 24, 2024

See flux-framework/rfc#427 for a proposed protocol.
