-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
libsubprocess: do not spin on large lines #6281
libsubprocess: do not spin on large lines #6281
Conversation
Nice! does this fix #4572 also? |
Hmm, I don't think so. The |
cc9b4fe
to
898611b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See inline comment - it seems like the second problem may be more of a documentation one?
/* At the end of the day, there is a core assumption that | ||
* users will not ignore reading data when they are told | ||
* there is data to read. | ||
* | ||
* If the user didn't read anything above and we're out of | ||
* buffer space, we gotta do something otherwise we will | ||
* spin (i.e. the io watcher is currently stopped, it | ||
* can't be restarted b/c the user isn't reading data, | ||
* etc.) | ||
* | ||
* we could stop the ev watchers (prep, check, idle, and | ||
* io), but this results in little ability to control | ||
* "fallout" from a watcher just (effectively) exiting out | ||
* of the blue. From caller perspectives, it may have | ||
* exited cleanly. | ||
* | ||
* we choose to dump the buffer contents instead. | ||
* Unfortunately this leads to loss of data and no error | ||
* message. However in authors opinion, it is a "cleaner" | ||
* fallout. | ||
* | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pondering this a bit, it seems like this ev watcher ought to behave similar to the libev ev_io watcher:
http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#code_ev_io_code_is_this_file_descrip
That is, it should be level triggered (keep calling callback while data is available) and should be stoppable in the watcher callback. This change would drop data if the watcher chose to stop itself rather than consume the data so I think it would not be correct.
It may be good to add some inline documentation though (I guess in fbuf_watcher.h
?) that the watchers are level triggered and that the read watcher must either read some data or stop the watcher to avoid unnecessary trips through the event loop without progress.
Also, maybe add documentation in subprocess.h that the output callbacks should either consume data or call flux_subprocess_stream_stop()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this PR should just ensure that, when line buffered and the buffer is full, the callback gets called even there isn't a line terminator, and level triggering documentation (and any fixes for in-tree inappropriate uses, if any) could go in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this PR should just ensure that, when line buffered and the buffer is full, the callback gets called even there isn't a line terminator, and level triggering documentation (and any fixes for in-tree inappropriate uses, if any) could go in a separate PR?
Let me try again to split it up. I think the reason I didn't for this PR is b/c a user not being able to read a buffer without a line == a user not reading data b/c they "forgot to". So that's why I ended up putting it all together. It led to some testing difficulties otherwise. But I'll give it another shot.
I was questioning the correctness of the proposed change for "forgot to".
Edit: apologies if that was redundant!
|
898611b
to
93b87e0
Compare
93b87e0
to
3b6bd81
Compare
re-pushed, this PR is now limited to just the line buffering corner case |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good! I have a few comments that are mostly trivial.
I did want to manually kick it a bit with lptest
and see how things go so I'll save approval for after that test.
// libsubprocess will attempt to get the user to read from the buffer that | ||
// is overrun. So generally speaking, stdout buffer overrun should still | ||
// work. | ||
ok (iostress_run_check (h, "tinystdout", false, 0, 128, 1, 1, 256), | ||
"tinystdout works"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"buffer overrun" kind of implies that data is lost (to me).
Maybe the comment should just be
// When the line size is greater than the buffer size, all the data is transferred.
// flux_subprocess_read_line() will receive a "line" that is not terminated with \n
* | ||
* This function may return an incomplete line under the rare | ||
* circumstance the stream has closed and last output is not a line. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: s/under the rare circumstance/when/
This is a general purpose library so the fact that it's rare for job stdout doesn't necessarily mean API users should consider it rare.
src/common/libsubprocess/remote.c
Outdated
/* no need to handle failure states, on fatal error, these | ||
* reactors are closed */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to this PR but happened to notice this comment is cut & pasted in two places.
Suggestion: s/reactors/watchers/ and perhaps name the watchers that are supposed to be stopped?
/* In the event the buffer is full, the `fbuf_write()` will | ||
* fail. Call user callback to give them a chance to empty | ||
* buffer. If they don't, we'll hit error below. | ||
*/ | ||
if (!fbuf_space (c->read_buffer)) | ||
c->output_cb (c->p, c->name); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was going to comment that throwing an error here is not consistent with allowing the user to stop the stream from the callback, but I see the stream_start/stop functions are noted to be for local processes only.
We can fix that once we have credit based flow control since the remote will never send more data than we have room to put in the buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the callback placement here is unfortunate, the reason was the libev did not call things in the order I expected. I expected:
output - put data in buffer, start output prep/check
check_cb - call output callback since there's data in the buffer
<start next iteration of libev loop>
but what happened was
output - put data in buffer, start output prep/check
<start next iteration of libev loop>
prep_check - see data in output buffer, start idle
output - want to put more data in buffer, hit EOVERFLOW
check_cb - this is never reached because of error above
the fact I just started the prep/check means check isn't called in the current iteration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can fix that once we have credit based flow control since the remote will never send more data than we have room to put in the buffer.
The work on #6291 is only for stdin since that's the specific case brought up by the user. But yeah, for output we should add that as a todo as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the order of events, should the output watcher be stopped when the buffer is full then, and restarted when it's not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i was thinking about that after writing the above. The output data is coming from the rexec_continuation()
, which is just the stream of responses from the server. So I don't think we can stop it.
BUT ... then I thought, could we requeue the message at the head of the queue if space is full? Thus the future would be re-called the next iteration in the same way? That would allow us to also alter the behavior to behave more like the io reactor (i.e. spin instead of error out). I don't know how safe or unsafe this is. Skimming code, I guess flux_future_get()
can return a message as a string, then we gotta make it a flux message, and put it back in via flux_requeue()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another route might be to expose watcher priorities in our APIs, and then use them to ensure this check watcher runs before anything else.
I wasn't aware of the libev priority stuff. Hmmmm. I suppose that could be an option, but at this point in time I'm not sure we have a way to add a priority to whatever underneath the covers is calling the flux future's then callback? So perhaps this is something to simply kick the can down the road.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well we could elevate the priority of just this check watcher to get it to be called before the check watcher in the future implementation. Did you want to pause and try that? I could give you a commit that adds a flux_watcher_set_priority()
function to cherry pick. Just as an experiment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Untested but here it is: 1932ce5
Just call flux_watcher_set_priority (check_watcher, 1);
That should raise the priority from the default of 0 to 1. if that doesn't work try 2 :-) 2 is the max.
Edit: it has to be called before the watcher is started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you want to pause and try that?
As this would involve more than a few line tweak, I'm inclined to merge this PR and experiment with it in a different PR. But lets log so we don't forget this conversation.
Edit: see #6302
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
diag ("overflow_output_buffer"); | ||
test_overflow_output_buffer (r); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestoin: rename this test to something like test_long_lines()
or similar since the point is to demonstrate that the buffer does not overflow when it gets a long line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did some testing with flux exec
running lptest with various line sizes and lengths, and comparing the output to the same command run locally. I even tried 4G worth of (4MB-500B) lines! No issues.
I then repeated some of these tests with the data going to a job's stdout. Also no issues.
Approving! Nice work.
3b6bd81
to
f3652f9
Compare
re-pushed with tweaks per comments above |
f3652f9
to
41d9426
Compare
5fa082d
to
26ed51c
Compare
Problem: Some comments are a bit unclear because the word "reactor" was used in place of "watcher". Update comments.
Problem: It'd be nice to know how many times the output callback is called, but that is not tracked. Add an output count to the output cb and output its result in diagnostics.
Problem: The flux_subprocess_read_line() function may return an incomplete line if the last output of the stream is not a line. This is not documented. Document this in subprocess.h.
Problem: libsubprocess can hang/spin if the output buffer is line buffered and a line exceeds the current output buffer size. The buffer can never be emptied because output callbacks are never called (i.e. the buffer never contains a line). Solution: If output is line buffered and the buffer is full AND no line exists, call the output callback for the user to get the current data. flux_subprocess_read_line() and similar functions will return data that is not a full line. Fixes flux-framework#6262
Problem: There are no unit tests for when a single line exceeds the size of an output buffer. Add unit tests.
26ed51c
to
3302805
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #6281 +/- ##
==========================================
- Coverage 83.32% 83.31% -0.02%
==========================================
Files 523 523
Lines 86124 86133 +9
==========================================
- Hits 71766 71759 -7
- Misses 14358 14374 +16
|
Problem: In several cases, libsubprocess hangs/spins can occur if the internal output buffer is full. For example, if subprocess output is line buffered and a single line exceeds the buffer size, the buffer can never be emptied because output callbacks are never called (i.e. the buffer never contains a line).
Other situations can exist if the user simply does not read data when it becomes available.
Solution: Handle full output buffers with two special cases
if output is line buffered and the buffer is full AND no line exists, call the output callback for the user to get the current data.
flux_subprocess_read_line() and similar functions will return data that is not a full line.
if the buffer is at capacity and the user elected to not read anything in the output callback, drop the data. The internal assumption is that a user must read data that is given to them at that point in time.
Fixes #6262