-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix bug in the shutdown behaviour of the decoder #26
Conversation
How do you repro the bug? I'd like to be able to verify the fix. |
If you run If you're switching between commits, make sure to run the commands in the CONTRIBUTING doc to be sure you are running that version of the code. (In fact just You can also |
Any update on this? The decoder is unfortunately quite useless for batch jobs when the thread never actually joins and instead times out, which also leaves a possible race condition as to whether the decoding actually completed and an undefined situation as to whether the thread ever actually terminates and frees resources based on external code. I was using this project to batch decode FLAC and noticed it's just not viable as is to do so since I have millions of small FLAC audio clips to process and 3 seconds per is just not gonna cut it even if I use a thousand decoder instances. The fix code looks good as to actually solving the problem in my opinion, but might I suggest one final change; loops that call sleep are very inefficient when it comes to timeliness, I think a semaphore should be used for event-driven signaling to wake the thread up when work is available rather than polling. Then signaling that same semaphore in decoder.finish() can wake the thread and cause it to exit when a flag indicates it's time to quit and the buffer is empty. This would allow the worker thread to be way more responsive than sleep could ever be without using media timers, regular sleep is likely going to wake at the best interval of the kernel's context switching (20 ms for Windows) or every 100 ms as the code is now. Just be mindful of potential spurious wakeups if a semaphore is used, but at least waking every 100 ms is better than a 3000 ms timeout in my case. |
Thanks @GOAE, I took your recommendation and updated the thread from polling to signalling using |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, you're fast! The way you implemented Event looks good, and testing the code it decodes quite quickly. In my case I'm just creating one decoder instance per FLAC clip to decode, and unfortunately with thread creation overhead, libflac API calls, and database cursor latency it slows it down quite a bit with only one parent thread. But it's still doing a couple dozen per second or so which is a big improvement; testing with a minibatch of 750 clips it took about 30 seconds to process the queue so 25 clips per second.
I used threading.active_count() to verify there was no lingering worker threads, they are indeed exiting in decoder.finish(). Didn't notice I was able to approve this pull request or I would have just done so before, but I'll approve it now.
Thanks for making the fix!
@@ -226,6 +231,7 @@ def finish(self): | |||
# Instruct the decoder to finish up and wait until it is done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The above while loop can be removed if the timeout in the below join is also removed. This way we're closer to being purely event driven; that above while loop is polling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually nevermind, I see that _done forces the worker to exit even if the buffer's still not empty without that loop. Could make the worker not exit until the buffer's flushed though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do agree with removing the timeout. If join
hangs indefinitely it might help spot any shutdown issues before releasing.
For the sake of performance testing I made a synchronous decoder that just one shots, which for reference decoded more or less as quick. class StreamDecoderSynchronous(_Decoder):
def __init__(self, write_callback: Callable[[np.ndarray, int, int, int], None], buffer: bytes):
super().__init__()
self._done = False
self._buffer = deque()
self._buffer.append(buffer)
self._event = threading.Event()
self._event.set()
self._lock = threading.Lock()
self.write_callback = write_callback
rc = _lib.FLAC__stream_decoder_init_stream(
self._decoder,
_lib._read_callback,
_ffi.NULL,
_ffi.NULL,
_ffi.NULL,
_ffi.NULL,
_lib._write_callback,
_ffi.NULL,
_lib._error_callback,
self._decoder_handle
)
if rc != _lib.FLAC__STREAM_DECODER_INIT_STATUS_OK:
raise DecoderInitException(rc)
while len(self._buffer) > 0:
_lib.FLAC__stream_decoder_process_single(self._decoder)
super().finish() |
@GOAE nice idea! Added this in the latest commit, but have rebranded it to a |
1d41596
to
0698711
Compare
464b389
to
0344107
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! I like the event-driven pattern
Event
s, and also added aLock
when reading/writing the buffer.FileEncoder
to automatically detect the bit depth of the input file, and use this for encoding. An error is raised if it is not 16 or 32 bit PCM. Since this made thedtype
variable redundant, I updated the version to v3.OneShotDecoder
for anyone that just wants to decode a buffer once, rather than in real time.Closes: #22
Closes: #23
Closes: #24