Skip to content

Commit

Permalink
Use I/O completion ports for stdout/stderr piping.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmiw committed Aug 31, 2024
1 parent 65d60b3 commit fc797b2
Show file tree
Hide file tree
Showing 2 changed files with 294 additions and 65 deletions.
320 changes: 255 additions & 65 deletions src/pipeline/ExternVocoderStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#ifdef _WIN32
#include <windows.h>
#include <tlhelp32.h>
#else
#include <sys/wait.h>
#endif // _WIN32
Expand Down Expand Up @@ -95,7 +96,7 @@ int ExternVocoderStep::getOutputSampleRate() const
std::shared_ptr<short> ExternVocoderStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{
const int MAX_OUTPUT_SAMPLES = 1024;

*numOutputSamples = 0;
short* outputSamples = nullptr;

Expand Down Expand Up @@ -129,35 +130,11 @@ void ExternVocoderStep::openProcess_()
HANDLE tmpStderrWrHandle = NULL;
HANDLE tmpStdinRdHandle = NULL;

// Create a pipe for the child process's STDOUT.
if (!CreatePipe(&receiveStdoutHandle_, &tmpStdoutWrHandle, &saAttr, 0) ||
!SetHandleInformation(receiveStdoutHandle_, HANDLE_FLAG_INHERIT, 0))
{
fprintf(stderr, "WARNING: cannot create pipe for stdout!\n");
if (receiveStdoutHandle_ != nullptr)
{
CloseHandle(receiveStdoutHandle_);
CloseHandle(tmpStdoutWrHandle);
receiveStdoutHandle_ = nullptr;
}
return;
}

// Create a pipe for the child process's STDERR.
if (!CreatePipe(&receiveStderrHandle_, &tmpStderrWrHandle, &saAttr, 0) ||
!SetHandleInformation(receiveStderrHandle_, HANDLE_FLAG_INHERIT, 0))
{
fprintf(stderr, "WARNING: cannot create pipe for stdout!\n");
if (receiveStderrHandle_ != nullptr)
{
CloseHandle(receiveStderrHandle_);
CloseHandle(tmpStderrWrHandle);
receiveStderrHandle_ = nullptr;
}
return;
}
// Create pipes for the child process's stdout/stderr.
CreateAsyncPipe_(&receiveStdoutHandle_, &tmpStdoutWrHandle);
CreateAsyncPipe_(&receiveStderrHandle_, &tmpStderrWrHandle);

// Create a pipe for the child process's STDIN.
// Create a pipe for the child process's stdin.
if (!CreatePipe(&tmpStdinRdHandle, &receiveStdinHandle_, &saAttr, 0) ||
!SetHandleInformation(receiveStdinHandle_, HANDLE_FLAG_INHERIT, 0))
{
Expand Down Expand Up @@ -218,74 +195,287 @@ void ExternVocoderStep::openProcess_()

// Save process handle so we can terminate it later
recvProcessHandle_ = piProcInfo.hProcess;
recvProcessId_ = piProcInfo.dwProcessId;
}

void ExternVocoderStep::closeProcess_()
{
if (recvProcessHandle_ != NULL)
{
//KillProcessTree_(recvProcessId_);

// Make sure process has actually terminated
TerminateProcess(recvProcessHandle_, 0);
WaitForSingleObject(recvProcessHandle_, 1000);
CloseHandle(recvProcessHandle_);

// Close pipe handles
CloseHandle(receiveStdinHandle_);
CloseHandle(receiveStdoutHandle_);
CloseHandle(receiveStderrHandle_);
TerminateProcess(recvProcessHandle_, 0);

// Make sure process has actually terminated
WaitForSingleObject(recvProcessHandle_, INFINITE);
}
}

void ExternVocoderStep::KillProcessTree_(DWORD myprocID)
{
PROCESSENTRY32 pe;

memset(&pe, 0, sizeof(PROCESSENTRY32));
pe.dwSize = sizeof(PROCESSENTRY32);

HANDLE hSnap = ::CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);

if (::Process32First(hSnap, &pe))
{
do // Recursion
{
if (pe.th32ParentProcessID == myprocID)
KillProcessTree_(pe.th32ProcessID);
} while (::Process32Next(hSnap, &pe));
}


// kill the main process
HANDLE hProc = ::OpenProcess(PROCESS_ALL_ACCESS, FALSE, myprocID);

if (hProc)
{
fprintf(stderr, "killing process ID %lu\n", myprocID);
::TerminateProcess(hProc, 0);
::CloseHandle(hProc);
}
}

void ExternVocoderStep::threadEntry_()
{
const int NUM_SAMPLES_TO_READ_WRITE = 1;
const int NUM_SAMPLES_TO_READ_WRITE = 160;

openProcess_();

while (!isExiting_)

// Kick off reading from stdout/stderr
char* stdoutBuffer = new char[4096];
assert(stdoutBuffer != nullptr);
char* stderrBuffer = new char[4096];
assert(stderrBuffer != nullptr);
FileReadBuffer* stdoutRead = new FileReadBuffer();
assert(stdoutRead != nullptr);
FileReadBuffer* stderrRead = new FileReadBuffer();
assert(stderrRead != nullptr);

std::function<void(char*, size_t)> onStdoutRead = [&](char* buf, size_t bytesAvailable) {
codec2_fifo_write(outputSampleFifo_, (short*)&buf[0], bytesAvailable / sizeof(short));
};

std::function<void(char*, size_t)> onStderrRead = [&](char* buf, size_t bytesAvailable) {
fprintf(stderr, "%s", buf);
};

InitFileReadBuffer_(stdoutRead, receiveStdoutHandle_, stdoutBuffer, 4096, onStdoutRead);
ScheduleFileRead_(stdoutRead);
InitFileReadBuffer_(stderrRead, receiveStderrHandle_, stderrBuffer, 4096, onStderrRead);
ScheduleFileRead_(stderrRead);

while (!isExiting_ && !stdoutRead->eof && !stderrRead->eof)
{
bool operationCompleted = false;

// Read data from STDERR if available
DWORD bytesAvailable = 0;
if (PeekNamedPipe(receiveStderrHandle_, NULL, 0, NULL, &bytesAvailable, NULL) && bytesAvailable > 0)
{
char buf[bytesAvailable];
ReadFile(receiveStderrHandle_, buf, bytesAvailable, NULL, NULL);
fprintf(stderr, "%s", buf);

operationCompleted = true;
}

// Read data from STDOUT if available
bytesAvailable = 0;
if (PeekNamedPipe(receiveStdoutHandle_, NULL, 0, NULL, &bytesAvailable, NULL) && bytesAvailable > 0)
{
char buf[bytesAvailable];
ReadFile(receiveStdoutHandle_, buf, bytesAvailable, NULL, NULL);
codec2_fifo_write(outputSampleFifo_, (short*)&buf[0], bytesAvailable / sizeof(short));

operationCompleted = true;
}

// Write data to STDIN if available
if (codec2_fifo_used(inputSampleFifo_) >= NUM_SAMPLES_TO_READ_WRITE)
{
short val[NUM_SAMPLES_TO_READ_WRITE];
codec2_fifo_read(inputSampleFifo_, val, NUM_SAMPLES_TO_READ_WRITE);
WriteFile(receiveStdinHandle_, val, NUM_SAMPLES_TO_READ_WRITE * sizeof(short), NULL, NULL);

operationCompleted = true;
}

// Wait 10ms if we didn't do anything in this round
if (!operationCompleted)
else
{
// Wait 10ms if we didn't do anything in this round
Sleep(10);
}
}

closeProcess_();

// Make sure eof is actually set
for (int count = 0; count < 5; count++)
{
if (stdoutRead->eof && stderrRead->eof) break;
Sleep(1000);
}

delete stdoutRead;
delete[] stdoutBuffer;
delete stderrRead;
delete[] stderrBuffer;
}

void ExternVocoderStep::CreateAsyncPipe_(HANDLE* outRead, HANDLE* outWrite)
{
// Generate unique name for the pipe.
UUID uuid;
UuidCreate(&uuid);
char* uuidAsString;
UuidToStringA(&uuid, (RPC_CSTR*)&uuidAsString);
char pipeName[1024];
sprintf(pipeName, "\\\\.\\pipe\\freedv-%s", uuidAsString);
RpcStringFreeA((RPC_CSTR*)&uuidAsString);

// Create a pipe. The "instances" parameter is set to 2 because we call this function twice below.
constexpr DWORD Instances = 2;
// Create the named pipe. This will return the handle we use for reading from the pipe.
HANDLE read = CreateNamedPipeA(
pipeName,
// Set FILE_FLAG_OVERLAPPED to enable async I/O for reading from the pipe.
// Note that we still need to set PIPE_WAIT.
PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
Instances,
// in-bound buffer size
4096,
// out-going buffer size
0,
// default timeout for some functions we're not using
0,
nullptr
);
if (read == INVALID_HANDLE_VALUE)
{
fprintf(stderr, "Failed to create named pipe (error %lu)", GetLastError());
return;
}

// Now create a handle for the other end of the pipe. We are going to pass that handle to the
// process we are creating, so we need to specify that the handle can be inherited.
// Also note that we are NOT setting FILE_FLAG_OVERLAPPED. We could set it, but that's not relevant
// for our end of the pipe. (We do not expect async writes.)
SECURITY_ATTRIBUTES saAttr;
saAttr.nLength = sizeof(SECURITY_ATTRIBUTES);
saAttr.bInheritHandle = TRUE;
saAttr.lpSecurityDescriptor = NULL;
HANDLE write = CreateFileA(pipeName, GENERIC_WRITE, 0, &saAttr, OPEN_EXISTING, 0, 0);
if (write == INVALID_HANDLE_VALUE)
{
fprintf(stderr, "Failed to open named pipe (error %lu)", GetLastError());
CloseHandle(read);
return;
}

*outRead = read;
*outWrite = write;
}

void ExternVocoderStep::ScheduleFileRead_(FileReadBuffer* readBuffer)
{
// Prepare the threadpool for an I/O request on our handle.
StartThreadpoolIo(readBuffer->Io);
BOOL success = ReadFile(
readBuffer->FileHandle,
readBuffer->Buffer,
readBuffer->BufferSize,
nullptr,
&readBuffer->Overlapped
);
if (!success ) {
DWORD error = GetLastError();
if (error == ERROR_IO_PENDING) {
// Async operation is in progress. This is NOT a failure state.
return;
}
// Since we have started an I/O request above but nothing happened, we need to cancel it.
CancelThreadpoolIo(readBuffer->Io);

if (error == ERROR_INVALID_USER_BUFFER || error == ERROR_NOT_ENOUGH_MEMORY) {
// Too many outstanding async I/O requests, try again after 10 ms.
// The timer length is given in 100ns increments, negative values indicate relative
// values. FILETIME is actually an unsigned value. Sigh.
constexpr int ToMicro = 10;
constexpr int ToMilli = 1000;
constexpr int64_t Delay = -(10 * ToMicro * ToMilli);
FILETIME timerLength{};
timerLength.dwHighDateTime = (Delay >> 32) & 0xFFFFFFFF;
timerLength.dwLowDateTime = Delay & 0xFFFFFFFF;
SetThreadpoolTimer(readBuffer->Timer, &timerLength, 0, 0);
return;
}
CloseThreadpoolTimer(readBuffer->Timer);
CloseThreadpoolIo(readBuffer->Io);
if (error == ERROR_BROKEN_PIPE)
{
readBuffer->eof = true;
return;
}
if(error != ERROR_OPERATION_ABORTED)
{
fprintf(stderr, "ReadFile async failed, error code %lu", error);
}
}
}

void CALLBACK ExternVocoderStep::FileReadComplete_(
PTP_CALLBACK_INSTANCE instance,
void* context,
void* overlapped,
ULONG ioResult,
ULONG_PTR numBytesRead,
PTP_IO io
)
{
FileReadBuffer* readBuffer = (FileReadBuffer*)context;
if (ioResult == ERROR_OPERATION_ABORTED) {
// This can happen when someone manually aborts the I/O request.
CloseThreadpoolTimer(readBuffer->Timer);
CloseThreadpoolIo(readBuffer->Io);
return;
}
const bool isEof = ioResult == ERROR_HANDLE_EOF || ioResult == ERROR_BROKEN_PIPE;
if (!(isEof || ioResult == NO_ERROR))
{
fprintf(stderr, "Got error result %lu while handling I/O callback", ioResult);
readBuffer->eof = true;
return;
}

readBuffer->onDataRead((char*)readBuffer->Buffer, numBytesRead);

if (isEof) {
CloseThreadpoolTimer(readBuffer->Timer);
CloseThreadpoolIo(readBuffer->Io);

readBuffer->eof = true;
} else {
// continue reading
ScheduleFileRead_(readBuffer);
}
}

void ExternVocoderStep::InitFileReadBuffer_(FileReadBuffer* readBuffer, HANDLE handle, void* buffer, size_t bufferSize, std::function<void(char*, size_t)> onDataRead)
{
ZeroMemory(readBuffer, sizeof(*readBuffer));
readBuffer->onDataRead = onDataRead;
readBuffer->Buffer = buffer;
readBuffer->BufferSize = bufferSize;
readBuffer->FileHandle = handle;
readBuffer->Io = CreateThreadpoolIo(handle, &FileReadComplete_, readBuffer, nullptr);
if (!(readBuffer->Io))
{
fprintf(stderr, "CreateThreadpoolIo failed, error code %lu", GetLastError());
return;
}

// This local struct just exists so I can declare a function here that we can pass to the timer below.
struct Tmp {
static void CALLBACK RetryScheduleFileRead(
PTP_CALLBACK_INSTANCE instance,
void* context,
PTP_TIMER wait
) {
ScheduleFileRead_((FileReadBuffer*)context);
}
};

readBuffer->Timer = CreateThreadpoolTimer(&Tmp::RetryScheduleFileRead, readBuffer, nullptr);
if (!(readBuffer->Timer))
{
fprintf(stderr, "CreateThreadpoolTimer failed, error code %lu", GetLastError());
}
}
#else
void ExternVocoderStep::openProcess_()
{
Expand Down
Loading

0 comments on commit fc797b2

Please sign in to comment.