Skip to content

Commit

Permalink
Merge pull request #621 from jglick/write-timeout
Browse files Browse the repository at this point in the history
[JENKINS-70531] Apply timeout on WebSocket write operations (and simplify `AbstractByteBufferCommandTransport`)
  • Loading branch information
jglick authored Feb 3, 2023
2 parents 75928a7 + 4e88ad5 commit 665000b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,18 @@ public abstract class AbstractByteBufferCommandTransport extends CommandTranspor
* Our channel.
*/
private Channel channel;
/**
* The chunk header buffer.
*/
private final ByteBuffer writeChunkHeader = ByteBuffer.allocate(2);
@Deprecated
private final ByteBuffer writeChunkHeader;
/**
* The transport frame size.
*/
private int transportFrameSize = 8192;
@Deprecated
private ByteBuffer writeChunkBody;
/**
* The chunk body.
* The chunk header & body buffer.
*/
private ByteBuffer writeChunkBody = ByteBuffer.allocate(transportFrameSize);
private ByteBuffer writeChunkCombined;
/**
* The delegate, this is required as we cannot access some of the methods of {@link ChunkHeader} outside of the
* remoting module.
Expand Down Expand Up @@ -120,14 +120,43 @@ public abstract class AbstractByteBufferCommandTransport extends CommandTranspor
*/
private final ByteBufferQueue sendStaging = new ByteBufferQueue(transportFrameSize);

/**
* @deprecated Pass {@code true} to {@link #AbstractByteBufferCommandTransport(boolean)} and switch {@link #write(ByteBuffer, ByteBuffer)} to {@link #write(ByteBuffer)}.
*/
@Deprecated
protected AbstractByteBufferCommandTransport() {
this(false);
}

protected AbstractByteBufferCommandTransport(boolean combineBuffers) {
if (combineBuffers) {
writeChunkHeader = null;
writeChunkBody = null;
writeChunkCombined = ByteBuffer.allocate(transportFrameSize + ChunkHeader.SIZE);
} else { // deprecated
writeChunkHeader = ByteBuffer.allocate(ChunkHeader.SIZE);
writeChunkBody = ByteBuffer.allocate(transportFrameSize);
writeChunkCombined = null;
}
}

/**
* @deprecated pass true to {@link #AbstractByteBufferCommandTransport(boolean)} and implement {@link #write(ByteBuffer)}
*/
@Deprecated
protected void write(ByteBuffer header, ByteBuffer data) throws IOException {
throw new AbstractMethodError("implement write(ByteBuffer, ByteBuffer) if !combineBuffers");
}

/**
* Write the packet.
*
* @param header the header to write.
* @param data the data to write.
* @param headerAndData the header and data to write.
* @throws IOException if the data could not be written.
*/
protected abstract void write(ByteBuffer header, ByteBuffer data) throws IOException;
protected void write(ByteBuffer headerAndData) throws IOException {
throw new AbstractMethodError("implement write(ByteBuffer) if combineBuffers");
}

/**
* Handle receiving some data.
Expand All @@ -143,7 +172,7 @@ public final void receive(@NonNull ByteBuffer data) throws IOException, Interrup
while (data.hasRemaining() || readState == READ_STATE_COMMAND_READY) {
switch (readState) {
case READ_STATE_NEED_HEADER:
if (data.remaining() >= 2) {
if (data.remaining() >= ChunkHeader.SIZE) {
// jump straight to state 2
readFrameHeader = ChunkHeader.read(data);
readFrameRemaining = ChunkHeader.length(readFrameHeader);
Expand Down Expand Up @@ -247,7 +276,11 @@ public void setFrameSize(int transportFrameSize) {
}
this.transportFrameSize = transportFrameSize;
// this is the only one that matters when it comes to sizing as we have to accept any frame size on receive
writeChunkBody = ByteBuffer.allocate(transportFrameSize);
if (writeChunkHeader == null) {
writeChunkCombined = ByteBuffer.allocate(transportFrameSize + ChunkHeader.SIZE);
} else {
writeChunkBody = ByteBuffer.allocate(transportFrameSize);
}
}

/**
Expand Down Expand Up @@ -293,14 +326,23 @@ public final void write(Command cmd, boolean last) throws IOException {
int frame = remaining > transportFrameSize
? transportFrameSize
: (int) remaining; // # of bytes we send in this chunk
((Buffer) writeChunkHeader).clear();
ChunkHeader.write(writeChunkHeader, frame, remaining > transportFrameSize);
((Buffer) writeChunkHeader).flip();
((Buffer) writeChunkBody).clear();
((Buffer) writeChunkBody).limit(frame);
sendStaging.get(writeChunkBody);
((Buffer) writeChunkBody).flip();
write(writeChunkHeader, writeChunkBody);
if (writeChunkHeader == null) {
((Buffer) writeChunkCombined).clear();
((Buffer) writeChunkCombined).limit(frame + ChunkHeader.SIZE);
ChunkHeader.write(writeChunkCombined, frame, remaining > transportFrameSize);
sendStaging.get(writeChunkCombined);
((Buffer) writeChunkCombined).flip();
write(writeChunkCombined);
} else {
((Buffer) writeChunkHeader).clear();
ChunkHeader.write(writeChunkHeader, frame, remaining > transportFrameSize);
((Buffer) writeChunkHeader).flip();
((Buffer) writeChunkBody).clear();
((Buffer) writeChunkBody).limit(frame);
sendStaging.get(writeChunkBody);
((Buffer) writeChunkBody).flip();
write(writeChunkHeader, writeChunkBody);
}
remaining -= frame;
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/hudson/remoting/ChunkHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
* Parsing of the chunk header.
*
* <p>
* The header is 2 bytes, in the network order. The first bit designates whether this chunk
* The header is {@link #SIZE} bytes, in the network order. The first bit designates whether this chunk
* is the last chunk (0 if this is the last chunk), and the remaining 15 bits designate the
* length of the chunk as unsigned number.
*
* @author Kohsuke Kawaguchi
*/
public class ChunkHeader {

public static final int SIZE = 2;

public static int read(ByteBuffer buf) {
return parse(buf.get(), buf.get());
}
Expand Down Expand Up @@ -57,7 +60,7 @@ public static void write(ByteBufferQueue buf, int length, boolean hasMore) {
}

public static byte[] pack(int length, boolean hasMore) {
byte[] header = new byte[2];
byte[] header = new byte[SIZE];
header[0] = (byte)((hasMore?0x80:0)|(length>>8));
header[1] = (byte)(length);
return header;
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/hudson/remoting/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -644,13 +644,17 @@ public void onError(Session session, Throwable x) {
class Transport extends AbstractByteBufferCommandTransport {
final Session session;
Transport(Session session) {
super(true);
this.session = session;
}
@Override
protected void write(ByteBuffer header, ByteBuffer data) throws IOException {
LOGGER.finest(() -> "sending message of length + " + ChunkHeader.length(ChunkHeader.peek(header)));
session.getBasicRemote().sendBinary(header, false);
session.getBasicRemote().sendBinary(data, true);
protected void write(ByteBuffer headerAndData) throws IOException {
LOGGER.finest(() -> "sending message of length " + (headerAndData.remaining() - ChunkHeader.SIZE));
try {
session.getAsyncRemote().sendBinary(headerAndData).get(5, TimeUnit.MINUTES);
} catch (Exception x) {
throw new IOException(x);
}
}

@Override
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/jenkinsci/remoting/nio/NioChannelHub.java
Original file line number Diff line number Diff line change
Expand Up @@ -608,15 +608,15 @@ public void run() {
t.closeR();
}

final byte[] buf = new byte[2]; // space for reading the chunk header
final byte[] buf = new byte[ChunkHeader.SIZE];
int pos=0;
int packetSize=0;
while (true) {
if (t.rb.peek(pos,buf)<buf.length)
if (t.rb.peek(pos,buf)<ChunkHeader.SIZE)
break; // we don't have enough to parse header
int header = ChunkHeader.parse(buf);
int chunk = ChunkHeader.length(header);
pos+=buf.length+chunk;
pos+=ChunkHeader.SIZE+chunk;
packetSize+=chunk;
boolean last = ChunkHeader.isLast(header);
if (last && pos<=t.rb.readable()) {// do we have the whole packet in our buffer?
Expand All @@ -625,7 +625,7 @@ public void run() {
int r_ptr = 0;
do {
int r = t.rb.readNonBlocking(buf);
assert r==buf.length;
assert r==ChunkHeader.SIZE;
header = ChunkHeader.parse(buf);
chunk = ChunkHeader.length(header);
last = ChunkHeader.isLast(header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,19 +313,19 @@ private class ByteBufferCommandTransport extends AbstractByteBufferCommandTransp
* @param remoteCapability the remote capability
*/
public ByteBufferCommandTransport(Capability remoteCapability) {
super(true);
this.remoteCapability = remoteCapability;
}

/**
* {@inheritDoc}
*/
@Override
protected void write(ByteBuffer header, ByteBuffer data) throws IOException {
protected void write(ByteBuffer headerAndData) throws IOException {
//TODO: Any way to get channel information here
if (isWriteOpen()) {
try {
ChannelApplicationLayer.this.write(header);
ChannelApplicationLayer.this.write(data);
ChannelApplicationLayer.this.write(headerAndData);
} catch (ClosedChannelException e) {
// Probably it should be another exception type at all
throw new ChannelClosedException(null, "Protocol stack cannot write data anymore. ChannelApplicationLayer reports that the NIO Channel is closed", e);
Expand Down

0 comments on commit 665000b

Please sign in to comment.