Skip to content

Commit

Permalink
Move process I/O to a separate thread to avoid audio dropouts.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmiw committed Aug 21, 2024
1 parent ed31b5c commit 1f7fb23
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 48 deletions.
6 changes: 3 additions & 3 deletions radae_demo_rx.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ RADAE_VENV=$2
# the RADAE folder.
cd $RADAE_PATH
export PATH=$RADAE_VENV/bin:$PATH
stdbuf -i0 -o0 python3 int16tof32.py --zeropad | \
stdbuf -i0 -o0 python3 radae_rx.py model19_check3/checkpoints/checkpoint_epoch_100.pth -v 2 --auxdata | \
stdbuf -i0 -o0 build/src/lpcnet_demo -fargan-synthesis - -
python3 -u int16tof32.py --zeropad | \
python3 -u radae_rx.py model19_check3/checkpoints/checkpoint_epoch_100.pth -v 2 --auxdata | \
build/src/lpcnet_demo -fargan-synthesis - -
6 changes: 3 additions & 3 deletions radae_demo_tx.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ RADAE_VENV=$2
# the RADAE folder.
cd $RADAE_PATH
export PATH=$RADAE_VENV/bin:$PATH
stdbuf -i0 -o0 build/src/lpcnet_demo -features - - | \
stdbuf -i0 -o0 python3 radae_tx.py model19_check3/checkpoints/checkpoint_epoch_100.pth --auxdata | \
stdbuf -i0 -o0 python3 f32toint16.py --real --scale 16383
build/src/lpcnet_demo -features - - | \
python3 -u radae_tx.py model19_check3/checkpoints/checkpoint_epoch_100.pth --auxdata | \
python3 -u f32toint16.py --real --scale 16383
154 changes: 112 additions & 42 deletions src/pipeline/ExternVocoderStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,73 @@
ExternVocoderStep::ExternVocoderStep(std::string scriptPath, int workingSampleRate, int outputSampleRate)
: sampleRate_(workingSampleRate)
, outputSampleRate_(outputSampleRate)
, inputSampleFifo_(nullptr)
, isExiting_(false)
, scriptPath_(scriptPath)
{
// Create FIFOs so we don't lose any samples during run
inputSampleFifo_ = codec2_fifo_create(16384);
assert(inputSampleFifo_ != nullptr);

outputSampleFifo_ = codec2_fifo_create(16384);
assert(outputSampleFifo_ != nullptr);

// Create process thread
vocoderProcessHandlerThread_ = std::thread(std::bind(&ExternVocoderStep::threadEntry_, this));
}

ExternVocoderStep::~ExternVocoderStep()
{
// Stop processing audio samples
isExiting_ = true;
vocoderProcessHandlerThread_.join();

if (inputSampleFifo_ != nullptr)
{
codec2_fifo_free(inputSampleFifo_);
}

if (outputSampleFifo_ != nullptr)
{
codec2_fifo_free(outputSampleFifo_);
}
}

int ExternVocoderStep::getInputSampleRate() const
{
return sampleRate_;
}

int ExternVocoderStep::getOutputSampleRate() const
{
return outputSampleRate_;
}

std::shared_ptr<short> ExternVocoderStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{
const int MAX_OUTPUT_SAMPLES = 1024;

*numOutputSamples = 0;
short* outputSamples = nullptr;

// Write input samples to thread for processing
codec2_fifo_write(inputSampleFifo_, inputSamples.get(), numInputSamples);

// Read and return output samples from thread.
*numOutputSamples = codec2_fifo_used(outputSampleFifo_);
if (*numOutputSamples > 0)
{
*numOutputSamples = std::min(*numOutputSamples, MAX_OUTPUT_SAMPLES);
outputSamples = new short[*numOutputSamples];
assert(outputSamples != nullptr);

codec2_fifo_read(outputSampleFifo_, outputSamples, *numOutputSamples);
}

return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>());
}

void ExternVocoderStep::openProcess_()
{
// Create pipes for stdin/stdout
int stdinPipes[2];
Expand All @@ -44,10 +111,10 @@ ExternVocoderStep::ExternVocoderStep(std::string scriptPath, int workingSampleRa
receiveStdinFd_ = stdinPipes[1];

// Make pipes non-blocking.
int flags = fcntl(receiveStdoutFd_, F_GETFL, 0);
/*int flags = fcntl(receiveStdoutFd_, F_GETFL, 0);
fcntl(receiveStdoutFd_, F_SETFL, flags | O_NONBLOCK);
flags = fcntl(receiveStdinFd_, F_GETFL, 0);
fcntl(receiveStdinFd_, F_SETFL, flags | O_NONBLOCK);
fcntl(receiveStdinFd_, F_SETFL, flags | O_NONBLOCK);*/

// Start external process
recvProcessId_ = fork();
Expand All @@ -64,7 +131,7 @@ ExternVocoderStep::ExternVocoderStep(std::string scriptPath, int workingSampleRa

// Tokenize and generate an argv for exec()
std::vector<std::string> args;
std::stringstream ss(scriptPath);
std::stringstream ss(scriptPath_);
std::string tmp;
while(std::getline(ss, tmp, ' '))
{
Expand All @@ -90,7 +157,7 @@ ExternVocoderStep::ExternVocoderStep(std::string scriptPath, int workingSampleRa

// Should not normally return.
execv(argv[0], argv);
fprintf(stderr, "WARNING: could not run %s (errno %d)\n", scriptPath.c_str(), errno);
fprintf(stderr, "WARNING: could not run %s (errno %d)\n", scriptPath_.c_str(), errno);
exit(-1);
}
else
Expand All @@ -100,7 +167,7 @@ ExternVocoderStep::ExternVocoderStep(std::string scriptPath, int workingSampleRa
}
}

ExternVocoderStep::~ExternVocoderStep()
void ExternVocoderStep::closeProcess_()
{
// Close pipes and kill process. Hopefully the process
// will die on its own but if it doesn't, force kill it.
Expand All @@ -116,45 +183,48 @@ ExternVocoderStep::~ExternVocoderStep()
waitpid(recvProcessId_, NULL, 0);
}

int ExternVocoderStep::getInputSampleRate() const
void ExternVocoderStep::threadEntry_()
{
return sampleRate_;
}

int ExternVocoderStep::getOutputSampleRate() const
{
return outputSampleRate_;
}

std::shared_ptr<short> ExternVocoderStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{
const int MIN_NUM_SAMPLES_TO_READ = 1024;
int samplesToRead = std::max(MIN_NUM_SAMPLES_TO_READ, numInputSamples);

*numOutputSamples = 0;
short* outputSamples = nullptr;
const int NUM_SAMPLES_TO_READ_WRITE = 1;

short output_buf[samplesToRead];
short* inputPtr = inputSamples.get();

if (numInputSamples > 0)
{
write(receiveStdinFd_, inputPtr, numInputSamples * sizeof(short));
}
*numOutputSamples = read(receiveStdoutFd_, output_buf, samplesToRead * sizeof(short));
if (*numOutputSamples == -1)
{
*numOutputSamples = 0;
}
else
openProcess_();

while (!isExiting_)
{
*numOutputSamples /= sizeof(short);

outputSamples = new short[*numOutputSamples];
assert(outputSamples != nullptr);
fd_set processReadFds;
fd_set processWriteFds;
FD_ZERO(&processReadFds);
FD_ZERO(&processWriteFds);

FD_SET(receiveStdinFd_, &processWriteFds);
FD_SET(receiveStdoutFd_, &processReadFds);

// 10ms timeout
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10000;

memcpy(outputSamples, output_buf, *numOutputSamples * sizeof(short));
int rv = select(std::max(receiveStdinFd_, receiveStdoutFd_) + 1, &processReadFds, &processWriteFds, nullptr, &tv);
if (rv > 0)
{
if (FD_ISSET(receiveStdinFd_, &processWriteFds) && codec2_fifo_used(inputSampleFifo_) >= NUM_SAMPLES_TO_READ_WRITE)
{
// Can write to process
short val[NUM_SAMPLES_TO_READ_WRITE];
codec2_fifo_read(inputSampleFifo_, val, NUM_SAMPLES_TO_READ_WRITE);
write(receiveStdinFd_, val, NUM_SAMPLES_TO_READ_WRITE * sizeof(short));
}

if (FD_ISSET(receiveStdoutFd_, &processReadFds))
{
short output_buf[NUM_SAMPLES_TO_READ_WRITE];
if ((rv = read(receiveStdoutFd_, output_buf, NUM_SAMPLES_TO_READ_WRITE * sizeof(short))) > 0)
{
codec2_fifo_write(outputSampleFifo_, output_buf, rv / sizeof(short));
}
}
}
}

return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>());
}
closeProcess_();
}
18 changes: 18 additions & 0 deletions src/pipeline/ExternVocoderStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@

#include <unistd.h>
#include <string>
#include <thread>
#include "IPipelineStep.h"

// Forward definition of structs from Codec2.
extern "C"
{
struct FIFO;
}

class ExternVocoderStep : public IPipelineStep
{
public:
Expand All @@ -43,6 +50,17 @@ class ExternVocoderStep : public IPipelineStep
pid_t recvProcessId_;
int receiveStdoutFd_;
int receiveStdinFd_;
struct FIFO* inputSampleFifo_;
struct FIFO* outputSampleFifo_;

std::thread vocoderProcessHandlerThread_;
bool isExiting_;

std::string scriptPath_;

void threadEntry_();
void openProcess_();
void closeProcess_();
};

#endif // AUDIO_PIPELINE__EXTERN_VOCODER_STEP_H

0 comments on commit 1f7fb23

Please sign in to comment.