Skip to content

Commit

Permalink
Support silently capturing output from Python if necessary.
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrczarnas committed Oct 22, 2024
1 parent 808fd7e commit 27b1082
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class StreamingPythonProcess implements Closeable, ExecuteResultHandler {
private static final int PYTHON_BUFFER_SIZE = 1024; // buffer size used in the python streaming process in a call to TextIO.read(buffer_size)
private static final int PYTHON_RECEIVE_RESPONSE_BUFFER_SIZE = 1024;
private static final byte[] PYTHON_BUFFER_SPACE = StringUtils.repeat(' ', PYTHON_BUFFER_SIZE + 10).getBytes(StandardCharsets.UTF_8);
private static final boolean STOP_ON_STDERR_ERROR = true; // TODO: We can support using a parameter to turn it on/off

private PipedOutputStream writeToProcessStream;
private PipedInputStream writeToProcessStreamProcessSide;
Expand Down Expand Up @@ -257,23 +258,29 @@ public void startProcessCore(PythonVirtualEnv pythonVirtualEnv) {
this.jsonFactory = new JsonFactory();
this.jsonParser = jsonFactory.createParser(this.readFromProcessStreamReader);

ActivityDetectionOutputStream errorOutputStream = new ActivityDetectionOutputStream(new FlushingOutputStream(this.errorStream));
this.outputDetectedOnStderrFuture = errorOutputStream.getOutputDetectedFuture();
this.outputDetectedOnStderrFuture
.thenRun(() -> {
try {
// we detected that some output was written to the stderr of the python process, it is an error and we will terminate...
Thread.sleep(100); // we need to wait for the remaining output
this.waitForClose.countDown();
this.close();

String errStreamText = this.errorStream.toString(StandardCharsets.UTF_8);
log.error("Python process failed with an error, the error captured from the stderr: " + errStreamText);
}
catch (Exception ioe) {
log.error("Python process failed with an error and we cannot close the stream: " + ioe.getMessage(), ioe);
}
});
OutputStream errorOutputStream = null;
if (STOP_ON_STDERR_ERROR) {
ActivityDetectionOutputStream activityDetectionOutputStream = new ActivityDetectionOutputStream(new FlushingOutputStream(this.errorStream));
errorOutputStream = activityDetectionOutputStream;
this.outputDetectedOnStderrFuture = activityDetectionOutputStream.getOutputDetectedFuture();
this.outputDetectedOnStderrFuture
.thenRun(() -> {
try {
// we detected that some output was written to the stderr of the python process, it is an error and we will terminate...
Thread.sleep(100); // we need to wait for the remaining output
this.waitForClose.countDown();
this.close();

String errStreamText = this.errorStream.toString(StandardCharsets.UTF_8);
log.error("Python process failed with an error, the error captured from the stderr: " + errStreamText);
} catch (Exception ioe) {
log.error("Python process failed with an error and we cannot close the stream: " + ioe.getMessage(), ioe);
}
});
} else {
errorOutputStream = new TailOutputStream(this.errorStream);
this.outputDetectedOnStderrFuture = new CompletableFuture<>();
}

this.streamHandler = new FlushingPumpStreamHandler(
new FlushingOutputStream(this.readFromProcessStreamProcessSide),
Expand Down
146 changes: 146 additions & 0 deletions dqops/src/main/java/com/dqops/utils/python/TailOutputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright © 2021 DQOps (support@dqops.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dqops.utils.python;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;

/**
* Output stream is simply captures the most recently received content and constantly rewinds the output stream.
*/
public class TailOutputStream extends OutputStream {
private final OutputStream nestedStream;

/**
* Creates an output stream wrapper that will detect any data that was written.
* @param nestedStream Nested stream.
*/
public TailOutputStream(OutputStream nestedStream) {
this.nestedStream = nestedStream;
}

/**
* Writes {@code b.length} bytes from the specified byte array
* to this output stream. The general contract for {@code write(b)}
* is that it should have exactly the same effect as the call
* {@code write(b, 0, b.length)}.
*
* @param b the data.
* @throws IOException if an I/O error occurs.
* @see OutputStream#write(byte[], int, int)
*/
@Override
public void write(byte[] b) throws IOException {
this.nestedStream.write(b);
}

/**
* Writes {@code len} bytes from the specified byte array
* starting at offset {@code off} to this output stream.
* The general contract for {@code write(b, off, len)} is that
* some of the bytes in the array {@code b} are written to the
* output stream in order; element {@code b[off]} is the first
* byte written and {@code b[off+len-1]} is the last byte written
* by this operation.
* <p>
* The {@code write} method of {@code OutputStream} calls
* the write method of one argument on each of the bytes to be
* written out. Subclasses are encouraged to override this method and
* provide a more efficient implementation.
* <p>
* If {@code b} is {@code null}, a
* {@code NullPointerException} is thrown.
* <p>
* If {@code off} is negative, or {@code len} is negative, or
* {@code off+len} is greater than the length of the array
* {@code b}, then an {@code IndexOutOfBoundsException} is thrown.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
* @throws IOException if an I/O error occurs. In particular,
* an {@code IOException} is thrown if the output
* stream is closed.
*/
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (this.nestedStream instanceof ByteArrayOutputStream) {
ByteArrayOutputStream byteArrayOutputStream = (ByteArrayOutputStream)this.nestedStream;
byteArrayOutputStream.reset();
}
this.nestedStream.write(b, off, len);
}

/**
* Writes the specified byte to this output stream. The general
* contract for {@code write} is that one byte is written
* to the output stream. The byte to be written is the eight
* low-order bits of the argument {@code b}. The 24
* high-order bits of {@code b} are ignored.
* <p>
* Subclasses of {@code OutputStream} must provide an
* implementation for this method.
*
* @param b the {@code byte}.
* @throws IOException if an I/O error occurs. In particular,
* an {@code IOException} may be thrown if the
* output stream has been closed.
*/
@Override
public void write(int b) throws IOException {
this.nestedStream.write(b);
}

/**
* Flushes this output stream and forces any buffered output bytes
* to be written out. The general contract of {@code flush} is
* that calling it is an indication that, if any bytes previously
* written have been buffered by the implementation of the output
* stream, such bytes should immediately be written to their
* intended destination.
* <p>
* If the intended destination of this stream is an abstraction provided by
* the underlying operating system, for example a file, then flushing the
* stream guarantees only that bytes previously written to the stream are
* passed to the operating system for writing; it does not guarantee that
* they are actually written to a physical device such as a disk drive.
* <p>
* The {@code flush} method of {@code OutputStream} does nothing.
*
* @throws IOException if an I/O error occurs.
*/
@Override
public void flush() throws IOException {
nestedStream.flush();
}

/**
* Closes this output stream and releases any system resources
* associated with this stream. The general contract of {@code close}
* is that it closes the output stream. A closed stream cannot perform
* output operations and cannot be reopened.
* <p>
* The {@code close} method of {@code OutputStream} does nothing.
*
* @throws IOException if an I/O error occurs.
*/
@Override
public void close() throws IOException {
nestedStream.close();
}
}

0 comments on commit 27b1082

Please sign in to comment.