From 1f7fb2389dd3031e248d3fbe9a9e6b45ba0840e6 Mon Sep 17 00:00:00 2001 From: Mooneer Salem Date: Tue, 20 Aug 2024 18:47:54 -0700 Subject: [PATCH] Move process I/O to a separate thread to avoid audio dropouts. --- radae_demo_rx.sh | 6 +- radae_demo_tx.sh | 6 +- src/pipeline/ExternVocoderStep.cpp | 154 +++++++++++++++++++++-------- src/pipeline/ExternVocoderStep.h | 18 ++++ 4 files changed, 136 insertions(+), 48 deletions(-) diff --git a/radae_demo_rx.sh b/radae_demo_rx.sh index 7d3bbb6a..5d6c2f71 100755 --- a/radae_demo_rx.sh +++ b/radae_demo_rx.sh @@ -13,6 +13,6 @@ RADAE_VENV=$2 # the RADAE folder. cd $RADAE_PATH export PATH=$RADAE_VENV/bin:$PATH -stdbuf -i0 -o0 python3 int16tof32.py --zeropad | \ -stdbuf -i0 -o0 python3 radae_rx.py model19_check3/checkpoints/checkpoint_epoch_100.pth -v 2 --auxdata | \ -stdbuf -i0 -o0 build/src/lpcnet_demo -fargan-synthesis - - +python3 -u int16tof32.py --zeropad | \ + python3 -u radae_rx.py model19_check3/checkpoints/checkpoint_epoch_100.pth -v 2 --auxdata | \ + build/src/lpcnet_demo -fargan-synthesis - - diff --git a/radae_demo_tx.sh b/radae_demo_tx.sh index b3ecd253..8e971c1d 100755 --- a/radae_demo_tx.sh +++ b/radae_demo_tx.sh @@ -13,6 +13,6 @@ RADAE_VENV=$2 # the RADAE folder. cd $RADAE_PATH export PATH=$RADAE_VENV/bin:$PATH -stdbuf -i0 -o0 build/src/lpcnet_demo -features - - | \ -stdbuf -i0 -o0 python3 radae_tx.py model19_check3/checkpoints/checkpoint_epoch_100.pth --auxdata | \ -stdbuf -i0 -o0 python3 f32toint16.py --real --scale 16383 +build/src/lpcnet_demo -features - - | \ + python3 -u radae_tx.py model19_check3/checkpoints/checkpoint_epoch_100.pth --auxdata | \ + python3 -u f32toint16.py --real --scale 16383 diff --git a/src/pipeline/ExternVocoderStep.cpp b/src/pipeline/ExternVocoderStep.cpp index 30b14dc7..532d33c3 100644 --- a/src/pipeline/ExternVocoderStep.cpp +++ b/src/pipeline/ExternVocoderStep.cpp @@ -31,6 +31,73 @@ ExternVocoderStep::ExternVocoderStep(std::string scriptPath, int workingSampleRate, int outputSampleRate) : sampleRate_(workingSampleRate) , outputSampleRate_(outputSampleRate) + , inputSampleFifo_(nullptr) + , isExiting_(false) + , scriptPath_(scriptPath) +{ + // Create FIFOs so we don't lose any samples during run + inputSampleFifo_ = codec2_fifo_create(16384); + assert(inputSampleFifo_ != nullptr); + + outputSampleFifo_ = codec2_fifo_create(16384); + assert(outputSampleFifo_ != nullptr); + + // Create process thread + vocoderProcessHandlerThread_ = std::thread(std::bind(&ExternVocoderStep::threadEntry_, this)); +} + +ExternVocoderStep::~ExternVocoderStep() +{ + // Stop processing audio samples + isExiting_ = true; + vocoderProcessHandlerThread_.join(); + + if (inputSampleFifo_ != nullptr) + { + codec2_fifo_free(inputSampleFifo_); + } + + if (outputSampleFifo_ != nullptr) + { + codec2_fifo_free(outputSampleFifo_); + } +} + +int ExternVocoderStep::getInputSampleRate() const +{ + return sampleRate_; +} + +int ExternVocoderStep::getOutputSampleRate() const +{ + return outputSampleRate_; +} + +std::shared_ptr ExternVocoderStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) +{ + const int MAX_OUTPUT_SAMPLES = 1024; + + *numOutputSamples = 0; + short* outputSamples = nullptr; + + // Write input samples to thread for processing + codec2_fifo_write(inputSampleFifo_, inputSamples.get(), numInputSamples); + + // Read and return output samples from thread. + *numOutputSamples = codec2_fifo_used(outputSampleFifo_); + if (*numOutputSamples > 0) + { + *numOutputSamples = std::min(*numOutputSamples, MAX_OUTPUT_SAMPLES); + outputSamples = new short[*numOutputSamples]; + assert(outputSamples != nullptr); + + codec2_fifo_read(outputSampleFifo_, outputSamples, *numOutputSamples); + } + + return std::shared_ptr(outputSamples, std::default_delete()); +} + +void ExternVocoderStep::openProcess_() { // Create pipes for stdin/stdout int stdinPipes[2]; @@ -44,10 +111,10 @@ ExternVocoderStep::ExternVocoderStep(std::string scriptPath, int workingSampleRa receiveStdinFd_ = stdinPipes[1]; // Make pipes non-blocking. - int flags = fcntl(receiveStdoutFd_, F_GETFL, 0); + /*int flags = fcntl(receiveStdoutFd_, F_GETFL, 0); fcntl(receiveStdoutFd_, F_SETFL, flags | O_NONBLOCK); flags = fcntl(receiveStdinFd_, F_GETFL, 0); - fcntl(receiveStdinFd_, F_SETFL, flags | O_NONBLOCK); + fcntl(receiveStdinFd_, F_SETFL, flags | O_NONBLOCK);*/ // Start external process recvProcessId_ = fork(); @@ -64,7 +131,7 @@ ExternVocoderStep::ExternVocoderStep(std::string scriptPath, int workingSampleRa // Tokenize and generate an argv for exec() std::vector args; - std::stringstream ss(scriptPath); + std::stringstream ss(scriptPath_); std::string tmp; while(std::getline(ss, tmp, ' ')) { @@ -90,7 +157,7 @@ ExternVocoderStep::ExternVocoderStep(std::string scriptPath, int workingSampleRa // Should not normally return. execv(argv[0], argv); - fprintf(stderr, "WARNING: could not run %s (errno %d)\n", scriptPath.c_str(), errno); + fprintf(stderr, "WARNING: could not run %s (errno %d)\n", scriptPath_.c_str(), errno); exit(-1); } else @@ -100,7 +167,7 @@ ExternVocoderStep::ExternVocoderStep(std::string scriptPath, int workingSampleRa } } -ExternVocoderStep::~ExternVocoderStep() +void ExternVocoderStep::closeProcess_() { // Close pipes and kill process. Hopefully the process // will die on its own but if it doesn't, force kill it. @@ -116,45 +183,48 @@ ExternVocoderStep::~ExternVocoderStep() waitpid(recvProcessId_, NULL, 0); } -int ExternVocoderStep::getInputSampleRate() const +void ExternVocoderStep::threadEntry_() { - return sampleRate_; -} - -int ExternVocoderStep::getOutputSampleRate() const -{ - return outputSampleRate_; -} - -std::shared_ptr ExternVocoderStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) -{ - const int MIN_NUM_SAMPLES_TO_READ = 1024; - int samplesToRead = std::max(MIN_NUM_SAMPLES_TO_READ, numInputSamples); - - *numOutputSamples = 0; - short* outputSamples = nullptr; + const int NUM_SAMPLES_TO_READ_WRITE = 1; - short output_buf[samplesToRead]; - short* inputPtr = inputSamples.get(); - - if (numInputSamples > 0) - { - write(receiveStdinFd_, inputPtr, numInputSamples * sizeof(short)); - } - *numOutputSamples = read(receiveStdoutFd_, output_buf, samplesToRead * sizeof(short)); - if (*numOutputSamples == -1) - { - *numOutputSamples = 0; - } - else + openProcess_(); + + while (!isExiting_) { - *numOutputSamples /= sizeof(short); - - outputSamples = new short[*numOutputSamples]; - assert(outputSamples != nullptr); + fd_set processReadFds; + fd_set processWriteFds; + FD_ZERO(&processReadFds); + FD_ZERO(&processWriteFds); + + FD_SET(receiveStdinFd_, &processWriteFds); + FD_SET(receiveStdoutFd_, &processReadFds); + + // 10ms timeout + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 10000; - memcpy(outputSamples, output_buf, *numOutputSamples * sizeof(short)); + int rv = select(std::max(receiveStdinFd_, receiveStdoutFd_) + 1, &processReadFds, &processWriteFds, nullptr, &tv); + if (rv > 0) + { + if (FD_ISSET(receiveStdinFd_, &processWriteFds) && codec2_fifo_used(inputSampleFifo_) >= NUM_SAMPLES_TO_READ_WRITE) + { + // Can write to process + short val[NUM_SAMPLES_TO_READ_WRITE]; + codec2_fifo_read(inputSampleFifo_, val, NUM_SAMPLES_TO_READ_WRITE); + write(receiveStdinFd_, val, NUM_SAMPLES_TO_READ_WRITE * sizeof(short)); + } + + if (FD_ISSET(receiveStdoutFd_, &processReadFds)) + { + short output_buf[NUM_SAMPLES_TO_READ_WRITE]; + if ((rv = read(receiveStdoutFd_, output_buf, NUM_SAMPLES_TO_READ_WRITE * sizeof(short))) > 0) + { + codec2_fifo_write(outputSampleFifo_, output_buf, rv / sizeof(short)); + } + } + } } - - return std::shared_ptr(outputSamples, std::default_delete()); -} + + closeProcess_(); +} \ No newline at end of file diff --git a/src/pipeline/ExternVocoderStep.h b/src/pipeline/ExternVocoderStep.h index be4b5e18..c292fbe6 100644 --- a/src/pipeline/ExternVocoderStep.h +++ b/src/pipeline/ExternVocoderStep.h @@ -25,8 +25,15 @@ #include #include +#include #include "IPipelineStep.h" +// Forward definition of structs from Codec2. +extern "C" +{ + struct FIFO; +} + class ExternVocoderStep : public IPipelineStep { public: @@ -43,6 +50,17 @@ class ExternVocoderStep : public IPipelineStep pid_t recvProcessId_; int receiveStdoutFd_; int receiveStdinFd_; + struct FIFO* inputSampleFifo_; + struct FIFO* outputSampleFifo_; + + std::thread vocoderProcessHandlerThread_; + bool isExiting_; + + std::string scriptPath_; + + void threadEntry_(); + void openProcess_(); + void closeProcess_(); }; #endif // AUDIO_PIPELINE__EXTERN_VOCODER_STEP_H