From fc797b235f8eb1f295a0a4b44e0c89fe7ae5916d Mon Sep 17 00:00:00 2001 From: Mooneer Salem Date: Sat, 31 Aug 2024 09:46:47 -0700 Subject: [PATCH] Use I/O completion ports for stdout/stderr piping. --- src/pipeline/ExternVocoderStep.cpp | 320 +++++++++++++++++++++++------ src/pipeline/ExternVocoderStep.h | 39 ++++ 2 files changed, 294 insertions(+), 65 deletions(-) diff --git a/src/pipeline/ExternVocoderStep.cpp b/src/pipeline/ExternVocoderStep.cpp index 3853980b..ad24bd1b 100644 --- a/src/pipeline/ExternVocoderStep.cpp +++ b/src/pipeline/ExternVocoderStep.cpp @@ -30,6 +30,7 @@ #ifdef _WIN32 #include +#include #else #include #endif // _WIN32 @@ -95,7 +96,7 @@ int ExternVocoderStep::getOutputSampleRate() const std::shared_ptr ExternVocoderStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { const int MAX_OUTPUT_SAMPLES = 1024; - + *numOutputSamples = 0; short* outputSamples = nullptr; @@ -129,35 +130,11 @@ void ExternVocoderStep::openProcess_() HANDLE tmpStderrWrHandle = NULL; HANDLE tmpStdinRdHandle = NULL; - // Create a pipe for the child process's STDOUT. - if (!CreatePipe(&receiveStdoutHandle_, &tmpStdoutWrHandle, &saAttr, 0) || - !SetHandleInformation(receiveStdoutHandle_, HANDLE_FLAG_INHERIT, 0)) - { - fprintf(stderr, "WARNING: cannot create pipe for stdout!\n"); - if (receiveStdoutHandle_ != nullptr) - { - CloseHandle(receiveStdoutHandle_); - CloseHandle(tmpStdoutWrHandle); - receiveStdoutHandle_ = nullptr; - } - return; - } - - // Create a pipe for the child process's STDERR. - if (!CreatePipe(&receiveStderrHandle_, &tmpStderrWrHandle, &saAttr, 0) || - !SetHandleInformation(receiveStderrHandle_, HANDLE_FLAG_INHERIT, 0)) - { - fprintf(stderr, "WARNING: cannot create pipe for stdout!\n"); - if (receiveStderrHandle_ != nullptr) - { - CloseHandle(receiveStderrHandle_); - CloseHandle(tmpStderrWrHandle); - receiveStderrHandle_ = nullptr; - } - return; - } + // Create pipes for the child process's stdout/stderr. + CreateAsyncPipe_(&receiveStdoutHandle_, &tmpStdoutWrHandle); + CreateAsyncPipe_(&receiveStderrHandle_, &tmpStderrWrHandle); - // Create a pipe for the child process's STDIN. + // Create a pipe for the child process's stdin. if (!CreatePipe(&tmpStdinRdHandle, &receiveStdinHandle_, &saAttr, 0) || !SetHandleInformation(receiveStdinHandle_, HANDLE_FLAG_INHERIT, 0)) { @@ -218,74 +195,287 @@ void ExternVocoderStep::openProcess_() // Save process handle so we can terminate it later recvProcessHandle_ = piProcInfo.hProcess; + recvProcessId_ = piProcInfo.dwProcessId; } void ExternVocoderStep::closeProcess_() { if (recvProcessHandle_ != NULL) { + //KillProcessTree_(recvProcessId_); + + // Make sure process has actually terminated + TerminateProcess(recvProcessHandle_, 0); + WaitForSingleObject(recvProcessHandle_, 1000); + CloseHandle(recvProcessHandle_); + + // Close pipe handles CloseHandle(receiveStdinHandle_); CloseHandle(receiveStdoutHandle_); CloseHandle(receiveStderrHandle_); - TerminateProcess(recvProcessHandle_, 0); - - // Make sure process has actually terminated - WaitForSingleObject(recvProcessHandle_, INFINITE); + } +} + +void ExternVocoderStep::KillProcessTree_(DWORD myprocID) +{ + PROCESSENTRY32 pe; + + memset(&pe, 0, sizeof(PROCESSENTRY32)); + pe.dwSize = sizeof(PROCESSENTRY32); + + HANDLE hSnap = ::CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); + + if (::Process32First(hSnap, &pe)) + { + do // Recursion + { + if (pe.th32ParentProcessID == myprocID) + KillProcessTree_(pe.th32ProcessID); + } while (::Process32Next(hSnap, &pe)); + } + + + // kill the main process + HANDLE hProc = ::OpenProcess(PROCESS_ALL_ACCESS, FALSE, myprocID); + + if (hProc) + { + fprintf(stderr, "killing process ID %lu\n", myprocID); + ::TerminateProcess(hProc, 0); + ::CloseHandle(hProc); } } void ExternVocoderStep::threadEntry_() { - const int NUM_SAMPLES_TO_READ_WRITE = 1; + const int NUM_SAMPLES_TO_READ_WRITE = 160; openProcess_(); - - while (!isExiting_) + + // Kick off reading from stdout/stderr + char* stdoutBuffer = new char[4096]; + assert(stdoutBuffer != nullptr); + char* stderrBuffer = new char[4096]; + assert(stderrBuffer != nullptr); + FileReadBuffer* stdoutRead = new FileReadBuffer(); + assert(stdoutRead != nullptr); + FileReadBuffer* stderrRead = new FileReadBuffer(); + assert(stderrRead != nullptr); + + std::function onStdoutRead = [&](char* buf, size_t bytesAvailable) { + codec2_fifo_write(outputSampleFifo_, (short*)&buf[0], bytesAvailable / sizeof(short)); + }; + + std::function onStderrRead = [&](char* buf, size_t bytesAvailable) { + fprintf(stderr, "%s", buf); + }; + + InitFileReadBuffer_(stdoutRead, receiveStdoutHandle_, stdoutBuffer, 4096, onStdoutRead); + ScheduleFileRead_(stdoutRead); + InitFileReadBuffer_(stderrRead, receiveStderrHandle_, stderrBuffer, 4096, onStderrRead); + ScheduleFileRead_(stderrRead); + + while (!isExiting_ && !stdoutRead->eof && !stderrRead->eof) { - bool operationCompleted = false; - - // Read data from STDERR if available - DWORD bytesAvailable = 0; - if (PeekNamedPipe(receiveStderrHandle_, NULL, 0, NULL, &bytesAvailable, NULL) && bytesAvailable > 0) - { - char buf[bytesAvailable]; - ReadFile(receiveStderrHandle_, buf, bytesAvailable, NULL, NULL); - fprintf(stderr, "%s", buf); - - operationCompleted = true; - } - - // Read data from STDOUT if available - bytesAvailable = 0; - if (PeekNamedPipe(receiveStdoutHandle_, NULL, 0, NULL, &bytesAvailable, NULL) && bytesAvailable > 0) - { - char buf[bytesAvailable]; - ReadFile(receiveStdoutHandle_, buf, bytesAvailable, NULL, NULL); - codec2_fifo_write(outputSampleFifo_, (short*)&buf[0], bytesAvailable / sizeof(short)); - - operationCompleted = true; - } - // Write data to STDIN if available if (codec2_fifo_used(inputSampleFifo_) >= NUM_SAMPLES_TO_READ_WRITE) { short val[NUM_SAMPLES_TO_READ_WRITE]; codec2_fifo_read(inputSampleFifo_, val, NUM_SAMPLES_TO_READ_WRITE); WriteFile(receiveStdinHandle_, val, NUM_SAMPLES_TO_READ_WRITE * sizeof(short), NULL, NULL); - - operationCompleted = true; } - - // Wait 10ms if we didn't do anything in this round - if (!operationCompleted) + else { + // Wait 10ms if we didn't do anything in this round Sleep(10); } } closeProcess_(); + + // Make sure eof is actually set + for (int count = 0; count < 5; count++) + { + if (stdoutRead->eof && stderrRead->eof) break; + Sleep(1000); + } + + delete stdoutRead; + delete[] stdoutBuffer; + delete stderrRead; + delete[] stderrBuffer; +} + +void ExternVocoderStep::CreateAsyncPipe_(HANDLE* outRead, HANDLE* outWrite) +{ + // Generate unique name for the pipe. + UUID uuid; + UuidCreate(&uuid); + char* uuidAsString; + UuidToStringA(&uuid, (RPC_CSTR*)&uuidAsString); + char pipeName[1024]; + sprintf(pipeName, "\\\\.\\pipe\\freedv-%s", uuidAsString); + RpcStringFreeA((RPC_CSTR*)&uuidAsString); + + // Create a pipe. The "instances" parameter is set to 2 because we call this function twice below. + constexpr DWORD Instances = 2; + // Create the named pipe. This will return the handle we use for reading from the pipe. + HANDLE read = CreateNamedPipeA( + pipeName, + // Set FILE_FLAG_OVERLAPPED to enable async I/O for reading from the pipe. + // Note that we still need to set PIPE_WAIT. + PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, + Instances, + // in-bound buffer size + 4096, + // out-going buffer size + 0, + // default timeout for some functions we're not using + 0, + nullptr + ); + if (read == INVALID_HANDLE_VALUE) + { + fprintf(stderr, "Failed to create named pipe (error %lu)", GetLastError()); + return; + } + + // Now create a handle for the other end of the pipe. We are going to pass that handle to the + // process we are creating, so we need to specify that the handle can be inherited. + // Also note that we are NOT setting FILE_FLAG_OVERLAPPED. We could set it, but that's not relevant + // for our end of the pipe. (We do not expect async writes.) + SECURITY_ATTRIBUTES saAttr; + saAttr.nLength = sizeof(SECURITY_ATTRIBUTES); + saAttr.bInheritHandle = TRUE; + saAttr.lpSecurityDescriptor = NULL; + HANDLE write = CreateFileA(pipeName, GENERIC_WRITE, 0, &saAttr, OPEN_EXISTING, 0, 0); + if (write == INVALID_HANDLE_VALUE) + { + fprintf(stderr, "Failed to open named pipe (error %lu)", GetLastError()); + CloseHandle(read); + return; + } + + *outRead = read; + *outWrite = write; } +void ExternVocoderStep::ScheduleFileRead_(FileReadBuffer* readBuffer) +{ + // Prepare the threadpool for an I/O request on our handle. + StartThreadpoolIo(readBuffer->Io); + BOOL success = ReadFile( + readBuffer->FileHandle, + readBuffer->Buffer, + readBuffer->BufferSize, + nullptr, + &readBuffer->Overlapped + ); + if (!success ) { + DWORD error = GetLastError(); + if (error == ERROR_IO_PENDING) { + // Async operation is in progress. This is NOT a failure state. + return; + } + // Since we have started an I/O request above but nothing happened, we need to cancel it. + CancelThreadpoolIo(readBuffer->Io); + + if (error == ERROR_INVALID_USER_BUFFER || error == ERROR_NOT_ENOUGH_MEMORY) { + // Too many outstanding async I/O requests, try again after 10 ms. + // The timer length is given in 100ns increments, negative values indicate relative + // values. FILETIME is actually an unsigned value. Sigh. + constexpr int ToMicro = 10; + constexpr int ToMilli = 1000; + constexpr int64_t Delay = -(10 * ToMicro * ToMilli); + FILETIME timerLength{}; + timerLength.dwHighDateTime = (Delay >> 32) & 0xFFFFFFFF; + timerLength.dwLowDateTime = Delay & 0xFFFFFFFF; + SetThreadpoolTimer(readBuffer->Timer, &timerLength, 0, 0); + return; + } + CloseThreadpoolTimer(readBuffer->Timer); + CloseThreadpoolIo(readBuffer->Io); + if (error == ERROR_BROKEN_PIPE) + { + readBuffer->eof = true; + return; + } + if(error != ERROR_OPERATION_ABORTED) + { + fprintf(stderr, "ReadFile async failed, error code %lu", error); + } + } +} + +void CALLBACK ExternVocoderStep::FileReadComplete_( + PTP_CALLBACK_INSTANCE instance, + void* context, + void* overlapped, + ULONG ioResult, + ULONG_PTR numBytesRead, + PTP_IO io + ) +{ + FileReadBuffer* readBuffer = (FileReadBuffer*)context; + if (ioResult == ERROR_OPERATION_ABORTED) { + // This can happen when someone manually aborts the I/O request. + CloseThreadpoolTimer(readBuffer->Timer); + CloseThreadpoolIo(readBuffer->Io); + return; + } + const bool isEof = ioResult == ERROR_HANDLE_EOF || ioResult == ERROR_BROKEN_PIPE; + if (!(isEof || ioResult == NO_ERROR)) + { + fprintf(stderr, "Got error result %lu while handling I/O callback", ioResult); + readBuffer->eof = true; + return; + } + + readBuffer->onDataRead((char*)readBuffer->Buffer, numBytesRead); + + if (isEof) { + CloseThreadpoolTimer(readBuffer->Timer); + CloseThreadpoolIo(readBuffer->Io); + + readBuffer->eof = true; + } else { + // continue reading + ScheduleFileRead_(readBuffer); + } +} + +void ExternVocoderStep::InitFileReadBuffer_(FileReadBuffer* readBuffer, HANDLE handle, void* buffer, size_t bufferSize, std::function onDataRead) +{ + ZeroMemory(readBuffer, sizeof(*readBuffer)); + readBuffer->onDataRead = onDataRead; + readBuffer->Buffer = buffer; + readBuffer->BufferSize = bufferSize; + readBuffer->FileHandle = handle; + readBuffer->Io = CreateThreadpoolIo(handle, &FileReadComplete_, readBuffer, nullptr); + if (!(readBuffer->Io)) + { + fprintf(stderr, "CreateThreadpoolIo failed, error code %lu", GetLastError()); + return; + } + + // This local struct just exists so I can declare a function here that we can pass to the timer below. + struct Tmp { + static void CALLBACK RetryScheduleFileRead( + PTP_CALLBACK_INSTANCE instance, + void* context, + PTP_TIMER wait + ) { + ScheduleFileRead_((FileReadBuffer*)context); + } + }; + + readBuffer->Timer = CreateThreadpoolTimer(&Tmp::RetryScheduleFileRead, readBuffer, nullptr); + if (!(readBuffer->Timer)) + { + fprintf(stderr, "CreateThreadpoolTimer failed, error code %lu", GetLastError()); + } +} #else void ExternVocoderStep::openProcess_() { diff --git a/src/pipeline/ExternVocoderStep.h b/src/pipeline/ExternVocoderStep.h index 4d2b4afb..6c85979c 100644 --- a/src/pipeline/ExternVocoderStep.h +++ b/src/pipeline/ExternVocoderStep.h @@ -54,6 +54,27 @@ class ExternVocoderStep : public IPipelineStep int outputSampleRate_; #ifdef _WIN32 + struct FileReadBuffer + { + OVERLAPPED Overlapped; + + // Some buffer to read into + void* Buffer; + size_t BufferSize; + + // The handle to the file or pipe. + HANDLE FileHandle; + PTP_IO Io; + PTP_TIMER Timer; + + // Callback when data is read + std::function onDataRead; + + // Indicator that there's nothing left to read + bool eof; + }; + + DWORD recvProcessId_; HANDLE recvProcessHandle_; HANDLE receiveStdoutHandle_; HANDLE receiveStdinHandle_; @@ -75,6 +96,24 @@ class ExternVocoderStep : public IPipelineStep void threadEntry_(); void openProcess_(); void closeProcess_(); + +#ifdef _WIN32 + static void KillProcessTree_(DWORD myprocID); + + // Adapted from https://blog.s-schoener.com/2024-06-16-stream-redirection-win32/. + static void CreateAsyncPipe_(HANDLE* outRead, HANDLE* outWrite); + static void ScheduleFileRead_(FileReadBuffer* readBuffer); + static void CALLBACK FileReadComplete_( + PTP_CALLBACK_INSTANCE instance, + void* context, + void* overlapped, + ULONG ioResult, + ULONG_PTR numBytesRead, + PTP_IO io + ); + static void InitFileReadBuffer_(FileReadBuffer* readBuffer, HANDLE handle, void* buffer, size_t bufferSize, std::function onDataRead); +#endif // _WIN32 + }; #endif // AUDIO_PIPELINE__EXTERN_VOCODER_STEP_H