From 27b10822e385222d61f8e886527bc4bbb01f3d9d Mon Sep 17 00:00:00 2001 From: Piotr Czarnas Date: Tue, 22 Oct 2024 23:35:23 +0200 Subject: [PATCH] Support silently capturing output from Python if necessary. --- .../utils/python/StreamingPythonProcess.java | 41 +++-- .../dqops/utils/python/TailOutputStream.java | 146 ++++++++++++++++++ 2 files changed, 170 insertions(+), 17 deletions(-) create mode 100644 dqops/src/main/java/com/dqops/utils/python/TailOutputStream.java diff --git a/dqops/src/main/java/com/dqops/utils/python/StreamingPythonProcess.java b/dqops/src/main/java/com/dqops/utils/python/StreamingPythonProcess.java index 4957b44bc5..969b7025b6 100644 --- a/dqops/src/main/java/com/dqops/utils/python/StreamingPythonProcess.java +++ b/dqops/src/main/java/com/dqops/utils/python/StreamingPythonProcess.java @@ -43,6 +43,7 @@ public class StreamingPythonProcess implements Closeable, ExecuteResultHandler { private static final int PYTHON_BUFFER_SIZE = 1024; // buffer size used in the python streaming process in a call to TextIO.read(buffer_size) private static final int PYTHON_RECEIVE_RESPONSE_BUFFER_SIZE = 1024; private static final byte[] PYTHON_BUFFER_SPACE = StringUtils.repeat(' ', PYTHON_BUFFER_SIZE + 10).getBytes(StandardCharsets.UTF_8); + private static final boolean STOP_ON_STDERR_ERROR = true; // TODO: We can support using a parameter to turn it on/off private PipedOutputStream writeToProcessStream; private PipedInputStream writeToProcessStreamProcessSide; @@ -257,23 +258,29 @@ public void startProcessCore(PythonVirtualEnv pythonVirtualEnv) { this.jsonFactory = new JsonFactory(); this.jsonParser = jsonFactory.createParser(this.readFromProcessStreamReader); - ActivityDetectionOutputStream errorOutputStream = new ActivityDetectionOutputStream(new FlushingOutputStream(this.errorStream)); - this.outputDetectedOnStderrFuture = errorOutputStream.getOutputDetectedFuture(); - this.outputDetectedOnStderrFuture - .thenRun(() -> { - try { - // we detected that some output was written to the stderr of the python process, it is an error and we will terminate... - Thread.sleep(100); // we need to wait for the remaining output - this.waitForClose.countDown(); - this.close(); - - String errStreamText = this.errorStream.toString(StandardCharsets.UTF_8); - log.error("Python process failed with an error, the error captured from the stderr: " + errStreamText); - } - catch (Exception ioe) { - log.error("Python process failed with an error and we cannot close the stream: " + ioe.getMessage(), ioe); - } - }); + OutputStream errorOutputStream = null; + if (STOP_ON_STDERR_ERROR) { + ActivityDetectionOutputStream activityDetectionOutputStream = new ActivityDetectionOutputStream(new FlushingOutputStream(this.errorStream)); + errorOutputStream = activityDetectionOutputStream; + this.outputDetectedOnStderrFuture = activityDetectionOutputStream.getOutputDetectedFuture(); + this.outputDetectedOnStderrFuture + .thenRun(() -> { + try { + // we detected that some output was written to the stderr of the python process, it is an error and we will terminate... + Thread.sleep(100); // we need to wait for the remaining output + this.waitForClose.countDown(); + this.close(); + + String errStreamText = this.errorStream.toString(StandardCharsets.UTF_8); + log.error("Python process failed with an error, the error captured from the stderr: " + errStreamText); + } catch (Exception ioe) { + log.error("Python process failed with an error and we cannot close the stream: " + ioe.getMessage(), ioe); + } + }); + } else { + errorOutputStream = new TailOutputStream(this.errorStream); + this.outputDetectedOnStderrFuture = new CompletableFuture<>(); + } this.streamHandler = new FlushingPumpStreamHandler( new FlushingOutputStream(this.readFromProcessStreamProcessSide), diff --git a/dqops/src/main/java/com/dqops/utils/python/TailOutputStream.java b/dqops/src/main/java/com/dqops/utils/python/TailOutputStream.java new file mode 100644 index 0000000000..19ef27288c --- /dev/null +++ b/dqops/src/main/java/com/dqops/utils/python/TailOutputStream.java @@ -0,0 +1,146 @@ +/* + * Copyright © 2021 DQOps (support@dqops.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dqops.utils.python; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.CompletableFuture; + +/** + * Output stream is simply captures the most recently received content and constantly rewinds the output stream. + */ +public class TailOutputStream extends OutputStream { + private final OutputStream nestedStream; + + /** + * Creates an output stream wrapper that will detect any data that was written. + * @param nestedStream Nested stream. + */ + public TailOutputStream(OutputStream nestedStream) { + this.nestedStream = nestedStream; + } + + /** + * Writes {@code b.length} bytes from the specified byte array + * to this output stream. The general contract for {@code write(b)} + * is that it should have exactly the same effect as the call + * {@code write(b, 0, b.length)}. + * + * @param b the data. + * @throws IOException if an I/O error occurs. + * @see OutputStream#write(byte[], int, int) + */ + @Override + public void write(byte[] b) throws IOException { + this.nestedStream.write(b); + } + + /** + * Writes {@code len} bytes from the specified byte array + * starting at offset {@code off} to this output stream. + * The general contract for {@code write(b, off, len)} is that + * some of the bytes in the array {@code b} are written to the + * output stream in order; element {@code b[off]} is the first + * byte written and {@code b[off+len-1]} is the last byte written + * by this operation. + *

+ * The {@code write} method of {@code OutputStream} calls + * the write method of one argument on each of the bytes to be + * written out. Subclasses are encouraged to override this method and + * provide a more efficient implementation. + *

+ * If {@code b} is {@code null}, a + * {@code NullPointerException} is thrown. + *

+ * If {@code off} is negative, or {@code len} is negative, or + * {@code off+len} is greater than the length of the array + * {@code b}, then an {@code IndexOutOfBoundsException} is thrown. + * + * @param b the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. + * @throws IOException if an I/O error occurs. In particular, + * an {@code IOException} is thrown if the output + * stream is closed. + */ + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (this.nestedStream instanceof ByteArrayOutputStream) { + ByteArrayOutputStream byteArrayOutputStream = (ByteArrayOutputStream)this.nestedStream; + byteArrayOutputStream.reset(); + } + this.nestedStream.write(b, off, len); + } + + /** + * Writes the specified byte to this output stream. The general + * contract for {@code write} is that one byte is written + * to the output stream. The byte to be written is the eight + * low-order bits of the argument {@code b}. The 24 + * high-order bits of {@code b} are ignored. + *

+ * Subclasses of {@code OutputStream} must provide an + * implementation for this method. + * + * @param b the {@code byte}. + * @throws IOException if an I/O error occurs. In particular, + * an {@code IOException} may be thrown if the + * output stream has been closed. + */ + @Override + public void write(int b) throws IOException { + this.nestedStream.write(b); + } + + /** + * Flushes this output stream and forces any buffered output bytes + * to be written out. The general contract of {@code flush} is + * that calling it is an indication that, if any bytes previously + * written have been buffered by the implementation of the output + * stream, such bytes should immediately be written to their + * intended destination. + *

+ * If the intended destination of this stream is an abstraction provided by + * the underlying operating system, for example a file, then flushing the + * stream guarantees only that bytes previously written to the stream are + * passed to the operating system for writing; it does not guarantee that + * they are actually written to a physical device such as a disk drive. + *

+ * The {@code flush} method of {@code OutputStream} does nothing. + * + * @throws IOException if an I/O error occurs. + */ + @Override + public void flush() throws IOException { + nestedStream.flush(); + } + + /** + * Closes this output stream and releases any system resources + * associated with this stream. The general contract of {@code close} + * is that it closes the output stream. A closed stream cannot perform + * output operations and cannot be reopened. + *

+ * The {@code close} method of {@code OutputStream} does nothing. + * + * @throws IOException if an I/O error occurs. + */ + @Override + public void close() throws IOException { + nestedStream.close(); + } +}